How to Write a Better Scribe

// By Patrick Lee • May 20, 2015
Like many companies, Dropbox uses scribe to aggregate log data into our analytics pipeline. After a recent scribe pipeline outage, we decided to rewrite scribe with the goals of reducing operational overhead, reducing data loss, and adding enhancements that are missing from the original scribe. This blog post describes some of the design choices we made for the rewrite.

Scribe Basics

This section describes the scribe pipeline with respect to how it is setup at Dropbox (we suspect most companies deploy/use scribe in similar fashion).  Feel free to skip this section if you are already familiar with scribe.

A scribe server can be thought of as an input-queued network switch which performs routing based on the message’s category.  The server’s configurations, including its upstream locations, are loaded on startup from an xml-like config file.  The upstreams may be either other scribe nodes, hdfs (or local file), or a sink which drops all messages.

Scribe’s primary client interface is

Log(list of (category, message)-tuples) -> status

When a client sends a batch of messages to a scribe server, the scribe server forwards each message in the batch to the corresponding upstream and returns “ok” to the client to indicate success.  If the message batch is throttled, the server returns “try again” to client and the client should retry sending at a later time.

Scribe nodes are typically configured into a fan-in tree topology.  At Dropbox, we deploy scribe nodes in a 3-tier (leaf, remote hub and central hub) fan-in tree configuration:

Application (production services, scripts, etc.) scribe clients are topology-agnostic and can only talk to the local leaf node running on the same machine (i.e., Leaf nodes run on every machine in our fleet).  Each leaf scribe node is configured to randomly forward log messages to a small subset of remote hub scribe upstreams.

The remote hub tier is our primary message buffering tier.  We want to minimize buffering on the leaf tier since leaf nodes runs on the serving machines and thus compete with serving systems for memory and disk IO.  Remote hub scribe nodes will forward messages to the correct central node based on a hash of the message’s category.

The central hub tier is responsible for appending message into HDFS files; messages for the same category are appended to the same file.  HDFS does not support multiple writers appending to the same file. This implies that each category can only be written by exactly one central hub node.  This limitation also means that every central node is a single point of failure and is a bottleneck for categories with high write throughput.  To reduce potential data loss due to central hub node crashes, our central hub tier is configured with a very small memory buffer and no disk buffer.  To improve write throughput, our application scribe client shards known high-throughput categories by appending shard suffixes to these categories.  Our analytics pipeline then merges the sharded categories back into a single category.

Since parts of Dropbox’s serving stack runs on EC2, while our analytics pipeline runs entirely within our datacenter, we have to transfer EC2 logs into our datacenter. For this purpose, an extra EC2 remote hub tier is setup to forward logs into our standard scribe fan-in tree. Note that all traffic between EC2 and our datacenter goes through secure tunnels.

How Can We Make Scribe Better?

With the basics out of the way, let us dive into some key issues with the original scribe.  We will henceforth refer to the original scribe as OldScribe and the rewrite as NewScribe wherever the distinction matters, and simply scribe if there is no distinction.

1. Config management:

Problem: OldScribe’s configurations are stored in xml-like config files.  Whenever configuration changes, we need to restart nodes to pick up the changes.  While this approach works well for tens, maybe even hundreds, of nodes, this approach is unsustainable for tens of thousands of nodes.  In particular, it is difficult to ensure the entire fleet is running on the most up-to-date configurations, and it is unpleasant to restart nodes on the entire fleet.  This is especially problematic for leaf nodes since adding remote hub nodes requires manually modifying and syncing the leaf’s configuration before restarting all of them.

Solution: NewScribe solves these issues by storing the configurations in zookeeper.  Whenever configurations change in zookeeper, NewScribe will pick up the changes and reconfigure itself automatically (without restarting).  NewScribe also supports service discovery; this eliminates the need to manually update configuration whenever the scribe upstream changes.

2. Throttling:

Problem: OldScribe’s throttling occurs on ingress.  When a message batch comes into OldScribe, OldScribe will look for category queues associated to the message batch; if any of the queues is full, then the entire message batch is rejected and the OldScribe will ask the downstream client to “try again”.  This scheme works well when the message batch contains only a single category.  However, when the message batch contains multiple categories, this scheme can cause unintentional head of line blocking (assuming the downstream client retries).  Worst yet, if the downstream client ignores the “try again” directive, the messages are lost forever.  Also, note that this scheme is very wasteful since a lot of network bandwidth and cpu are wasted on retries.

Solution: NewScribe’s throttling occurs on egress.  It always accepts incoming messages and attempts to buffer as much as it can until the category’s queue is full; this eliminates accidental head of line blocking.  To ensure upstreams are not overwhelmed, each category’s upstream writing is rate limited by a leaky bucket.

3. Queuing:

Problem: OldScribe treats each category’s memory buffer and disk buffer as a single logical queue.

Solution: NewScribe treats the memory buffer and disk buffer as individual queues.  Message ordering is not preserved.

When a message enters the system, NewScribe will try to put the message into the memory queue first.  If the memory queue is full, then NewScribe will try to put the message into the disk queue (if that fails, the message is dropped). The upstream writer will grab messages from both queues and will send messages upstream when at least one queue has messages.  This allows some messages to bypass the disk queue which reduces disk IO stress, and allows the upstream writer to continue processing the memory queue at full speed while the disk queue pauses to load more messages.

NOTE: message ordering does not matter too much since the analytics pipeline must handle message reordering regardless of how scribe writes the messages.

4. Single point of failure:

Problem: Currently, our application scribe client assigns each message’s category with a shard suffix. The OldScribe remote hub will route the message to a specific OldScribe central hub based on the hash(category with suffix).  Since the routing is predetermined by the application scribe client, whenever an OldScribe central hub dies, messages destined for that central hub will get backlogged until we replace that node.

(Partial) Solution: For sharded categories, NewScribe ignores the shard suffix provided by downstream and assigns each upstream message batch with a new shard suffix; the shard suffixes are round robin, hence we’re rotating upstreams at the same time (and thus ensuring progress in face of single upstream node failure).  For unsharded categories, single point of failure remains an issue (This is partly the motivation for migrating to Kafka; see below).  NOTE: we can “fix” the single point of failure by forcing each category to have at least 2 shards.

5. Disk buffering:

Problem: OldScribe creates a new file every time it flushes to disk.  The un-checksummed files are all written into the same directory.  When it tries to upstream the disk buffer, it loads the entire file and sends the entire file as a single message batch.  This is problematic since it creates a ton of log files in a single directory, which makes the os unhappy.  Whenever file corruption occurs, OldScribe may enter into an endless crash loop.

Solution: NewScribe uses checksummed logs with rotations/checkpoints to handle writing to/reading from disk queues.  Each disk queue is composed of a reader thread and a writer thread.  The reader thread will only operate on immutable log files, while the writer thread will operate on a single mutable log file at any given time.  Mutable log files are rotated (and become immutable) on one of two conditions:

  • the reader thread is starving
  • the log file has reached a certain size

Each category’s logs are written into a different subdirectories.

6. Alternative storage:

Problem: Our analytics team want to replace HDFS with Kafka to take advantage of newer analytics tool such as Storm.  To support that in OldScribe, we will need either a scribe-to-kafka shim server or a server that tails HDFS; both solutions are unattractive since they both introduce yet another server into our already complex ecosystem.

Solution: NewScribe natively supports writing to kafka upstreams.

Architecture Comparison

For comparison, here are the architecture block diagrams for OldScribe and NewScribe.

OldScribe’s Architecture

As mentioned previously, OldScribe’s architecture resembles a simple network switch.  Its (logical) architecture is as follow:

Each category runs in its own thread (The individual components within a category are just library calls to the common abstract Store interface).  Notice the lack of control plane since routing configuration are loaded during startup.

NewScribe’s Architecture

NewScribe’s architecture also resembles a network switch.  Its architecture is as follows:

Control Plane:

  • Config manager: listens to zookeeper for any configuration changes and notifies category config updater when changes occurs
  • Upstream manager: keeps track of (scribe / kafka) upstream connection pools.  It also keeps track of discoverable scribe upstream services, and notifies category config updater whenever upstream services change.
  • Category config updater: updates all registered categories’ throttling, buffering, and upstream configurations whenever it receive an notification from either the config manager or the upstream manger.

Data Plane:

  • Memory queue: is just a standard thread-safe in-memory queue
  • Disk queue: is composed of a background writer thread which writes messages to log files, and a background reader thread which actively loads messages from logs files.
  • upstream writer: is responsible for pulling events from both memory and disk queues and sending the messages upstream(s).  The upstreams are configured by the control plane.

While we cannot open-source our rewrite at current time because it is tightly coupled to Dropbox’s internal infrastructure, we hope this post provides enough details for you to re-implement your own.

Per usual, we’re looking for awesome engineers to join our team in San FranciscoSeattle and New York, especially Infrastructure Software Engineers and Site Reliability Engineers!

Contributors: John Watson, Patrick Lee, and Sean Fellows

// Copy link