A Constraint Based Approach to Kafka Partition Balancing

I wrote this with the primary goal of playing around with notation for modeling concepts in a Kafka cluster. Please don't take it too seriously. If you're not interested in having fun with mathy notation to represent Kafka, the TL;DR is that individually using replica count and size on disk to balance a cluster is (very) roughly equivalent to balancing on throughput.


Apache Kafka is a distributed and decentralized publish and subscribe system. To reduce contention, Kafka uses the concept of partitions to allow minimal coordination between the production and consumption of messages. However, due to the nature of partition growth, an unbalanced state may be created during normal use. This can lead to performance loss and underutilization. In addition, since future use is difficult to predict, newly created sets of partitions may not be optimally placed. This paper will attempt to formalize the concept of partitions and balance within a Kafka cluster, then introduce a constraint based formula to achieve balance in practice.

Problem Statement

Typical usage of Kafka consists of a cluster B which includes multiple brokers b. Each broker is a process running on an operating system such as Linux. A broker abstracts a set of queues (partitions) associated with a topic.

The leader and follower replicas for a partition are known as a replica set. Multiple replica sets logically become a topic when grouped into an ordered list. For the purposes of this paper, each replica set consists of a leader replica and zero or more follower replicas. In practice, a partition may not have a leader elected and replicas may not be in-sync. However, for the purposes of this paper, we’ll be ignoring most ephemeral failure modes.

The capacity of each broker is defined by the packing of items over bins (we assume a bin packing approach by intuition). Initial placement of each replica on the brokers in the cluster can be modeled as random. Once a replica has been placed, properties of the replica, such as disk capacity, will change over time as messages are encountered. In addition, failure modes will skew the distribution of replica placement across all brokers in the cluster. Our goal is to minimize the use of resources at a single point in time.

Basic Model

We relate the basic model[1] for bin packing to partitions in a cluster. Let P be the set of partitions. Each partition p is associated with a capacity c_p and each broker b is associated with a capacity C_b. For each partition p and each broker b, a 0-1 membership variable x_{p,b} is defined. If the variable x_{p,b} is 1, p is placed on broker b; otherwise, the variable is set to 0. To show the finite nature of a broker, y_b is introduced as a 0-1 variable. If b is at capacity, the variable is set to 1; otherwise, it is set to 0.

These initial constraints will be the foundation we’ll use to develop practical objectives. The first constraint ensures that the broker capacity is not exceeded. The second constraint ensures that each partition replica is actually placed. Finally, the third constraint ensures that a replica is not collocated on a broker with other members of its in-sync set. In future work, this may be extended to rack and cage capacity.
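Using c_p for partition capacity, C_b for broker capacity, x_{p,b} for the 0-1 placement variable, y_b for the 0-1 broker variable, and R for an in-sync replica set (notation assumed, since this is one plausible reconstruction of the model), the three constraints can be sketched as:

```latex
\sum_{p \in P} c_p \, x_{p,b} \le C_b \, y_b \qquad \forall b \in B
% capacity: the placed partitions must fit on the broker

\sum_{b \in B} x_{p,b} = 1 \qquad \forall p \in P
% placement: every partition replica lands on exactly one broker

\sum_{p \in R} x_{p,b} \le 1 \qquad \forall b \in B,\ \text{for each replica set } R
% anti-colocation: no two members of a replica set share a broker
```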

To reduce the quadratic complexity of the search, it is important to note that brokers with similar attributes may have their capacities interchanged, allowing symmetric memberships to be treated as equivalent.


Although the cardinality of broker replica members has been used in the past as a constraint, this may be harmful on occasion as it does not reflect a response to an actual capacity limitation. For example, in practice, a high number of replicas may cause issues with file descriptor depletion, but this can be generally mitigated through tuning operating system parameters.

When balancing a cluster, three interrelated criteria should be used instead: replica disk capacity, per-broker network capacity, and topic retention policy. We assume that topics have reached a reasonable steady-state and that retention policy is actively enforced. As all replicas should eventually approach steady-state, repeatedly applying a solution to the objective is still effective.

Topic retention policy and disk capacity are intertwined. Consider the case of a replica with time-based retention with an interval r_p and disk capacity c_p. We can then determine the throughput of the replica with t_p = c_p / r_p. Comparing this with the per-broker network capacity then allows us to build on the basic model to create a common capacity limitation constraint.
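Writing r_p for the retention interval, c_p for disk capacity, x_{p,b} for the 0-1 placement variable, and N_b for the network capacity of broker b (notation assumed), the throughput and the resulting broker-level constraint can be sketched as:

```latex
t_p = \frac{c_p}{r_p}, \qquad
\sum_{p \in P} t_p \, x_{p,b} \le N_b \quad \forall b \in B
% the aggregate steady-state throughput of replicas placed on a broker
% must not exceed that broker's network capacity
```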


In this post we have presented a model as well as a set of four simple constraints that can be used to generate solutions to the replica balance problem using traditional linear programming techniques. We have determined that some existing methods, such as broker replica cardinality, are likely ineffective in achieving these goals. In the future, we may extend this model with a partition-replica consumption constraint.

[1] Régin, Jean-Charles, and Mohamed Rezgui. "Discussion about Constraint Programming Bin Packing Models." AI for data center management and cloud computing 24 Aug. 2011.

A quick comparison of Mesos and Yarn

I've recently done some work on a very rough prototype of a Mesos scheduler for Samza. While going through the paces to get this working, I've noticed a few similarities between Mesos and Yarn that might be worth talking about.

The high-level architecture of Yarn and Mesos is basically the same. Both resource managers have a master-slave architecture (both support leader election via Zookeeper). In addition, both have adopted an SPMD-style MPI rank-0 job control (the Scheduler and AppMaster) with job steps (tasks) managed by the rank-0. Here's a little diagram I've whipped up to compare similar functionality between the two.

Starting at the top, we first have the ResourceManager and mesos-master. This is where the state of the cluster is abstracted -- both have the ability to operate with the cluster state in Zookeeper, a replicated CP key-value store.

Below this, we have the rank-0 job abstraction, which is known as a Scheduler in Mesos land, and an Application Master when working with Yarn. Of note is that Yarn has the ability to restart a failed Application Master from the ResourceManager. However, Mesos seems to depend on a separate meta-scheduler (such as Marathon or Aurora) to restart a job.

Finally, at the bottom of the stack, the NodeManager and mesos-slave are compute node daemons that start the job step management. With Mesos, the job step management is known as the executor. With Yarn, it's known as the container. Both of these job step managers handle the fork/exec of the actual job step (task).

Although the architecture of Yarn and Mesos are very similar, there's a key difference in the way resources are allocated.

The Application Master and Scheduler have different models to gather resources from the master/manager. Most traditional resource managers, such as Slurm and Torque, follow the same design as Yarn. However, Mesos takes a different approach which was originally inspired by Google's Borg (see the Omega paper for details). You can think of the difference by considering how the resources are allocated by a job.

As you can see in the diagram above, Mesos follows a push model, while Yarn follows a pull model. From what I can see, a pull model is better for job submission throughput, while a push model is better for scalability across tens of thousands of servers.

So, when considering if you should use Mesos or Yarn, you should ask yourself if you have several large datacenters with long running services. If so, you should consider Mesos. If you have a ton of small batch jobs (like map-reduce jobs), Yarn may be a better choice.

Operating Apache Samza at Scale

This is a repost of an article I wrote for LinkedIn's engineering blog. To see the original, please visit: http://engineering.linkedin.com/samza/operating-apache-samza-scale

At LinkedIn, we use a log-centric system called Apache Kafka to move tons of data around. If you're not familiar with Kafka, you can think of it as a publish-subscribe system that models each message as a log entry.

However, it's difficult for a developer to spend their time thinking about a problem domain relevant to their application in addition to the complex fault tolerant semantics required to work with the log (Kafka's ISR is up there with Paxos in terms of understandability).

What is Samza?

Apache Samza is a framework that gives a developer the tools they need to process messages at an incredibly high rate while still maintaining fault tolerance. It allows us to perform some interesting near-line calculations on our data without major architectural changes. Samza has the ability to retrieve messages from Kafka, process them in some way, then output the results back to Kafka. A very simple Samza application looks a bit like this:
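The consume-process-produce shape can be sketched as a toy loop (illustrative Python, not the real Samza API -- the names here are made up for the sketch):

```python
# Toy stand-in for a Samza job: read messages from an input stream,
# apply a process() function, and emit results to an output stream.
def process(message):
    # the "interesting near-line calculation" -- here, just count words
    return {"words": len(message.split())}

input_stream = ["hello world", "operating samza at scale"]
output_stream = [process(message) for message in input_stream]

assert output_stream == [{"words": 2}, {"words": 4}]
```

In the real framework, the input and output streams are Kafka topics and the process step is a task class the framework invokes per message.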

Although Samza is designed to work with many types of streaming systems, at LinkedIn we primarily depend on Kafka for message streams. So, before diving into how we run Samza at LinkedIn, let's start with Kafka and some related terminology.

Kafka Lingo

Every day at LinkedIn, Kafka is ingesting over half a trillion messages (about 500 terabytes). You might be picturing a massive cluster that magically handles this number of messages without falling over, but that's not really accurate. What we really have is a graph of clusters that each handle specific types of messages. We control the flow of messages from cluster to cluster by mirroring messages through the directed graph of clusters.

To have the ability to fail out of data centers, we usually provision our clusters with a topology that keeps messages around when a data center goes completely offline. In each datacenter, we have a concept of local and aggregate Kafka clusters. Each local cluster is mirrored across data centers to the aggregate cluster. So, if one datacenter goes down, we can shift traffic to another without skipping a beat.

Each horizontal layer in the topology is what we refer to as a "tier". A tier is the set of clusters across multiple data centers that perform the same function. For example, in the diagram shown above, we have a "local tier" and an "aggregate tier". The local tier is the set of all local clusters in every data center and the aggregate tier is the set of all aggregate clusters in every datacenter.

We also have another tier that we refer to as the "offline aggregate tier". The production aggregate tier is mirrored to the "offline aggregate tier" for batch processing (for example, a map-reduce job that utilizes Hadoop). This allows us to separate production and batch processing into different datacenters.

Since having a single local cluster or a single aggregate cluster within a datacenter would result in huge clusters with reliability risk, we split up the local cluster into several clusters, each for a different category of data. For example, application metrics (such as a message that contains a sample of CPU usage) and tracking events (for example, an advertisement impression event message) are sent to different clusters, each with their own local and aggregate clusters. Kafka could probably handle a single huge cluster for every type of message we throw at it, but splitting things up gives us a better chance of elegantly dealing with complex failures.

We also have another term which we refer to as a "pipeline". A pipeline is the path that a message travels from source to destination. For example, here's how a "metrics pipeline" might work. First, a production application produces a message that says something like 35% of memory is being used on a host. That message will be sent by the application to the production metrics local tier. Then, the message will be mirrored to the production metrics aggregate tier, then again to the aggregate metrics offline tier. Finally, the metric will be consumed by the long term storage system designed to display pretty graphs and emit alerts for employees. This path from producer to consumer across all datacenters is what we refer to as a pipeline. Many messages share common pipelines, so it's a nice way to quickly describe where a message is going or where it came from.

So, how does Samza fit into this? We have another set of Kafka clusters dedicated to Samza jobs that are mirrored from the aggregate tiers. Each of these Kafka clusters for Samza has an associated cluster that runs Apache YARN for launching Samza jobs.

You may notice in the diagram above that Kafka and Samza do not share the same hardware. In the past, we colocated both Kafka and Samza on the same hardware, however we ran into issues when Samza jobs would interfere with Kafka's page cache. So, for now, we've separated them onto different hardware. In the future, we'd like to explore the locality of Samza and Kafka (for example, a task and the paired partition leader or partial ISR would share the same rack), but this is what works well for us at the moment.

Resource Management

Although the (very) modular design of Samza should make it simple to support almost any resource manager in the future, YARN is currently the only option at the time of this writing. At job launch, each Samza job links into YARN's Application Master API. Through this API, Samza works with YARN to request resources (containers) to run Samza tasks in. When a task fails, the Samza job negotiates with YARN to find replacement resources.

So, how do we get to the point where we're starting up a Samza job to run under YARN? Over time, LinkedIn has built internal tooling designed to simplify and automate software building and testing, so we utilize this to write, build and deploy Samza jobs.

First, a developer creates a Samza project. LinkedIn's tooling then creates a version control repository (git or subversion, depending on developer preference). Every time they commit to this repository, automated tooling builds the project using Hudson to create a Samza job package. After tests and other post commit tasks are finished, this job package is then automatically versioned and uploaded to our Artifactory binary repository. Once the job package is available in Artifactory, other tools then install a small set of files onto the same host as the YARN ResourceManager. One of these files is a script that tells YARN to download the job package from Artifactory, unpack it into the job's working directory, and to launch the Samza Application Master.

Metrics, metrics, and more metrics

Once the job is running under YARN, our job isn't over yet -- it's time to monitor everything. We have the luxury of using something we call "inGraphs" to accomplish this. inGraphs is very similar to the open source Graphite with a backend based on RRD, Whisper, or InfluxDB. When a Samza job starts up at LinkedIn, by default, metrics are produced to Kafka and consumed by inGraphs.

Since Samza jobs can easily produce tens of thousands of metrics per second, we've introduced a default metrics blacklist that keeps things a bit quieter until an engineer actually needs the data. Holding onto historical metrics and making them easily accessible (everything from young generation size to process callback latency) is an extremely valuable tool that shouldn't be overlooked.

While being able to visualize metrics is useful, it's even more important to have the ability to create alerts and take actions based on those metrics. We have another application at LinkedIn which we call "AutoAlerts". AutoAlerts has a language (mostly YAML) for alerting on metrics which is similar in functionality to the alert related features in Riemann or Cabot.

For most alerts, we just have a minimum and a maximum threshold defined. For example, in the screenshot below, we have a threshold of 100 messages per second (per task) defined as the minimum threshold. If the job throughput on any task goes below this value, the appropriate action will be taken (automatic remediation or on-call notification).

Every alert we create has an escalation path. Some alerts simply send an email for things that don't need immediate attention, such as a minor hardware failure that successfully failed over. Other alerts, such as those for a non-responsive load balancer would be escalated to the central network operations center who would then engage the appropriate person on-call for the service.

The metrics we monitor vary. For example, we have thresholds set on the Application Master heap size. If the heap size falls below a certain threshold, we know there's a good chance the job is having trouble. We also have several alerts for YARN, including the number of unhealthy NodeManagers, JVM metrics, and the number of active ResourceManagers (with regard to high availability). We also graph the number of jobs which are killed by YARN or have failed for any other reason.

Outside of Samza, we also monitor disk (IOPS and bytes available), CPU, memory, and network usage (local interfaces, rack switches, and all uplinks). We also have a few hardware-specific alerts, such as boolean alerts for the health of SSDs.

One particular type of alert I'd like to emphasize is the capacity alert. We have thresholds set which are based on historical growth and lead time to order hardware. When this alert goes off, we know that it's time to plug in our current capacity and growth projections into a model that provides an estimate on how much hardware we need to order. Since we're currently limited by memory and have very rapid growth, we aim to keep 50% of memory in use by Samza jobs at all times (in other words, we double the size of all clusters every time we order hardware). As the platform matures and growth slows relative to the total size of all clusters, we expect to slowly increase this target from 50% (probably settling at around 85% with peak traffic while still having 15% for unexpected growth).

In addition to the metrics we monitor, we also graph out a ton of other data points without setting thresholds (as many as our monitoring system can handle). When a mysterious or subtle problem is encountered, having the ability to quickly skim through thousands of data points at a glance often uncovers the root of the problem, or at least points us in the general direction of what we need to investigate further.

Hardware configuration

When we order new hardware for Kafka brokers, we use the standard LinkedIn Kafka configuration. Our Kafka brokers for Samza are exactly the same as our regular brokers (except for relaxed thresholds on a low number of under-replicated partitions).

So far, at current scale, we've found that our jobs are usually memory bound. So, for nodes that run Samza tasks, we primarily look for a healthy RAM to core ratio (we currently aim for 12 cores, 2 hardware threads per core, and 64GB of RAM per server). Moving forward, we expect this to change -- likely with network as the next limiting factor.

For jobs that utilize Samza's key-value store, we typically use a PCI-E based SSD with a terabyte or two of available space. Unlike the SSDs on our Zookeeper clusters, we typically don't worry about GC pauses as long as overall throughput remains high.

Moving forward

Although we haven't had many stability issues with Samza, we'd like to make this entire system more robust as we grow larger. One thing we'll be improving soon is how we handle job and task isolation. We've experimented with cgroups and namespaces in various forms, but have yet to settle on a path that makes us completely happy (mostly when it comes to disk isolation).

Another area we'd like to explore is a self-service model where developers have minimal interaction with reliability engineering when they create new Samza applications.

So, that's it for my overview of how we operate Samza at LinkedIn. We've had a significant amount of success in working with Samza and plan to use it for many new tasks in the upcoming year.

Thinking Out Loud - Consensus Quorum Bootstrapping

Note: You may want to read the Paxos paper, the Raft paper, the Chubby paper, the Spanner paper, and the F1 SQL layer paper before trying to read this -- it won't make sense unless you're familiar with the concept of multiple consistency domains.

The following is a simple algorithm for bootstrapping a Raft consensus quorum in a multiple consistency domain system (which utilizes a location quorum for discovering relationships between replicas and namespaces). Also, I mention a mistake I made in my initial implementation which led me to this design.

First, let's cover what the goals of bootstrapping a consensus quorum group actually means. For one thing, we need to satisfy (2f+1) for failures. So, for a minimal quorum, we want our bootstrapped group to handle a single failure. We don't need to go higher than a single failure in the bootstrap phase since we can add additional follower replicas later on with proper consistency across a term change. Because of this, the bootstrap process will always be based on creating a group with exactly three nodes (two followers and a single leader).
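The (2f+1) arithmetic above is small enough to state directly as a sketch:

```python
# Raft quorum arithmetic: tolerating f failures requires 2f + 1 voters,
# and any decision (including leader election) needs a strict majority.
def quorum_size(f):
    return 2 * f + 1

def majority(n):
    return n // 2 + 1

assert quorum_size(1) == 3             # the three-node bootstrap group
assert majority(quorum_size(1)) == 2   # elections need 2 of 3 votes
```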

The first mistake.

Early on in the development of an implementation of this, I assumed that I should first bootstrap a leader, then tell the leader what followers are attached to it. This ended up being difficult and unwieldy. It is much cleaner to use consensus voting to bootstrap replicas in a follower state so they naturally become leaders through a proper candidacy. This avoids the difficult task of manually configuring state for a replica to come into existence directly as a leader. The current bootstrap process is the result of learning from the mistake of attempting to directly bootstrap a leader.

The current design.

First, we bootstrap an idle follower. The definition of "idle" in this case is the lack of (2f+1) satisfaction as well as an empty remote hook array (outgoing socket state). The only operation an idle follower will perform is to wait on join follower messages or shutdown messages.

The next step is to join an additional follower replica to the idle follower. When this happens, a join response message is sent back to the new idle follower with any replicas that the original idle follower knows about (zero in this case). The original follower also attempts to send a join message response to any known replica hooks (still zero in the case of the original follower). Now, we have two idle followers with knowledge of each other.

When a third follower joins the initial two idle followers, we can finally satisfy (2f+1::f==1). The third follower will send a join request command (control join). It can send this command to either of the two existing idle followers. When the first or second idle follower receives this message, it will send out a control join response to any hook replicas it already knows about (while avoiding a loop by only sending a message with three replicas one time). Once this happens, each follower will have three known hook replicas. Since this satisfies our requirements for a quorum, the idle followers will then independently know to leave the bootstrap state at this point and begin their election timers. Eventually, one follower will win a candidate term and begin a leader term through the regular consensus failover process.
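The join dance above can be simulated with a toy script (names and message shapes are assumptions for illustration, not the real implementation):

```python
# Each idle follower tracks a set of known replica "hooks". A join
# response shares the responder's view with the newcomer, and the
# responder forwards the newcomer to every hook it already knows about.
QUORUM = 3  # 2f+1 with f == 1

def bootstrap(ids):
    known = {}  # follower id -> set of known replicas
    for new in ids:
        known[new] = {new}
        others = [f for f in known if f != new]
        if others:
            target = others[0]  # a join can go to any existing follower
            known[new] |= known[target]        # join response: share view
            for peer in list(known[target]):   # forward the newcomer
                known[peer].add(new)
    return known

views = bootstrap(["f1", "f2", "f3"])
# every follower now sees a full quorum, so election timers may start
assert all(len(view) == QUORUM for view in views.values())
```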

A finishing touch.

However, we're not done yet. Once a leader is elected, it will check to see if it has knowledge of the namespace it should write into. If the namespace does not exist, the leader will not respond to data messages and will only respond to control bootstrap leader namespace messages. Once the leader is told what namespace it's using, it will wait for the location quorum registration message (data write) to successfully respond before processing client data commands. The exception to this process is the bootstrap of the location quorum leader itself which will simply use the provided bootstrap namespace when it satisfies the criteria of a location quorum namespace pattern.

An Absurd Bitcoin History

Bitcoin? How does that work. They're like coins, but they're made of bits! Many people have rambled on how bitcoins may or may not change the world. I'm not here to talk about that (not much anyway). I'm going to talk about the details of how the math works. Mmmm, sweet, sweet details. We're going for a ride into the world of cryptography.

Everyone knows that Satoshi Nakamoto created bitcoin. Everyone also knows that every Tuesday he eats dinner at his favorite restaurant in [redacted] if you ever want to have a chat with him. Anyway, it's a little known fact that he made a bit of side cash from selling rare paper napkins on his website a few years ago. He woke up one morning and put up a vintage 1946 McDonalds Christmas edition for sale. Of course, it sold for thousands of dollars almost instantly. He carefully packaged up his napkin and shipped it off as quickly as possible (as all good sellers do). What happened next was a complete shock. The customer reported the transaction as fraud and the bank gave them their money back -- a chargeback! After a few phone calls, it appears that the bank agrees with the customer because they don't believe that someone is trading napkins for thousands of dollars (I know, how crazy is that). Satoshi lost his precious napkin and thousands of dollars because the trust relationship between his customer and the banks involved in the transaction failed.

So, after work one day, Satoshi then set out on a quest (yeah, Satoshi does quest stuff) to create a system where a trust relationship like this can't fail. Once someone sends money, it should be sent for good -- no more chargebacks that depend on a complicated system of failure prone trust relationships. A quick search on the Internets led Satoshi to the wonderful world of cryptography. Rather than pay people with money based on human trust, we should be able to pay people with money based on really long numbers with special properties that are nearly impossible to forge.

But then he thought, how is this any different from printing paper money? A mint would generate these really large random numbers and circulate them to the public -- there's no benefit over paper money! Also, counterfeiting becomes a nightmare when it only takes a simple cut-and-paste operation to copy money. There must be a way to prevent the same long random number, the money number, from being spent more than once. However, tomorrow is another day; Satoshi grabbed his toothbrush and set off for bed.

While in the shower the next morning, a lightbulb went on over Satoshi's head after he flipped the switch on the wall. A few minutes later, after his eyes adjusted to the annoying overhead light, he thought of a potential answer to his problem -- public and private keys! It was so obvious. He realized that if he creates a system where the money number changes every time it's spent, it'll be impossible to spend a money number more than once. Every person will simply create a random number that they give to everyone (a public key) and a random number that they keep hidden from everyone (a private key). These public and private keys are related to each other in a few special ways. For example, when a money number is combined with the private key, someone can use the related public key to prove that a specific hidden private key was used. When someone wants to spend a money number, they combine it with their private key to prove to the receiver, who uses the related public key, that the money is coming from the person who has control of the private key. Another property of the keys is that someone can combine the money number with a public key in such a way that only the owner of the private key related to that public key will be able to figure out what the original money number is.
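Both properties can be demonstrated with textbook RSA and deliberately tiny, insecure numbers (p = 61, q = 53 -- purely for illustration; Bitcoin actually uses elliptic curve signatures, not RSA):

```python
# Toy RSA keypair: n and e are the public key, d is the private key.
n = 61 * 53   # public modulus (3233)
e = 17        # public exponent
d = 2753      # private exponent, kept hidden (17 * 2753 ≡ 1 mod 3120)

money_number = 42

# Property 1: combining the money number with the private key produces a
# signature anyone can check against the public key.
signature = pow(money_number, d, n)
assert pow(signature, e, n) == money_number

# Property 2: a value locked with the public key can only be recovered
# by the owner of the related private key.
locked = pow(money_number, e, n)
assert pow(locked, d, n) == money_number
```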

How do we use these public and private keys to change the money number after it's spent each time? One more thing is needed, something called a hash. A hash is simply a thing that can take a bunch of numbers and combine them into a single number, or hash value. However, it's very difficult to go the other way -- you can't simply figure out the original numbers from the hash value. A hash lets us nicely package up a previous transaction.
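That packaging step is easy to show with a real hash function:

```python
import hashlib

# A hash combines any pile of bytes into a single fixed-size value;
# going backwards from the value to the input is infeasible. Here we
# package up a toy "previous transaction".
previous_transaction = b"alice->bob:money-number-12345"
digest = hashlib.sha256(previous_transaction).hexdigest()
assert len(digest) == 64  # always 256 bits, however large the input

# flipping a single byte of the input yields a completely different value
tampered = hashlib.sha256(b"alice->bob:money-number-12346").hexdigest()
assert digest != tampered
```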

Phew. It looks like Satoshi finally figured out a bunch of tools that can be used to create a chain -- a secure list of transactions. When it comes to money numbers, a transaction is simply a bunch of numbers that prove the transaction took place. To prove his concept, Satoshi gave one of his money numbers to Alice (for free!). So, now Alice has a money number and wants to give it to Bob to pay him back after that trip to New Mexico. Also, Alice and Bob both have public and private keys. Alice creates a new transaction by combining her private key with the hash number of all numbers in the previous transaction. She then combines it with Bob's public key before she sends it off to Bob. Bob can then verify that the money number is worth something by checking it against Alice's public key and the public keys of people who held the money number before Alice.

Problem solved! But wait, later on we discovered that Alice double spent her money number and sent it to Craig to get some homeopathic flavored toothpicks. This is a serious problem (homeopathic flavored toothpicks also don't clean your teeth very well). How do we know Alice didn't also do the same exact thing with someone else? There's nothing stopping her from double spending a money number, or even spending the money number thousands of times. We need a way to make sure Alice can't keep sending her single money number to thousands of people.

Satoshi was pissed at Alice for cheating the system. I mean, how could she? He could see this kind of thing happening with Craig, but Alice is usually honest. Disappointed, he went down to the train station to catch the subway to the bar for a late happy hour with Adam. The train was late, so he looked up the schedule. Since the clock on the wall said 5:15, he knew for sure that he had already missed the 4:58. Wait, that's it -- we can combine the transaction with a time value! We need to set up a centralized server that creates hashes made up of the time, the transaction, and the value of the previous hash the centralized server created. The server will then make all of these hashes public to everyone. Now we can verify that Alice (and anyone else) didn't double spend their money number because we can figure out which transaction took place first.

But wait, we're back where we started. Satoshi's whole problem started because a bank can perform a chargeback. What's stopping the person running the centralized hash-time server from rolling back the server when they want to undo transactions? Nothing, that's what. We need a way to get rid of the centralized time server.

Finally arriving at the bar, Satoshi met up with Adam and ordered their monkey knife fight ale (which was quite awesome). Satoshi brought up the topic of Alice and how she's trying to cheat the system. He also mentioned the centralized time server thing. What Satoshi didn't know is that Adam came up with an idea to solve this problem a few years back. He called the concept "proof of work".

While Satoshi is downing a few beers, Adam goes on to describe his proof of work concept, which depends on performing the really difficult operation of finding hash values with special properties. For example, finding a hash value that has a certain number of leading zero bits increases exponentially in difficulty as the number of required zero bits increases. Hashes with these special properties can be calculated by brute force. Every time someone is successful at this brute force operation, they combine it with the previous special hash value and an incrementing number (a counter used during the brute force search). Each hash value and incrementing number is called a block. When the blocks are combined together (linked by containing the previous block's hash), they're called a block chain. Since the block chain grows so slowly due to the brute force operation, everyone can trust that a block chain is the trusted chain by simply looking at how long it is. The longest block chain is the most trustworthy one. The longest block chain can be used to determine a trustworthy ordering of all transactions. Our centralized time server is no longer needed.
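The brute force search is short enough to sketch (a toy that counts leading zero hex digits rather than bits, to keep the search fast):

```python
import hashlib

# Toy proof of work: increment a counter until the block hash starts
# with `difficulty` zero hex digits. The previous block's hash is baked
# into the input, which is what links blocks into a chain.
def mine(previous_hash, transactions, difficulty=4):
    counter = 0
    while True:
        block = f"{previous_hash}:{transactions}:{counter}".encode()
        block_hash = hashlib.sha256(block).hexdigest()
        if block_hash.startswith("0" * difficulty):
            return counter, block_hash
        counter += 1

counter, block_hash = mine("0" * 64, "alice->bob:money-number-12345")
assert block_hash.startswith("0000")
```

Each extra required zero digit multiplies the expected number of attempts by 16, which is the exponential difficulty growth described above.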

Satoshi, now six beers in, was like woah dude, look at that squirrel over there!

The next morning, with a blistering headache, Satoshi remembered the night before and realized he could adopt Adam's design. He would use the concept Adam described, but would also place a bunch of previously processed transactions into each block. That way, it would act as a distributed ledger of historical transactions. New transactions would need consensus among a few people before they were valid, but old transactions could be proven based on this distributed ledger contained in the block chain.

Everything was starting to come together: everyone would be sent a copy of the longest block chain, and people would help to brute force the distributed clock. Satoshi was even nice enough to have a special rule where the first transaction in a new block is a free money number given to the person who successfully brute forced the block. This ensures that there's motivation for everyone to brute force the blocks (aka mining) and build the distributed ledger -- they get some money for it!

Satoshi then went on to create this system that we know today as bitcoin. However, Satoshi has other plans for his life besides bitcoin and moved to [redacted] for a bit until he can be sure of [redacted].

Thinking Out Loud - File copy tool arguments

A side project I've been working on for some time now is a decentralized-distributed file copy tool in the spirit of the typical cp command you can find on just about any unix style system out there.

When I start a project like this, I tend to concentrate on what the most difficult problem is -- once I solve the most difficult problem, I can then easily finish designing the rest of the system without too much effort. However, the file copy tool caught me off guard a bit with how complex the argument handling turned out.

Of course, the code to distribute the workload across hundreds of cores (proven to scale past 100,000 cores) was the most difficult. However, it's basically a solved problem as far as this application goes. Take a look at libcircle to see how this is done.

Since the hardest problem in the system is basically solved, we can move on to the second hardest problem — the creation of an easy-to-use front-end for users to chunk up and copy files. I chose the familiar POSIX-style interface since most people are already trained to use it.

A subtle problem in creating this frontend is dealing with the many combinations of directories and files that the user may use for input. To keep things simple for this blog post, I'm going to ignore everything but simple files and directories. Maybe in a later post, I'll cover how block and character devices, local domain sockets, named pipes, and symbolic links should be handled.

To give an idea of what we're dealing with, here's the usage message for my tool. I've named it dcp, short for "distributed copy program" (see dcp).

For all input, we need to know what the base name of the directory is that we'll be writing files into. We'll also need the path of the destination and a list of source paths. There are also a few "impossible" situations, like trying to copy a directory into a file. We need to prune out these situations and present a nice error message to the user.

To figure out what needs to be pruned out, first we need to know what we have. This is a bit tricky because sometimes the destination doesn't initially exist, depending on what the user is trying to do. However, we're not in the business of creating new directory trees, so it's safe to say that if the source is a single file or directory, the destination should be a single file or directory as well. We'll also hit an error condition whenever it doesn't make sense for multiple source paths to be copied into a single file.

Here's some pseudocode to demonstrate this concept. First, we check to see if something exists on disk at the destination path. If it does, we remember that state for later.
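Something like the following sketch, where the function and state names are my own, not dcp's actual source:

```python
import os

def detect_dest_state(dest_path):
    """Remember what, if anything, already exists at the destination."""
    if os.path.isdir(dest_path):
        return "directory"
    if os.path.exists(dest_path):
        return "file"
    return "missing"
```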

If the destination path doesn't exist, we check to see what the source paths are. If recursion is turned on, we'll have a file as the destination if the source is a single file, otherwise, we need the destination to be a directory.
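Continuing the sketch with hypothetical names, and assuming recursion is turned on:

```python
import os

def infer_dest_type(src_paths):
    """If the destination doesn't exist yet, decide what it should become:
    a single file source means a file destination, anything else needs a
    directory destination."""
    if len(src_paths) == 1 and os.path.isfile(src_paths[0]):
        return "file"
    return "directory"
```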

Now that we know what the last argument should be, we can reason about what the end result should be. This makes it trivial to prune out the impossible situations. Writing down all potential input combinations yields the following impossible conditions: copying one or more directories into a file, copying many files into a file, copying one or more directories and files into a file. Encountering any of this input will lead to an error condition.

Take note that all of the impossible conditions we've listed have the property of the destination being a file. We can take advantage of this by catching many of the error conditions in the logic that determines if we're in the mode of copying a single file into another file.
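That shared property keeps the pruning logic short: if the destination is a file, the only valid input is exactly one file source. Again, this is a hypothetical sketch with my own names, not dcp's actual source.

```python
import os

def check_impossible(src_paths, dest_is_file):
    """All of the impossible input combinations have a file destination,
    so a single check on that mode catches them."""
    if not dest_is_file:
        return  # copying into a directory is always structurally possible
    if len(src_paths) > 1:
        raise ValueError("cannot copy multiple sources into a file")
    if os.path.isdir(src_paths[0]):
        raise ValueError("cannot copy a directory into a file")
```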

Now that we've handled everything but copying source files into directories, we can easily handle the rest of the potential inputs.

So there we have it, an algorithm to handle user input of directories and files for a file copy tool. Stay tuned for the initial stable release of dcp.

Hilbert Curves for Locality on Supercomputers

I haven't been able to find much information on how space filling curves, like Hilbert curves, are used in network interconnection topology in supercomputers. So, this post is my attempt at putting some of the information in one place. We'll start off with an introduction to gray encoding and how it relates to Hilbert curves.

So, gray codes are the concept of the day here. Gray codes are just another binary representation of integer counting (1, 2, 3, 4, 5, ...). They were used back in the day for mechanical counting because of a key property -- only one bit changes every time a gray code is incremented or decremented. This made it simple for a sensor to tell when the value had changed, because only one bit would be flipped each time. This is in contrast to normal binary counting, where multiple bits commonly flip each time the value is incremented or decremented, which makes it difficult to tell whether the bits have all completely flipped into their final state.
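A quick sketch of that property, using the standard XOR-with-shift formula for gray codes:

```python
def gray(n: int) -> int:
    """Binary-reflected gray code of n: XOR n with itself shifted right."""
    return n ^ (n >> 1)

# counting 0..7 in gray: 000, 001, 011, 010, 110, 111, 101, 100
codes = [gray(i) for i in range(8)]

# neighboring codes always differ by exactly one bit
assert all(bin(a ^ b).count("1") == 1 for a, b in zip(codes, codes[1:]))
```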

Gray codes also have another special property. Some people like to call gray codes "reflected binary codes". This property makes the generation of gray codes feel very similar to the generation of a fractal. For example, start off with the one bit gray codes -- you have 0 and 1. Now, to get two bit gray codes, you write it forwards and backwards (0, 1, 1, 0). Then, prepend 0s to the first half of the numbers and 1s to the second half of the numbers (00, 01, 11, 10) and you end up with the two bit gray codes. To get the n-bit gray codes, you repeat the process n times from the base case.
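The reflect-and-prefix process can be written out directly (a toy illustration; `reflected_gray` is my own name for it):

```python
def reflected_gray(bits: int) -> list:
    """Build the n-bit gray codes by writing the previous list forwards
    and backwards, then prefixing 0s to the first half and 1s to the
    second half."""
    codes = ["0", "1"]
    for _ in range(bits - 1):
        codes = ["0" + c for c in codes] + ["1" + c for c in reversed(codes)]
    return codes

# the two bit gray codes from above
assert reflected_gray(2) == ["00", "01", "11", "10"]
```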

Now the cool stuff. Gray codes have yet another story to tell. You can use gray codes to reference a coordinate in a space filling curve known as a "Hilbert Curve". Ignore the word "curve" for now or it's just going to confuse you. If you're like most people, you're going to think of a curve as a smooth turn on a roller coaster or the shape of a hot air balloon. You'll think of Hilbert curves like that eventually, but not now. Right now, you can safely think of Hilbert curves in appearance as similar to the maze puzzles on a paper placemat from a roadside greasy spoon.

So, how do you use gray codes to reference locations in Hilbert curves? Let's start off with the two bit gray codes. Just use each value like an x-y coordinate. So, we have the coordinates of (0, 0), (0, 1), (1, 1), (1, 0) as our first order Hilbert curve.

                     (0, 1)  *---------*  (1, 1)
                             |         |
                             |         |
                             |         |
                             |         |
                     (0, 0)  *         *  (1, 0)

Now, let's try to do the second order Hilbert curve using the four bit gray codes. To do this, we need to utilize the fractal properties of the Hilbert curve. Each coordinate in the first order Hilbert curve needs to be replaced by a new sub-curve that is the same shape as the first order curve. This new sub-curve is translated and rotated to fit in with the design of the first order Hilbert curve. Once each of the coordinates has been replaced, we end up with a second order Hilbert curve.

                      *-------*       *-------*
                      |       |       |       |
                      |       |       |       |
                      |       |       |       |
                      *       *-------*       *
                      |                       |
                      |                       |
                      |                       |
                      *-------*       *-------*
                              |       |
                              |       | 
                              |       |
                      *-------*       *-------*

To address each of the coordinates in the second order Hilbert curve with gray codes, we need to reference the original location of the coordinate that was replaced by the sub-curve. So, for example, the lower-right corner in the second order Hilbert curve has a gray code with bits that start with 10.

We can then get the rest of the gray code by again picturing the first order Hilbert curve, rotating the visualization, and then referencing that same point after the rotations in the lower order. So, the remaining part of the code is 00, which makes the code for the lower-right corner 1000. Using the same concept, you can use a gray code to find any location in a Hilbert curve.
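Doing those rotations in your head gets tedious past the second order, so here's a sketch of the commonly used iterative distance-to-coordinate conversion for Hilbert curves. The variable names are my own; `side` is the width of the grid, which must be a power of two.

```python
def d2xy(side: int, d: int):
    """Convert a distance d along a Hilbert curve covering a side-by-side
    grid into an (x, y) coordinate, working up from the lowest order."""
    x = y = 0
    t = d
    s = 1
    while s < side:
        rx = 1 & (t // 2)
        ry = 1 & (t ^ rx)
        if ry == 0:
            if rx == 1:       # flip the quadrant when needed
                x = s - 1 - x
                y = s - 1 - y
            x, y = y, x       # mirror along the diagonal
        x += s * rx           # translate into the right quadrant
        y += s * ry
        t //= 4
        s *= 2
    return x, y
```

For the first order curve, `d2xy(2, d)` for d in 0..3 reproduces (0, 0), (0, 1), (1, 1), (1, 0), and `d2xy(4, 15)` gives (3, 0) -- the lower-right corner addressed by gray code 1000 above.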

Now, how does the conversion between gray codes and Hilbert curves actually help us when it comes to network topology? Let's look at a third order Hilbert curve for some perspective.

                    *---*   *---*   *---*   *---*
                    |   |   |   |   |   |   |   |
                    *   *---*   *   *   *---*   *
                    |           |   |           |
                    *---*   *---*   *---*   *---*
                        |   |           |   |
                    *---*   *---*---*---*   *---*
                    |                           |
                    *   *---*---*   *---*---*   *
                    |   |       |   |       |   |
                    *---*   *---*   *---*   *---*
                       (3)  |  (7)      |
                (2) *---*   *---*   *---*   *---*
                    |   |       |   |       |   |
                (1) *   *---*---*   *---*---*   *
                       (4) (5) (6)

Start from the lower left corner of the curve and number each coordinate as you go along. As you pass each coordinate, make a rough estimate of the physical distance between it and the previous one. You'll find that coordinates that are close to each other in the ordering tend to also be physically close to each other. The number given to each coordinate as you go along is known as the "Hilbert integer".

Now that we have a bit of a base, we can try to relate this to interconnection topology. First things first, we need to have a picture of a 3 dimensional Hilbert curve. So, instead of filling up a 2 dimensional space through each order of the curve, we fill up a 3 dimensional space by adding a rotation into the third dimension to each translation and rotation in 2 dimensional space.

                           *       *
                           |\      |
                           | \     |
                           |  *    |  *
                           |  |    |  |
                           *-------*  |
                              |       |
                              |       |

Now that we have a topology that closely resembles the node structure in a 3D torus network structure, we can start thinking about how nodes are allocated.

Remember the concept of a Hilbert integer? That's going to be the node ID of every node in the supercomputer. When the Hilbert integers for each node are generated, they're stored by the central resource manager for the duration of the resource manager's lifecycle. After being sorted into a one dimensional array, the node IDs can then be effectively used by a scheduler to hand out resources to jobs at a level of abstraction where the scheduler doesn't need to worry about the actual node topology.