Designing Rhino DHT - A fault tolerant, dynamically distributed, hash table
One of the more interesting requirements that came my way recently was to redesign Rhino DHT to support a dynamically distributed network of nodes. Currently, the DHT supports a static network layout, and while it can handle node failure (and recovery) via replication, it cannot handle node additions on the fly.
That is a problem. But another problem is that the configuration for the DHT is getting complex very fast, and we keep putting more and more requirements on top of it. This post is going to articulate the requirements we have for the DHT, and the current design for solving them.
- Storage for the DHT is a solved problem, using Rhino Persistent Hash Table.
- Concurrency options for the DHT are those that are applicable for Rhino PHT.
- Merge concurrency
- Optimistic concurrency
- Last write wins - for unchanging values
What we need to solve is how to generalize the PHT into a dynamic DHT, with failure tolerance and the ability to add and remove nodes to the network on the fly.
- A DHT cluster may contain 1 or more nodes
- A DHT cluster can be extended by adding new nodes on the fly
- Adding a node to a cluster is an expensive operation, requiring resharding of the data
- Removing a node is not explicitly supported; when a node is removed we assume it has failed, and we reallocate its responsibilities
- A value on the DHT always has a master node, and is replicated to 2 other machines
- A DHT node configuration includes (a minimal sketch appears after this list):
- DHT Cluster name
- DHT options:
- max number of machines to replicate a value to
- The cluster topology is maintained in the DHT itself, and can be queried from any node
- Clients may receive a topology changed error when querying the DHT, and need to refresh their cached topology when this happens
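As a rough illustration, the per node configuration boils down to something like the sketch below. The class and property names here are my own, not the actual Rhino DHT configuration.

// Hypothetical sketch of the node configuration; names are illustrative only.
public class DhtNodeConfiguration
{
    // The cluster this node belongs to, used when announcing itself on the network.
    public string ClusterName { get; set; }

    // Max number of machines to replicate a value to; 2 gives each value a
    // master plus two backup machines, as described above.
    public int MaxNumberOfReplicationMachines { get; set; } = 2;
}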
Working from this set of assumptions, let us see how we can handle a DHT node startup. For now, we assume that this is the first time that it has woken up, and that there are no other nodes alive.
The DHT node publishes a multicast UDP message on the network, announcing that it is up, and waits for replies. Since this is currently the only node, there are no replies, so we are going to assume that we are the leader and mark ourselves as such. A second node coming up is also going to announce itself on startup, and this time it is going to get a reply from the first node, telling it that the first node is the leader.
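In code, the announcement could look something like the following sketch. The message format, multicast group and port are all assumptions of mine, not the actual Rhino DHT protocol:

// Hypothetical sketch of the startup announcement; none of these names come
// from the actual Rhino DHT code, and the group/port are arbitrary.
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class NodeDiscovery
{
    private static readonly IPAddress Group = IPAddress.Parse("239.255.0.1"); // assumed group
    private const int Port = 2201;                                            // assumed port

    // Announce ourselves; if an existing node replies, it names the current
    // leader, otherwise we are alone and become the leader ourselves.
    public static string AnnounceAndFindLeader(string myUrl, TimeSpan timeout)
    {
        using (var udp = new UdpClient())
        {
            byte[] hello = Encoding.UTF8.GetBytes("DHT-HELLO " + myUrl);
            udp.Send(hello, hello.Length, new IPEndPoint(Group, Port));

            udp.Client.ReceiveTimeout = (int)timeout.TotalMilliseconds;
            try
            {
                var from = new IPEndPoint(IPAddress.Any, 0);
                byte[] reply = udp.Receive(ref from);
                return Encoding.UTF8.GetString(reply); // the leader's url
            }
            catch (SocketException)
            {
                return myUrl; // no reply arrived, we are the first node and the leader
            }
        }
    }
}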
This is where things get... tricky. A new node coming up means that we need to reshard the system. Sharding means that we spread the values across more than a single machine, and resharding means that we need to reallocate previously assigned data ranges from the existing machines to the new one.
We handle this problem using the following approach:
The DHT is a key value store. Setting a value requires a key (string). That key is going to be MD5 hashed, in order to get a hashing algorithm that is reliable across machines, runtime versions and maybe even different platforms. Using the hash, we would usually just index into the list of nodes in order to find the node responsible for the key. In essence, this is the code most often used:
nodes[ MD5_Hash(key) % nodes.length ].Send(key, val);
The problem with this approach is that adding/removing a node invalidates the entire hashing strategy. In order to handle this problem, what we are going to do is assume a 32 bit address space, giving us 4,294,967,296 distinct values. We are going to divide this address space into 1024 ranges, each of 4,194,304 values.
Each of those ranges is going to be assigned to a node. This gives us much more control over distributing the values across the network.
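With the ranges in place, the lookup goes through the range table instead of hashing directly onto the node list. Something like this sketch, where the class and field names are mine and not the actual Rhino DHT API:

// Hypothetical sketch of range based lookup; names are illustrative only.
using System;
using System.Security.Cryptography;
using System.Text;

public class RangeTable
{
    private const uint ValuesPerRange = 4194304; // 2^32 / 1024 ranges
    private readonly Node[] rangeOwners = new Node[1024]; // assigned by the leader

    public Node GetNodeForKey(string key)
    {
        // MD5 gives us a hash that is stable across machines and runtimes.
        using (var md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(key));
            uint value = BitConverter.ToUInt32(hash, 0);

            // Map the 32 bit value onto one of the 1024 ranges, then look up
            // which node currently owns that range.
            int range = (int)(value / ValuesPerRange);
            return rangeOwners[range];
        }
    }
}

public class Node { public string Url; }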
Back to the resharding issue. When we had a single node, it was the owner of all those ranges, and every value was set on that node. Now we have to reallocate ranges, and we allocate half of them to the new node, giving us 512 ranges for each. Just reallocating the ranges isn't as simple as it may sound, however, since we may already have values stored in them.
The algorithm for doing that is:
- The leader allocates a set of ranges to the newly arrived node, and tells the new node about its new ranges and about the old nodes that were previously assigned those ranges.
- The new node now accesses each old node and asks it to send over all the values in the ranges it was previously assigned. This is a somewhat complex procedure, since we need to keep the system alive while we do so. Internally, it is going to work as follows (a rough sketch in code appears after this list):
- Get all the values in the requested range and mark their timestamp
- Send all the values to the new node
- For all the sent values, check their timestamp and mark any that weren't changed as "forward to the new node"; any request for such a value would now generate an error telling the client to go to the new node instead.
- Repeat until there are no values that have been changed during the send process.
- Mark the whole range as "forward to the new node"; any request will generate an error saying that the new node should be used.
- Once we get all the values in all the ranges allocated to us, we let the leader know about that.
- The leader tells all the nodes to accept the new topology. Now, any queries for values in the deallocated ranges on the old nodes will generate a topology changed error, which forces the client to reload the topology.
- Note, there is a difference between a "forward to the new node" error and a "topology changed" error. The first is transient, the second is permanent.
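To make the copy-until-stable loop concrete, here is a rough sketch of how I picture the old node's side of the hand off. The abstract helper methods stand in for storage and messaging code that is not shown, and none of the names come from the actual Rhino DHT code:

// Hypothetical sketch of the "send and forward" loop on the old node.
// The abstract helpers stand in for storage and messaging code not shown here.
using System.Collections.Generic;
using System.Linq;

public abstract class RangeHandOff
{
    public void HandOffRange(int range, string newNodeUrl)
    {
        while (true)
        {
            // Snapshot the values currently in the range, with their timestamps.
            List<StoredValue> snapshot = GetValuesInRange(range);
            SendTo(newNodeUrl, snapshot);

            // Anything that did not change while we were sending can now be marked
            // "forward to the new node"; requests for it will be redirected.
            var unchanged = snapshot.Where(v => !HasChangedSince(v.Key, v.Timestamp)).ToList();
            foreach (var value in unchanged)
                MarkAsForwarded(value.Key, newNodeUrl);

            // Repeat until nothing changed during the send, then forward the whole range.
            if (unchanged.Count == snapshot.Count)
            {
                MarkRangeAsForwarded(range, newNodeUrl);
                return;
            }
        }
    }

    protected abstract List<StoredValue> GetValuesInRange(int range);
    protected abstract void SendTo(string nodeUrl, List<StoredValue> values);
    protected abstract bool HasChangedSince(string key, long timestamp);
    protected abstract void MarkAsForwarded(string key, string nodeUrl);
    protected abstract void MarkRangeAsForwarded(int range, string nodeUrl);
}

public class StoredValue { public string Key; public byte[] Data; public long Timestamp; }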
There are two major problems with this algorithm. The first is that it totally ignores the possibility of failure, and the second is that it ignores the replication topology as well.
We will start with failure. If the new node fails during the initial load process, it is going to cause some problems, because any newly accepted values on it will be dead, while the old node that handed them off will continue redirecting to the now dead new node. This isn't actually a problem: the standard failover mechanism will kick in, and we will query the range's secondary node (which hasn't changed yet) for the actual value. If we try to set a value, it will generate a replication to the old primary node, resulting in the value effectively being "resurrected" on the old node.
Since we don't expect this scenario to be very common, I think we can leave it at that; the situation will slowly correct itself over time, without us needing to do anything else.
A more interesting case is the failure of one of the old nodes during the copy process. Again, we will fail over to the range's secondary node and copy the values from there. There is an interesting race condition here if the old node manages to recover in time, but again, that is not something that I am overly concerned about.
Replication topology is a more interesting scenario. Let us go over the topology of a four node cluster:
- Node 1
- Leader
- Primary for ranges 0 - 256
- Secondary for ranges 256 - 512
- Tertiary for ranges 512 - 768
- Node 2
- Primary for ranges 256 - 512
- Secondary for ranges 512 - 768
- Tertiary for ranges 768 - 1024
- Node 3
- Primary for ranges 512 - 768
- Secondary for ranges 768 - 1024
- Tertiary for ranges 0 - 256
- Node 4
- Primary for ranges 768 - 1024
- Secondary for ranges 0 - 256
- Tertiary for ranges 256 - 512
This topology is stored in all the nodes, and can be queried from any of them. This means that it is very easy for any of the clients to know exactly which node is the master for any key. And in the case of a node failure, they also know which nodes to fail over to.
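From the client's point of view, resolving a key against the cached topology is just a scan over the ranges, with the backups available as failover candidates; the range itself is computed the same way as in the earlier lookup sketch (MD5 hash divided into 1024 ranges). A sketch, with made up types:

// Hypothetical sketch of a client resolving a range against its cached topology.
using System;
using System.Collections.Generic;

public class TopologyEntry
{
    public int RangeStart;   // inclusive, 0..1023
    public int RangeEnd;     // exclusive
    public string Primary;   // node urls
    public string Secondary;
    public string Tertiary;
}

public class ClientTopology
{
    private readonly List<TopologyEntry> entries; // cached, refreshed on "topology changed"

    public ClientTopology(List<TopologyEntry> entries) { this.entries = entries; }

    // Returns the nodes to try, in order: the master first, then the failover candidates.
    public IEnumerable<string> NodesFor(int range)
    {
        foreach (var entry in entries)
        {
            if (range >= entry.RangeStart && range < entry.RangeEnd)
            {
                yield return entry.Primary;
                yield return entry.Secondary;
                yield return entry.Tertiary;
                yield break;
            }
        }
        throw new InvalidOperationException("No node owns range " + range);
    }
}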
Any node in the cluster is also aware of the secondary and tertiary replication backups for its ranges, and it uses async messaging to let them know about new values. This keeps the node responsive even when the replication nodes are down.
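The replication itself can be as simple as a per backup queue with a background sender, so a slow or dead backup never blocks the primary. Again a sketch with made up names, not the actual Rhino DHT messaging code:

// Hypothetical sketch of async replication from a primary to its backups.
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ReplicatedValue { public string Key; public byte[] Data; }

public abstract class AsyncReplicator
{
    // One outgoing queue per backup node, so a dead backup only delays
    // its own queue and never blocks the primary.
    private readonly ConcurrentDictionary<string, BlockingCollection<ReplicatedValue>> queues =
        new ConcurrentDictionary<string, BlockingCollection<ReplicatedValue>>();

    public void Replicate(ReplicatedValue value, params string[] backupNodes)
    {
        foreach (var node in backupNodes)
        {
            var queue = queues.GetOrAdd(node, StartSenderFor);
            queue.Add(value); // returns immediately, the primary stays responsive
        }
    }

    private BlockingCollection<ReplicatedValue> StartSenderFor(string nodeUrl)
    {
        var queue = new BlockingCollection<ReplicatedValue>();
        Task.Factory.StartNew(() =>
        {
            foreach (var value in queue.GetConsumingEnumerable())
                SendToBackup(nodeUrl, value); // retry on failure is not shown here
        }, TaskCreationOptions.LongRunning);
        return queue;
    }

    protected abstract void SendToBackup(string nodeUrl, ReplicatedValue value);
}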
Changing the replication topology is done by the leader once the primary values have been successfully copied and we have a topology changeover for the primary. The process is identical to the way we handle the primary topology changeover.
So far, so good, but we haven't handled two other scenarios: leader failure and long node failure. As it stands currently, restarting a node should not cause a change in the network topology. All the clients will automatically fail over to the secondary or tertiary nodes, and on startup, the node is going to query its secondary and tertiary nodes for any new values they accepted in its name before it starts listening to incoming requests.
The problem is when we have a node that is going to be down for a long period of time, or forever. Since we are aiming for a zero configuration system, we need to handle this ourselves. We give a failed node a timeout of 15 minutes to resume operation. This is taken care of by the leader, which listens to heartbeats from all the nodes. If 15 minutes have passed without getting a heartbeat from the node, we are going to assume that the node is down, and that we need to reallocate its ranges.
This is actually a fairly simple process: we bump the secondary node for its ranges to be the primary, and the tertiary to be the secondary. The next issue is replication rebalancing, to make sure that each range has a secondary and a tertiary node, but that is already handled by the same process we use when adding a new node.
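A sketch of what the leader side of this could look like. RangeAssignment mirrors one row of the topology table from earlier, and all of the names here are my own, not the actual Rhino DHT code:

// Hypothetical sketch of the leader's failure detection and range promotion.
using System;
using System.Collections.Generic;

public class RangeAssignment
{
    public int RangeStart;   // inclusive, 0..1023
    public int RangeEnd;     // exclusive
    public string Primary;   // node urls
    public string Secondary;
    public string Tertiary;
}

public class FailureDetector
{
    private static readonly TimeSpan NodeTimeout = TimeSpan.FromMinutes(15);

    // Nodes are recorded here when they join, and again on every heartbeat.
    private readonly Dictionary<string, DateTime> lastHeartbeat = new Dictionary<string, DateTime>();

    public void RecordHeartbeat(string nodeUrl)
    {
        lastHeartbeat[nodeUrl] = DateTime.UtcNow;
    }

    // Called periodically by the leader; promotes the replicas of any primary
    // that has not been heard from within the timeout.
    public void CheckForDeadNodes(List<RangeAssignment> topology)
    {
        foreach (var assignment in topology)
        {
            DateTime seen;
            if (lastHeartbeat.TryGetValue(assignment.Primary, out seen) &&
                DateTime.UtcNow - seen < NodeTimeout)
                continue; // the primary is alive

            // Bump secondary to primary and tertiary to secondary; picking a new
            // tertiary reuses the same rebalancing we do when a node is added.
            assignment.Primary = assignment.Secondary;
            assignment.Secondary = assignment.Tertiary;
            assignment.Tertiary = null;
        }
    }
}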
We have a few other edge cases to handle. The most important one is the failure of the leader. This one is pretty easy to handle, I think. The first node in the topology is always the leader, and the topology is always replicated to all the nodes. The second node in the topology is a watchdog for the leader, always pinging it to see that it is alive and responding to requests.
If the leader is down for a period of time (15 seconds, let us say), the watchdog takes over and lets everyone else know that it is now the leader. The next node in the chain becomes the watchdog. When the old leader comes up again, it joins the cluster as a normal node. The data that the leader stores is subject to the same rules as any other node's, and 15 minutes after the node goes down it will go through the same rebalancing act, driven by the new leader.
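The watchdog itself can be a simple loop; a sketch, assuming hypothetical TryPing and BroadcastNewLeader operations that are not part of the real code base:

// Hypothetical sketch of the watchdog on the second node in the topology.
using System;
using System.Threading;

public abstract class LeaderWatchdog
{
    private static readonly TimeSpan LeaderTimeout = TimeSpan.FromSeconds(15);

    public void WatchLeader(string leaderUrl)
    {
        DateTime lastSeenAlive = DateTime.UtcNow;
        while (true)
        {
            if (TryPing(leaderUrl))
            {
                lastSeenAlive = DateTime.UtcNow;
            }
            else if (DateTime.UtcNow - lastSeenAlive > LeaderTimeout)
            {
                // The leader has been silent for too long; take over and tell
                // the rest of the cluster. The next node in the topology becomes
                // the new watchdog.
                BroadcastNewLeader();
                return;
            }
            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
    }

    protected abstract bool TryPing(string nodeUrl);
    protected abstract void BroadcastNewLeader();
}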
Thoughts?
Comments
How are you planning to do a distributed Lock when updating values?
When the client updates a key/value pair the next value request (no matter which client that request came from) must return the updated value.
Can you write a little bit more about that?
Have you looked at NChord? http://nchord.sourceforge.net/
I haven't looked into it deeply enough, but from a skim of the docs it would seem that they have algorithms for dealing with a dynamic cluster.
Very interesting.
What is the problem that you're trying to solve with your DHT ?
Or is it purely academic curiosity?
I'm interested in how you would handle such a scenario:
For some reason (network problems, machine freezes due to a bug in the application, etc.) the leader passes 15 secs without answering its watchdog, so the second node takes the lead. After 15+ secs have passed (say 17 secs) the leader is up & takes the lead back, & so forth. Effectively, it would mean the DHT will spend most of its time switching leaders.
P.S. If you think it's imaginary - it's not. I've read somewhere about exactly this scenario, where a network router had problems under heavy load. And they had this kind of aggressive redundancy & failover. Sorry, can't provide a link to the source.
Very interesting. Would it be a bit simpler if you divided your hashing logic into two pieces? First select the range using MD5_Hash(key) % ranges.length and then select the node using rangekey % nodes.length. The secondary node would be (rangekey * 3) % nodes.length and the tertiary (rangekey * 5) % nodes.length. That would distribute all keys quite well into ranges and ranges into different nodes.
Each node would have a permanent slot in the nodes list. If you lose a node permanently, a new node could take that slot. If the length of the nodes list changes, the master would decide what ranges (primary, secondary, or tertiary) should be moved from one node to another.
Just my $0.02
Tapio
V,
That is not a problem, the client always goes to the same node for the same value.
Andy,
No, I didn't know about this.
I'll take a look, looks very interesting.
Omer,
Key value store for items in a distributed network that can survive nodes coming up and down.
Victor,
I am not going to handle this scenario.
When the leader comes back up, it is going to be a regular node under the new leader.
Tapio,
Using the second mod would mean that I am still vulnerable to rehashing issues. I prefer to have much more explicit control over the issue, rather than just using mods.
Andy,
I took a very quick peek into NChord, and... the code base makes me nervous.
Oren,
Your design does not really remove the rehashing problem. Instead of rehashing keys, you end up moving ranges from one node to another. My idea of using the mod operator to find the owner node of a range was very bad anyway. It would easily cause the primary, secondary and tertiary nodes to be the same node.
Hash the keys using a fixed-length range-array and use your master to control the topology of nodes and ranges. That way you'll never have to rehash keys.
The failure behavior is something you have to think about. If a node goes down because of too heavy traffic and the master decides to change the topology, the failure could escalate to other nodes very fast and bring all the nodes down.
Tapio
1024 buckets isn't enough, try going higher
Another uncommon but real scenario is network partitioning (i.e. a vlan config somewhere gets borked and suddenly half the computers are split and isolated from the other half). In this case, there needs to be a recovery path for when the two isolated networks are rejoined.
Have you tried looking into the design of Bittorrent's overlay network? It is what they use for "dynamically distributed network of nodes" in their DHT implementation. There are a number of open source Bittorrent implementations. The Mono guys have one, BitSharp http://www.mono-project.com/Bitsharp.
Lately I've been messing with BitSharp and the Memcached client Enyim.Caching (which also uses a DHT).
Tapio,
I thought that my design did just that, the leader controls the network topology.
Andy,
1024 ranges gives me up to 1024 nodes in the cluster, more than enough.
I am aware of network partitioning, and I'll write some tests for it, but I don't know how to deal with it right now.
Don,
I did, but they all make assumptions about the type of network that you have, which are not relevant to where I want to use the DHT.
The DHT is going to be used most often on the LAN, or, at worst, across data centers.
Oren - I'll "rehash" my question:
What is the problem that you're trying to solve with a "Key value store for items in a distributed network that can survive nodes coming up and down".
Having a dynamic DHT like you're describing is neat, but what is the real-world "business" problem that you're writing it for?
Session store, saga state, highly efficient key based retrieval for the state of current operations, cache.
Just out of curiosity, is it me or can you only have an even number of nodes in your network?
That is, your DHT couldn't consist of 3, 5 or 7 nodes, as the ranges would be unbalanced.
Paul.
Um, I am not seeing how you reached that conclusion.
Each node has primary, secondary and tertiary ranges that it maintains, in case any one of the nodes fails.
The ranges are split as 1024 / nodes; in your 4 node example each node gets 256 = 1024 / 4 ranges, backed up on two other machines. If you have 5 nodes, 1024 / 5 does not divide evenly, so the ranges that each node looks after will be unbalanced.
In fact, I made a mistake when I said even number; does the number of nodes have to be a power of 2?
To be fair I have not seen the code (is it available?), so I am making wild assumptions.
Oren,
All the examples you gave (session store, saga state, cache, etc...) are from the infrastructure domain.
I fully understand how your DHT solution fits that domain.
I was curious about the business problem you're working on that needs all that heavy infrastructure.
It's not a matter of not understanding the problem or the solution. I'm just curious about what made you write this.
Paul,
Yes, there would be some small imbalance, but for a 7 node cluster, we are talking about two nodes with 147 ranges and five nodes with 146, that is not really problematic from my view point.
Omer,
A lot of the data that I store is keyed and non relational. Shopping cart information, search information, etc.
That is the business problem that we are solving.
really good discussion
my thought would be to have a controller node of some sort (similar to your watch dog idea)
the controller node would of course be backed up by another node for fail-safe operations but...
this controller node would be the one that indicates who the leader is (if more than one node comes online) but would also be responsible for holding or containing the ranges
my thought here would be that the ranges would be fed into the nodes as designations rather than a single node (the leader) trying to determine it
as nodes come up the controller would determine what ranges should be re-partitioned and how
then when the controller re-hashes the ranges the DHT would flow through it rather than an ack-nack with the lead controller
does that make sense?
so node 1 is on-line with all of the ranges available to it
as node 2 is being brought up (before the topology is changed mind you), the controller would re-hash the ranges and request the data from node 1 for the ranges going to node 2
if there is an error or some transmission error then node 2 has still not been activated and no topology has changed
(mind you... i don't think that it would be necessary to remove data from the nodes for specific ranges since those ranges would "fall off" after the topology is changed)
the only issue here (as i am thinking about it) would be that while the ranges and topology are changing (or nodes are being brought up) the controller nodes would have to get any updates or additions to smartly invalidate those ranges for a second pass (like an active pass through)
eh... this approach might be a little naive and more than likely introduces too many moving parts
Very interesting stuff. For resharding when nodes join, take a look at consistent hashing:
http://en.wikipedia.org/wiki/Consistent_hashing
www.spiteful.com/.../programmers-toolbox-part-3...
Regards,
Erik
So it does this partitioning now with the imbalance?
One other thing: if you have 5 nodes, I don't think there is any topology that allows each node to have primary, secondary and tertiary data. One node would have to have only a primary and secondary, or primary, secondary, tertiary and a 4th store?
Erik,
This is just one detail of how to find the nodes, I am aware of this.
I am more concerned with detecting and transparently moving data in the presence of failure or new nodes.
Paul,
Yes, but the imbalance is small enough for us not to care about it.
As for 5 nodes:
1 - 0 - 205 (primary), 820 - 1024 (secondary), 615 - 820 (tertiary)
2 - 205 - 410 (p), 0 - 205 (s), 820 - 1024 (t)
3 - 410 - 615 (p), 205 - 410 (s), 0 - 205 (t)
4 - 615 - 820 (p), 410 - 615 (s), 205 - 410 (t)
5 - 820 - 1024 (p), 615 - 820 (s), 410 - 615 (t)
There is very small imbalance here, the last node has 204 ranges instead of 205.
But every node is primary, secondary and tertiary.
Interesting problem, but did you consider other options? NCache, NVelocity... I have not done any performance measurements yet, therefore I'm curious how fast retrieving data from a distributed hash across the network is in comparison with an RDB. When you have an entire aggregate (in the DDD meaning) under one key it must be amazingly fast. However, executing complex queries against such a data store seems to be difficult, as well as handling references. The performance gain in such scenarios (complex queries) might not be reasonable...
Bystrik,
Caching solutions aren't really good for this; I need something with persistence options.
When the entire aggregate is under a key it is really fast.
The problem with most RDBMS is that they don't scale wide very easily.
Most of the commercial distributed caches have persistence capabilities. You might want to check out GigaSpaces and Coherence (now under Oracle).
Ayende, I also commented on your newer post about NChord before reading this post.
Chord was really designed for a variety of high churn networks... because it was inspired by peer-to-peer file sharing architecture. There are nodes in a Chord network which you could not make Leaders (or supernodes) due to latency concerns, so your first-come leader strategy would break, but your business requirements get around that problem. A supernode infrastructure is much more efficient, as you point out in these articles.
Anyway, I'm working on fault tolerance for calendar of events aggregation at work, and I decided that during leader failure all nodes would race each other to become the new leader. I use the database to queue up leader candidates and then broadcast leader status across the cluster when the new leader takes over. The database is fault tolerant and IMHO is perfect for this task IF, like in my case, your identity strategy is sequence. Otherwise I don't think it would work.
Cali,
In your case, you don't actually have a problem. You have a single point of truth, so you can sync around that.