RavenDB Sharding–Map/Reduce in a cluster
In my previous post, I introduced RavenDB Sharding and discussed how we can use sharding in RavenDB. We discussed both blind sharding and data driven sharding. Today I want to introduce another aspect of RavenDB Sharding. The usage of Map/Reduce to gather information from multiple shards.
We start by defining a map/reduce index. In this case, we want to look at the invoice totals per date. We define the index like this:
public class InvoicesAmountByDate : AbstractIndexCreationTask<Invoice, InvoicesAmountByDate.ReduceResult> { public class ReduceResult { public decimal Amount { get; set; } public DateTime IssuedAt { get; set; } } public InvoicesAmountByDate() { Map = invoices => from invoice in invoices select new { invoice.Amount, invoice.IssuedAt }; Reduce = results => from result in results group result by result.IssuedAt into g select new { Amount = g.Sum(x => x.Amount), IssuedAt = g.Key }; } }
And then we execute the following code:
using (var session = documentStore.OpenSession()) { var asian = new Company { Name = "Company 1", Region = "Asia" }; session.Store(asian); var middleEastern = new Company { Name = "Company 2", Region = "Middle-East" }; session.Store(middleEastern); var american = new Company { Name = "Company 3", Region = "America" }; session.Store(american); session.Store(new Invoice { CompanyId = american.Id, Amount = 3, IssuedAt = DateTime.Today.AddDays(-1)}); session.Store(new Invoice { CompanyId = asian.Id, Amount = 5, IssuedAt = DateTime.Today.AddDays(-1) }); session.Store(new Invoice { CompanyId = middleEastern.Id, Amount = 12, IssuedAt = DateTime.Today }); session.SaveChanges(); }
We use a three way sharding, based on the region of the company, so we actually have the following document sin three different servers:
First server, Asia:
Second server, Middle East:
Third server, America:
Now, let us see what happen when we use the map/reduce query:
using (var session = documentStore.OpenSession()) { var reduceResults = session.Query<InvoicesAmountByDate.ReduceResult, InvoicesAmountByDate>() .ToList(); foreach (var reduceResult in reduceResults) { string dateStr = reduceResult.IssuedAt.ToString("MMM dd, yyyy", CultureInfo.InvariantCulture); Console.WriteLine("{0}: {1}", dateStr, reduceResult.Amount); } Console.WriteLine(); }
As you can see, again, we make no distinction in our code about using sharding, we just query it normally. The results, however, are quite interesting:
As you can see, we got the correct results, cluster wide.
RavenDB was able to query all the servers in the cluster for their results, reduce them again, and get us the total across all three servers.
And that, my friends, it truly awesome.
Comments
Nice. Presumably paging and sorting are not supported?
Jonty, Paging and sort is supported.
How does that work then? Presumably you'd have to sort on each server, return the number of results from each server equal to the page size and do a further sort on the client.
Jonty, Yes, that is what we are doing.
Ayende, is that also how it works for pages other than the first page?
Let's say we want documents n through m. Are you saying that you get the first m documents from each server, and then on the client sort and throw away the unnecessary documents?
@Geert I guess that's the case, except it would be the reduce results that are being returned and thrown away, not whole documents!
Greet, No, it does no. we get the N-M from each server, then sort them, and give you the required page size from there.
Ok, I must be missing something.
Let's say we want document 3 through 5, sorted by key. There are 2 servers in the cluster. Server A contains docs with key A, B, C, E, H. Server B contains docs with key D, F, G, I, J
The expected result without clustering is C, D, E.
However, with clustering: Server A : C, E, H Server B : G, I, J which after sorting, and taking the first 3, gives you C, E, G.
I'm with Geert here. Logically you'd have to bring back m results from each server, as the first record on any given server could be in the desired page. Unless you did some kind of round robin between servers before returning to the client.
Geert, Yes, you are correct. You cannot get a globally paged sharded result without having access to all of the information. Now, we may provide feedback in the future that will allow you to fine tune those per shard, but this is really hard to do, and place undue burden on the client.
More than that, the actual scenario doesn't look too good from a business point of view either. In a sharded env, you respect the sharding, and you don't try to go ahead and do things like this on the fly. You would modify your behavior so your queries respected the shard boundaries.
So would the result of (hope the formatting works):
session.Query...() .Skip(10) .Take(10) .ToList()
return unexpected results under sharding? Or would it get 20 results from each server and do the Skip and Take client-side?
I guess what I really should do is download Raven DB and try it for myself, since Ayende's put so much effort in to making that as simple as possible to do :)
Morcs, It would get the 10 - 20 results from each server, sort them and give you 10 results
@morcs and Jonty
Doing sorting and paging at the same time like being described is hard. One solution I could think of is using the same attribute for sharding and sorting.
So in the example above you could sort first by region and then by name. This would get the first pages from server A, the next pages from server B and so on. I'm not sure if this is already supported by RavenDB, but it shouldn't be too complicated. In case you reach the end of one shard and only get a partial page, you would need to do another request for the next shard to fill the page.
Understood :) I've not had to use anything like sharding before but it would worry me that:
session.Query(...).OrderBy(...).Skip(x)
would return different, possibly confusing results in a sharded setup, but I do understand why.
Would the "safe by default" principle suggest that we should get an exception when trying to use Skip on an IOrderedQueryable in a sharded setup?
Morcs, We provide an extension point for you to inject your own behavior, so I am not sure if an exception would be a good idea here.
Is there a path that one can take to reverse the sharded DB back into single instance?
Thanks.
Petar, Sure, take all the data from the sharded instances and put it in one box. Then use the standard DocumentStore
Is there a way to "partition/shard" the data by a dynamic/arbitrary sequential value like date? In other words, could I tell RavenDB to partition a DocumentStore by month or do I have to explicitly define the shard like in your example of region?
Sonic, No, you can do it easily with RavenDB. You need to provide two values to the ShardingOn method, the first is to extract the value from the entity, and the second is to convert the value to the shard id. That way, you can do things like arbitrary or date based.
Oren, can I retrieve results if even nodes which doesn't contain appropriate data are down (offline)? Shards located in different datacentres. Simple example: 1. I do Load<T>(ID) from ShardedDocumentStore [Shard1, Shard2]. 2. Shard 1 is online. Shard2 is offline (down). 3. Shard 1 contains required document. I have exception in this case. How I can setup a session for retrieving available data? Does Raven have this possibility?
Vlad, Right now, we don't handle this scenario, and that is on purpose. I don't know what the kind of behavior to implement here would be. But we do provide a hook to show how you can handle that yourself, giving you the ability to handle that. The hook is the IShardAccessStrategy.
Oren, here is an example of the case. One of a system customers wants to store their data in separated db instance, in their own network/datacenter. Database is sharded by Customer. The customer also wants to store user accounts at their site (very paranoidal policy). I want to avoid situation when customer's connection problems will affect the system and other users will not have access to it. It will be great to have sharding which helps to avoid the problem. I've investigated the code of existing Access strategies (Sequential and Parallel) and as I see I can write the same but with appropriate exception handling. Thank you and your team for readable code!
Vlad, Note that one thing that you have to worry about in this scenario is actually knowing that you have a connection problem. The absolute worst thing you can have is to have a connection problem and for some of the data to just go away without you actually noticing.
Sure things, Oren. A user should be notified in case when the system avoids unhandled exception. Basically I think about only Load<T>(id) operation which returns only single value. In this case we can: 1. return found document (if any exists); 2.return null; 3. return exception when not found any and connection is broken. Yes, you are right - the system must guarantee that user see all requested data or notify about the problem like "You cannot view requested data because...", or in very rare situation - show part of requested data and notify about the problem like "You see not all of requested data because..."
Vlad, I wrote a five parts series about this exact topic: http://ayende.com/blog/155809/api-design-sharding-status-for-failure-scenarios?key=167619a5bdec4055a66651904916ffb4 http://ayende.com/blog/155841/api-design-sharding-status-for-failure-scenariosndash-ignore-and-move-on?key=fe08107267224788b42ce633469418ec http://ayende.com/blog/155873/api-design-sharding-status-for-failure-scenariosndash-explicit-failure-management?key=341dc08c7fde407a83dc2b7ad81c3bd0 http://ayende.com/blog/155905/api-design-sharding-status-for-failure-scenariosndash-explicit-failure-management-doesnrsquo-t-work?key=c9a5a273ba8d4029bd2f111e3a7293c8 http://ayende.com/blog/155937/api-design-sharding-status-for-failure-scenariosndash-solving-at-the-right-granularity?key=f3e596cb1e5342c193aa617553adea5d
I would like your opinion about this.
Comment preview