Large-scale networks are complex, dynamic systems with many parts, managed by many different teams. Each team has tools they use to monitor their part of the system, but they measure very different things. Before we built our own infrastructure, Magic Pocket, we didn’t have a global view of our production network, and we didn’t have a way to look at the interactions between different parts in real time. Most of the logs from our production network have semi-structured or unstructured data formats, which makes it very difficult to track a large amount of log data in real-time. Relational database models do not support these logs very well, and while NoSQL solutions such as HBase or Hive can store large amounts of logs easily, the logs aren’t stored in a form that can readily be indexed in real-time.
The real-time view was particularly critical when we moved 500 petabytes of data—in network terms more than 4 exabits of traffic—into our exabyte-scale Magic Pocket infrastructure in less than six months. This aggressive goal required our network infrastructure to support high traffic volume over a long period of time without failure. Knowing the traffic profile on the production backbone between AWS and our datacenter helped us detect anomalies. Mapping NetFlow data to our production infrastructure made it much faster to recognize the root source of problems. We developed the NetFlash system to answer the scale and real-time challenges.
NetFlash collects large volumes of NetFlow data from our network that we enhance with information specific to our infrastructure. We then use Elasticsearch (ES) to query these enhanced logs, and Kibana as a web UI for creating dashboards to monitor the queries in real-time.
Here’s an example so you can see the insights we are able to surface with this system. In the image below, you can see that the network traffic in several clusters momentarily drops without warning at nine in the morning:
Before NetFlash, on-call engineers would have a much harder time diagnosing issues as they were happening. In our previous system, it could take many hours to query the network data from an event like the one we’ve just seen. Now, thanks to this chart, it’s just a matter of sending a quick message directly to the right team, and getting things back on track.
Simple, right? Actually there was quite a bit we had to do to make this all work in production.
First, a few definitions so you understand the different pieces of the system and how they are affected by the massive scale of our infrastructure.
NetFlow is an industry-standard datagram format proposed by Cisco. A NetFlow datagram contains information like source IP addresses, destination IP addresses, the source and destination ports, IP protocol, and the next hop IP address. In principle, if we gather NetFlow logs from all of our routers, we have a concise view of where our network traffic data is coming from, where it’s going, and any hurdles it encounters along the way.
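To make the fields listed above concrete, here is a minimal sketch of a flow record as a Python data class. The class name `FlowRecord`, the method `five_tuple`, and the sample addresses are illustrative assumptions, not part of NetFlash or of the NetFlow wire format, and real NetFlow records carry more fields than the ones named in this post.

```python
from dataclasses import dataclass
from ipaddress import ip_address


@dataclass(frozen=True)
class FlowRecord:
    """Illustrative subset of the fields carried by one NetFlow record."""
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: int   # IP protocol number (6 = TCP, 17 = UDP)
    next_hop: str

    def __post_init__(self):
        # Validate address fields early; ip_address raises ValueError on bad input.
        for address in (self.src_ip, self.dst_ip, self.next_hop):
            ip_address(address)

    def five_tuple(self):
        """Return the classic key that identifies a flow."""
        return (self.src_ip, self.dst_ip, self.src_port, self.dst_port, self.protocol)


# Hypothetical sample values, for illustration only.
record = FlowRecord("10.0.0.5", "192.0.2.10", 44321, 443, 6, "10.0.0.1")
print(record.five_tuple())
```

Grouping records by this key is what lets a collection of datagrams be read as "who is talking to whom, over which path".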
Elasticsearch is an open source distributed search service. It is one of the most popular enterprise search engines and powers well-known services, from Microsoft Azure Search to the full archive of The New York Times. In large deployments like ours, ES runs on multiple clusters to enable real-time querying of large amounts of data.
Kibana is an open source project that enables data visualizations (like the samples above) from the content indexed by an Elasticsearch cluster.
We found this combination would allow us to monitor our network data in real-time at scale. Dropbox generates roughly 260 billion NetFlow datagram records every day. That’s terabytes of aggregated data about our data flows. In our first implementation of NetFlow collection, we stored the logs to our data infrastructure in Hive/Hadoop clusters, and analyzed them with HiveSQL queries. We still use this data pipeline for permanent storage, but it’s slow. New data isn’t available to query for somewhere between two and twelve hours after it’s collected, due to the nature of our data ingestion pipeline. That’s fine for long-term or historical data analysis, but makes real-time monitoring impossible.
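For a sense of scale, the daily figure above can be turned into an average per-second rate. This is simple arithmetic on the number quoted in this post; actual traffic is bursty, so peak rates will differ from this average.

```python
RECORDS_PER_DAY = 260_000_000_000   # figure quoted in the text
SECONDS_PER_DAY = 24 * 60 * 60

# Average ingest rate the real-time pipeline has to sustain.
records_per_second = RECORDS_PER_DAY / SECONDS_PER_DAY
print(f"{records_per_second:,.0f} records per second on average")
```

That works out to roughly three million records per second across all collectors.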
To enable real-time queries, we built a solution in parallel with our existing data pipeline. Before we dive into the details, here’s a look at the architecture:
Dropbox collects NetFlow datagram records every day, from production backbone, edge, and data center routers. Each of these routers sends NetFlow datagrams to two different collectors which are distributed geographically. The copy serves as a backup to the original packet, just in case a collector fails.
In the NetFlash system, the collectors now send the processed datagrams to two data pipelines: the HDFS/Hive clusters for permanent data storage, and the new Kafka/Elasticsearch cluster that gives us a near real-time view.
Let’s drill down a bit further into the details of our data pipeline:
We worked on three key components: the collector, the search backend, and the Kafka pipeline.
The collector
There are about six collectors at each data center, and each collector receives billions of log entries a day. So we optimized the performance of our collectors in a few ways. We chose high-performance servers with lots of RAM. The collector code is written in Go and designed to make use of multi-core processors. And we found that increasing the I/O buffer size is really helpful for reducing packet loss at the collectors.
The raw datagrams themselves aren’t all that useful unless you can associate the IP addresses with meaningful information. The IP is the key to these other attributes, but with so many queries per second it is impossible to query the production database each time. So we built a local DB cache, where the IP is linked to machine, router, load balancer, and GeoIP attributes. The processors inside a collector annotate each raw datagram with a lookup to this local cache. After annotation, each data record contains server names, project names, server owners, roles, locations and more. Those attributes will be the index tokens on the search backend. We send these enriched logs to both the Hive and Elasticsearch pipelines.
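The annotation step described above can be sketched as a dictionary lookup per IP address. Our collectors are written in Go; this Python version only illustrates the idea. The cache contents, the attribute names, and the function name `annotate` are all hypothetical.

```python
# Hypothetical local cache: IP address -> attributes copied from production databases.
IP_CACHE = {
    "10.0.0.5": {
        "server": "block-01",
        "project": "magic-pocket",
        "owner": "storage",
        "role": "blockserver",
        "location": "dc-a",
    },
}


def annotate(record, cache):
    """Return a copy of `record` enriched with attributes for both endpoints."""
    enriched = dict(record)  # never mutate the raw record
    for side in ("src", "dst"):
        attributes = cache.get(record[f"{side}_ip"], {})  # unknown IPs add nothing
        for name, value in attributes.items():
            enriched[f"{side}_{name}"] = value
    return enriched


raw = {"src_ip": "10.0.0.5", "dst_ip": "192.0.2.10", "bytes": 1500}
print(annotate(raw, IP_CACHE))
```

Because the lookup is local and in memory, its cost stays constant per record, which is what makes enrichment feasible at the collectors' ingest rate.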
Kafka cluster
LogHub
Elasticsearch cluster
On the backend, our best solution for real-time document search is Elasticsearch (ES), but deploying it at scale proved to be a challenge. At first, we set up a prototype cluster with one master node and three data nodes. It crashed almost immediately under our data load!
In order to clear this hurdle we’ve made many improvements:
- We expanded our cluster to three masters and ten data nodes.
- We send bulk requests to ES, with each request containing more than thirty logs, which reduces our ES I/O load.
- We only keep the last eight days indexed; expired data shards are deleted daily.
- We register ES data nodes into a NetFlash Zookeeper quorum, and use a connection pool to send logs to ES. The connections in the pool are re-used, and requests are evenly distributed among data nodes.
- We tuned indexing parameters to improve indexing performance, and we only ask ES to keep one copy of its indexed data, instead of three.
- We downsampled the logs by a factor of four at the collectors for our real-time pipeline, and keep everything on the permanent data store.
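Three of the improvements above (downsampling, bulk requests, and daily expiry) can be sketched in a few lines. This is a minimal illustration under stated assumptions, not our production code: the sampling here is a deterministic one-in-four, the index naming scheme `netflash-YYYY.MM.DD` is hypothetical, and the batch size is arbitrary. The payload follows the newline-delimited JSON layout of the Elasticsearch bulk endpoint; depending on the Elasticsearch version, the action line may need additional metadata such as a document type.

```python
import json
from datetime import date, datetime, timedelta
from itertools import islice


def downsample(records, factor=4):
    """Keep one record out of every `factor` (deterministic 1-in-N sampling)."""
    return islice(records, 0, None, factor)


def batches(records, size):
    """Yield lists of up to `size` records so each request carries many logs."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def bulk_body(chunk, index):
    """Render a chunk as newline-delimited JSON for the Elasticsearch _bulk endpoint."""
    lines = []
    for doc in chunk:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(doc))                           # document source
    return "\n".join(lines) + "\n"  # the bulk format requires a trailing newline


def expired_indices(names, today, keep_days=8):
    """Return daily indices (hypothetical 'netflash-YYYY.MM.DD' names) past retention."""
    cutoff = today - timedelta(days=keep_days)
    expired = []
    for name in names:
        day = datetime.strptime(name.rsplit("-", 1)[1], "%Y.%m.%d").date()
        if day <= cutoff:
            expired.append(name)
    return expired


logs = [{"seq": i} for i in range(400)]
for chunk in batches(downsample(logs), size=32):
    payload = bulk_body(chunk, "netflash-2016.03.10")
    print(len(chunk), "docs,", len(payload), "bytes")

print(expired_indices(["netflash-2016.03.02", "netflash-2016.03.03"], date(2016, 3, 10)))
```

Sampling before batching means the real-time pipeline only pays serialization and network costs for the records it will actually index, while the permanent store still receives everything.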
What we learned
We’re very happy with the current production setup for ES. Data node failures are now invisible to internal users, and this configuration requires no extra effort or maintenance from us.
The ES cluster was a bit fragile at first, so we deployed a few tricks to improve stability. We observed that ES masters could be too busy to re-allocate indexing shards among data nodes, causing some data nodes to lose their connections to the master. The masters would then attempt to re-allocate the shards from the lost nodes to the remaining live nodes, a cycle that would repeat until the entire cluster crashed.
To solve this, we deployed more than twice as many data nodes as shards. So for 10 shards of indexed data, we’d allocate 22 nodes: two copies of each shard, plus two free nodes. If a node fails, there will always be an available node with a copy of the failed node’s shard that can replace it seamlessly. At the point of failure, the duplicate node replaces the failed node, its shard is copied to one of the free nodes as a new backup, and then the failed node is released from the cluster. None of the other data nodes are affected by the failure. We upgraded ES to 2.0.0, and plan to use HDFS as an index backup in case the cluster enters a turbulent state. This will also give users the ability to search data from any time period, without that eight-day limit.
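The sizing rule and the failover sequence described above can be modeled with a toy simulation. It assumes exactly one shard copy per node, which is a simplification for illustration; Elasticsearch performs the real shard allocation itself, and the names `plan_cluster` and `fail_node` are hypothetical.

```python
def plan_cluster(shards, copies=2, spares=2):
    """Map node name -> shard id held (None for a free node), one copy per node."""
    nodes = {}
    for shard in range(shards):
        for _ in range(copies):
            nodes[f"node-{len(nodes)}"] = shard
    for _ in range(spares):
        nodes[f"node-{len(nodes)}"] = None
    return nodes


def fail_node(nodes, failed):
    """Release a failed node and restore its shard copy onto a free node, if any."""
    shard = nodes.pop(failed)
    if shard is not None:
        spare = next((name for name, held in nodes.items() if held is None), None)
        if spare is not None:
            nodes[spare] = shard  # new backup copied from the surviving duplicate
    return nodes


cluster = plan_cluster(shards=10)
print(len(cluster), "nodes planned")

fail_node(cluster, "node-0")
copies_of_shard_0 = sum(1 for held in cluster.values() if held == 0)
print(len(cluster), "nodes left,", copies_of_shard_0, "copies of shard 0")
```

In this model, the nodes holding other shards are never touched when a node fails, which mirrors the property we wanted from the real cluster.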
Our small team wouldn’t have been able to complete this project without working together closely with the people using the system. Kibana’s query format is not the SQL-like format that network engineers are used to working with. Our partners in neteng facilitated adoption of the tool by saving the most useful queries, graphs, and dashboards for their teammates. To make the data relevant to the network engineers, they also re-indexed tokens so that the frequency of tokens is calculated as a bps (bits per second) rate over every five-minute interval, to match network terminology.
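The conversion to a five-minute bps rate is straightforward arithmetic: sum the bytes seen in each 300-second bucket, multiply by eight, and divide by 300. The sketch below shows that calculation in plain Python rather than in Elasticsearch or Kibana. The function name, the `(timestamp, byte_count)` input shape, and the optional scale-up by a sampling factor are assumptions made for illustration.

```python
from collections import defaultdict

BUCKET_SECONDS = 300  # five-minute buckets


def bps_by_bucket(records, sampling_factor=1):
    """Convert (unix_timestamp, byte_count) pairs into bits per second per bucket."""
    totals = defaultdict(int)
    for timestamp, byte_count in records:
        bucket_start = timestamp - (timestamp % BUCKET_SECONDS)
        totals[bucket_start] += byte_count
    # Multiply by 8 for bits; scale by the sampling factor to estimate true volume.
    return {
        start: total * 8 * sampling_factor / BUCKET_SECONDS
        for start, total in sorted(totals.items())
    }


flows = [(0, 1500), (299, 1500), (300, 3000)]
print(bps_by_bucket(flows))
print(bps_by_bucket(flows, sampling_factor=4))
```

With the sample input, each bucket holds 3000 bytes, so the unscaled rate is 80 bps per bucket, and 320 bps when the records are assumed to be a one-in-four sample.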
Results
Thanks to these efforts, we can offer instantaneous access to our network data, complete with informative dashboards that show the traffic matrix at the cluster and metropolitan levels. The graphs at the beginning of this post are examples of how we monitor backbone flows for our block storage system, one of our largest traffic producers.
We also leverage our NetFlow data stream to help us make intelligent business decisions. By watching the flow of data on our networks across the globe, our physical infrastructure team can more easily understand where new hardware will have the most impact. We can also see the markets that are potentially underserved, and begin planning expansions in ways that are efficient and cost-effective.
Without our NetFlash pipeline, gathering this data took a lot of time and effort. Now, this kind of intelligence is more readily available to decision makers at Dropbox. This means that capacity planning is always informed with a bird’s eye view of our network landscape, helping us deploy our resources effectively. The real-time aspect of our implementation isn’t required here, but the speed helps our production engineers and capacity planners get immediate answers to many of their questions. Knowing where and when our traffic spikes is also helpful for selecting other networks to establish peer relationships with.
The monumental task of data migration is behind us, but NetFlash continues to help us monitor high-volume projects. What’s exciting now is to think about what else we can do with this pipeline in the future.
Solving the scaling problems was the hard part; building new applications is easy. And one of the advantages of building tools at scale is the ability to adapt those tools for other purposes. We’re now using NetFlash to monitor ongoing operations, including marketing analytics logs, smart sensor data for analysis, and production operation logs. We can now adapt what we’ve built to collect, enrich, search, monitor, and analyze any kind of log data on our systems in real-time.