Sunday, July 20, 2008

Patterns for using distributed hash tables: Groups

Yesterday's post called them distributed in-memory cache / storage, but I was reminded that the proper term for what I am talking about is distributed hash tables (DHT).

I presented the problem of dealing with a DHT in that post: mainly, the fact that we have only key-based access and no way to compose several actions into a single transaction. I'll let you read that post for all the gory details, and continue with some useful patterns for dealing with this issue.

As a reminder, here is the API that we have:

  • SET key, data, expiration
    Set the key value pair in the DHT
  • PUT key, data, expiration
    Will fail if item is already in DHT
  • GET key
    Will return null if item is not in DHT or if expired
  • DEL key
    Delete the key from the DHT
  • UPDATE key, data, version
    Update the item if the version matches
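To make the contract above concrete, here is a minimal single-process Python sketch of it. Everything in it is hypothetical: the `InMemoryDHT` class, the `get_with_version` helper (the posts treat GET as also returning a version), treating `expiration` as relative seconds, and reporting "will fail" as a `False` return value. A real DHT spreads keys over many nodes, and every call is remote.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Tuple


@dataclass
class Entry:
    data: Any
    version: int
    expires_at: Optional[float]  # None means the item never expires


class InMemoryDHT:
    """Single-process stand-in for the SET/PUT/GET/DEL/UPDATE API (hypothetical)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._items: Dict[str, Entry] = {}
        self._clock = clock

    def _live(self, key: str) -> Optional[Entry]:
        entry = self._items.get(key)
        if entry is not None and entry.expires_at is not None and entry.expires_at <= self._clock():
            del self._items[key]  # expired items behave as if they were never there
            return None
        return entry

    def _deadline(self, expiration: Optional[float]) -> Optional[float]:
        # Assumption: expiration is a relative number of seconds.
        return None if expiration is None else self._clock() + expiration

    def set(self, key: str, data: Any, expiration: Optional[float] = None) -> None:
        current = self._live(key)
        version = 1 if current is None else current.version + 1
        self._items[key] = Entry(data, version, self._deadline(expiration))

    def put(self, key: str, data: Any, expiration: Optional[float] = None) -> bool:
        if self._live(key) is not None:
            return False  # PUT fails if the item is already in the DHT
        self._items[key] = Entry(data, 1, self._deadline(expiration))
        return True

    def get(self, key: str) -> Optional[Any]:
        entry = self._live(key)
        return None if entry is None else entry.data

    def get_with_version(self, key: str) -> Tuple[Optional[Any], Optional[int]]:
        entry = self._live(key)
        return (None, None) if entry is None else (entry.data, entry.version)

    def delete(self, key: str) -> None:
        self._items.pop(key, None)

    def update(self, key: str, data: Any, version: int) -> bool:
        entry = self._live(key)
        if entry is None or entry.version != version:
            return False  # someone else changed (or removed) the item first
        self._items[key] = Entry(data, version + 1, entry.expires_at)
        return True
```

The important part is `update`: it only succeeds when the caller still holds the latest version, which is the building block the patterns below rely on.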

Now, let us explore some of the more useful patterns we need. We will start with grouping items.

One of the more annoying properties of key-based access methods is... key-based access. We have no way to perform a query (well, this is actually inaccurate, but we will touch on that later).

Let us assume that we want to get the recent posts from the cache. How are we going to do that? Using SQL, it is very easy:

SELECT TOP 10 * FROM News ORDER BY PublishedDate DESC

We don't have anything similar in the API above. What we have instead is key-based access, and it turns out we can use that to create groups, which are very useful.

A group is simply a key/value pair where the value contains a list of keys. In the case of the query above, we can represent it using:

newsItems = List()
for news_id in GET("most recent news"):
	item = GET(news_id)
	continue if item is null # deleted news item
	newsItems.Add(item)

The group itself would be:

{
  "most recent news" :
	[
		"NewsItem#15",
		"NewsItem#16",
		"NewsItem#17"
	]
}

But wait, don't groups violate a basic law of distributed work by introducing SELECT N+1? Remember, each GET is a remote call. In this case we make 4 remote calls (one for the group and one for each of the three items), but what if we have 100 items in the recent news?
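A quick way to see the SELECT N+1 cost is to count the calls. This Python sketch uses a hypothetical `CountingStore` that treats every GET as one remote call, and resolves the group the same way as the pseudo code above, skipping deleted items.

```python
from typing import Any, Dict, List, Optional


class CountingStore:
    """Hypothetical key/value store that counts every GET as one remote call."""

    def __init__(self, items: Dict[str, Any]) -> None:
        self.items = dict(items)
        self.remote_calls = 0

    def get(self, key: str) -> Optional[Any]:
        self.remote_calls += 1
        return self.items.get(key)


def get_group_naive(store: CountingStore, group_name: str) -> List[Any]:
    """One GET for the group, then one GET per key: N + 1 remote calls."""
    keys = store.get(group_name) or []
    results = []
    for key in keys:
        item = store.get(key)
        if item is None:  # deleted news item, skip it
            continue
        results.append(item)
    return results


store = CountingStore({
    "most recent news": ["NewsItem#15", "NewsItem#16", "NewsItem#17"],
    "NewsItem#15": "astounding occurrences in...",
    # NewsItem#16 has been deleted
    "NewsItem#17": "you are feeling sleepy...",
})
news = get_group_naive(store, "most recent news")
```

Here `news` holds the two surviving headlines and `store.remote_calls` ends at 4; with 100 keys in the group it would be 101.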

A better approach for this would be to put the item data in the group directly, like this:

{ 
	"most recent news" : 
		[ 
			"astounding occurrences in...",
			"amazing revelations...",
			"you are feeling sleepy..."
		]
}

Here, we have a single remote call, and we get all the data that we need. Much better, isn't it?

Well, sort of. The problem we face now is that we need to track what we put into the most recent news. Let us say that we need to update an article. We most certainly want this update to be reflected in the most recent news as well, but to do that we need to know that we put the article there. Extend this sample a bit to the other groups we may have in our applications, and this quickly becomes an unmanageable problem.

By storing just the keys of other items in the list, we can handle an update by changing a single key/value pair instead of many separate items. This problem has a name, normalization vs. de-normalization, and its tradeoffs are very well understood.
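The tradeoff shows up directly in how many keys an article update has to touch. This Python sketch uses plain dicts as a stand-in for the DHT, and both function names are made up for illustration. Note that the de-normalized version has to be told which groups exist, which is exactly the tracking burden described above.

```python
from typing import Any, Dict, List


def update_article_normalized(store: Dict[str, Any], article_key: str, new_title: str) -> int:
    """Groups hold keys only: one write updates the article for every group. Returns writes."""
    store[article_key] = new_title
    return 1


def update_article_denormalized(store: Dict[str, Any], group_names: List[str],
                                article_id: str, new_title: str) -> int:
    """Groups hold copies of the data: every group containing the article is rewritten."""
    writes = 0
    for name in group_names:
        entries = store.get(name, [])
        if any(entry["id"] == article_id for entry in entries):
            store[name] = [dict(entry, title=new_title) if entry["id"] == article_id else entry
                           for entry in entries]
            writes += 1
    return writes
```

With the normalized layout the cost of an update is constant; with the de-normalized one it grows with the number of groups that hold a copy, in exchange for cheaper reads.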

Actually, I wasn't as honest with you as I could have been. The worst case scenario for this approach is not N + 1 remote calls, it is min(N, M) + 1, where N is the number of items in the list and M is the number of nodes in the DHT. The reason is that all DHTs support some form of batch GET, so we need at most one batched call per node. The following piece of pseudo code is a generic way to get a group from a DHT as efficiently as possible.

def GetGroup(groupName as string):
	items = GET(groupName)
	return EmptyList if items is null
	itemsByNode = GroupKeysByNode(items)
	futures = List()
	for kvp in itemsByNode:
		futureGet = GET_ASYNC(kvp.Node, kvp.Items.ToArray())
		futures.Add(futureGet)
	WaitForAll(futures)
	results = List()
	for futureGet in futures:
		for item in futureGet.Results:
			results.Add(item) if item is not null
	return results
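For readers who want something executable, here is a Python sketch of the same batched approach. The `Cluster`/`Node` classes, `multi_get`, and the modulo key placement are assumptions standing in for a real DHT client (real systems usually use consistent hashing); a thread pool plays the role of GET_ASYNC and WaitForAll.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional


class Node:
    """One DHT node; multi_get stands in for a batch GET (one remote call per batch)."""

    def __init__(self) -> None:
        self.items: Dict[str, Any] = {}
        self.calls = 0

    def multi_get(self, keys: List[str]) -> List[Optional[Any]]:
        self.calls += 1
        return [self.items.get(key) for key in keys]


class Cluster:
    def __init__(self, node_count: int) -> None:
        self.nodes = [Node() for _ in range(node_count)]

    def node_index(self, key: str) -> int:
        # Simple modulo placement for illustration only.
        return zlib.crc32(key.encode()) % len(self.nodes)

    def set(self, key: str, value: Any) -> None:
        self.nodes[self.node_index(key)].items[key] = value

    def get(self, key: str) -> Optional[Any]:
        return self.nodes[self.node_index(key)].multi_get([key])[0]


def get_group(cluster: Cluster, group_name: str) -> List[Any]:
    keys = cluster.get(group_name)  # remote call #1: the list of keys
    if keys is None:
        return []
    # Group keys by owning node, so each node receives exactly one batch.
    keys_by_node: Dict[int, List[str]] = {}
    for key in keys:
        keys_by_node.setdefault(cluster.node_index(key), []).append(key)
    # Send all batches in parallel and wait for every one of them.
    with ThreadPoolExecutor(max_workers=len(keys_by_node) or 1) as pool:
        futures = [pool.submit(cluster.nodes[index].multi_get, batch)
                   for index, batch in keys_by_node.items()]
        batches = [future.result() for future in futures]
    # Drop keys whose items were deleted or expired.
    return [item for batch in batches for item in batch if item is not None]
```

As with the pseudo code, results come back grouped by node rather than in group order; if ordering matters, re-sort them against the original key list.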

There are other optimizations we could make, but this one gets about as far as we can hope to go without becoming very complex.

Now, remember the problem statement from the first post in this series: we have multiple clients working against the DHT in parallel. Let us assume that two clients want to add an item to the most recent items at the same time. We want to preserve data integrity, so this is something we need to think about. The way to do this is very simple:

while true:
	mostRecentItems, mostRecentItemsVersion = GetGroupWithVersion("most recent items")
	mostRecentItems.Add( GenerateCacheId(newItem) )
	result = UPDATE("most recent items", mostRecentItems, mostRecentItemsVersion)
	break if result is SuccessfulUpdate

Basically, we make a compare-and-exchange call: if someone else snuck in and updated the recent items group while we were working, the UPDATE fails, and we simply fetch the list again and retry. A classic case of optimistic concurrency.
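Here is a runnable Python sketch of the same retry loop. `VersionedStore` is a hypothetical, lock-protected stand-in for a DHT node, and it assumes that a missing key has version 0, so the first UPDATE creates the group; real DHTs differ here (Memcached, for example, has separate `add` and `cas` commands).

```python
import threading
from typing import Any, Dict, Optional, Tuple


class VersionedStore:
    """Thread-safe stand-in for a DHT node offering GET-with-version and UPDATE (CAS)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: Dict[str, Tuple[Any, int]] = {}

    def get_with_version(self, key: str) -> Tuple[Optional[Any], int]:
        with self._lock:
            return self._items.get(key, (None, 0))  # assumption: missing key = version 0

    def update(self, key: str, data: Any, version: int) -> bool:
        with self._lock:
            _, current = self._items.get(key, (None, 0))
            if current != version:
                return False  # someone else got there first
            self._items[key] = (data, version + 1)
            return True


def add_to_group(store: VersionedStore, group: str, item_key: str) -> int:
    """Append item_key to a group with a compare-and-swap retry loop; returns attempts made."""
    attempts = 0
    while True:
        attempts += 1
        items, version = store.get_with_version(group)
        new_items = list(items or []) + [item_key]  # copy, never mutate the fetched list
        if store.update(group, new_items, version):
            return attempts
        # The group changed underneath us: fetch it again and retry.
```

Copying the fetched list before appending matters in this in-process sketch: mutating it in place would change the stored value behind the store's back.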

The problem occurs when we need to update two or more groups (or any two or more items in the DHT) together. Optimistic concurrency alone will not save us here, because we have no way to control updates across two items (which may very well reside on different nodes). This will be the subject of my next post.

posted @ Sunday, July 20, 2008 11:18 AM

Saturday, July 19, 2008

Distributed in memory cache / storage

Let us start by defining the difference between cache and storage. A cache may decide to evict items whenever it feels like it; a storage will only do so at well-defined points.

As a simple example, this is legal for a cache:

PUT "foo", "item data"
result = GET "foo" 
assert result is null

The cache contract says that it "might" preserve values, or it might not; that is the cache's choice. A storage contract makes the above code illegal: it must never happen. Most cache solutions have a way to specify priorities, including a "do not evict" flag, which turns them into good storage solutions.
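To illustrate the two contracts, here is a small Python sketch of a bounded LRU cache with a "do not evict" flag. The `BoundedCache` class and its `set`/`get` methods are hypothetical; with a capacity of 0 it reproduces the legal PUT-then-null sequence above, while pinned items get storage semantics.

```python
from collections import OrderedDict
from typing import Any, Optional, Tuple


class BoundedCache:
    """LRU cache that may evict any item, except items flagged do_not_evict."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._items: "OrderedDict[str, Tuple[Any, bool]]" = OrderedDict()

    def set(self, key: str, data: Any, do_not_evict: bool = False) -> None:
        self._items[key] = (data, do_not_evict)
        self._items.move_to_end(key)  # newest item is most recently used
        self._evict()

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        self._items.move_to_end(key)  # mark as recently used
        return entry[0]

    def _evict(self) -> None:
        # Drop least-recently-used evictable items until we fit. Pinned items are never
        # dropped, so a cache full of pinned items may exceed its capacity.
        for key in list(self._items):
            if len(self._items) <= self.capacity:
                break
            if not self._items[key][1]:
                del self._items[key]
```

The same `set`/`get` calls behave like a cache or like a storage depending only on the flag, which is the "flag to switch" idea discussed next.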

For now, we will assume the same API for both storage and cache, and that we have a flag to switch between the two. We need to figure out what contract we are working with in the first place. Distributed & in-memory means that we have no practical memory limit and we do not care much about disk access times. Distributed does mean that we pay a remote call penalty, but in general we don't have to wait for the disk, as we would in many DB-based solutions.

We will go over the API first. Note that I am using an abstract API (suspiciously similar to the Memcached one, I know), which most cache/storage solutions would support.

  • PUT key, data, expiration
    Will fail if item is already in cache
  • GET key
    Will return null if item is not in cache or if expired
  • DEL key
    Delete the key from the cache
  • UPDATE key, data, version
    Update the item if the version matches

We assume that the storage resides on multiple machines, which means that we have several interesting problems to deal with:

  1. We are, by definition, working in a threaded environment.
  2. The cache offers no way to lock items.
  3. The set of items we need to perform a single operation may reside on different machines.
  4. As a result, each operation is distinct from any other operation; there is no way to batch several operations into a single atomic operation. In other words, we don't have transactions.

Think about the implications of this for a moment, will you?

Let us say that I have the following piece of code:

employee, employee_version = GET "employee#"+id
salary, salary_version = GET "salary#"+id

# business logic

UPDATE "employee#"+id, employee, employee_version
UPDATE "salary#"+id, salary, salary_version

At any point, we might get interleaving from another client, which will lead to data corruption. Even the versioned UPDATE doesn't help: assume that we fail to update the salary. At that point we have already updated the employee, so now we need to roll it back, but another client might have added their changes in the meantime...

To make things worse, the employee and salary might very well reside on different machines. Building transactions around this is possible, but extremely expensive in terms of performance.
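The partial-failure scenario is easy to reproduce. This Python sketch uses a hypothetical versioned `Store` where GET returns `(data, version)` and UPDATE is a compare-and-swap, and it scripts one specific interleaving of two clients; the employee id and field values are made up.

```python
from typing import Any, Dict, Tuple


class Store:
    """Hypothetical versioned key/value store: GET returns (data, version), UPDATE is a CAS."""

    def __init__(self) -> None:
        self.items: Dict[str, Tuple[Any, int]] = {}

    def get(self, key: str) -> Tuple[Any, int]:
        return self.items[key]

    def update(self, key: str, data: Any, version: int) -> bool:
        if self.items[key][1] != version:
            return False
        self.items[key] = (data, version + 1)
        return True


store = Store()
emp_id = "42"
store.items["employee#" + emp_id] = ({"title": "developer"}, 1)
store.items["salary#" + emp_id] = ({"amount": 100}, 1)

# Client A reads both items.
employee, employee_version = store.get("employee#" + emp_id)
salary, salary_version = store.get("salary#" + emp_id)

# Client B sneaks in and changes the salary.
_, other_version = store.get("salary#" + emp_id)
store.update("salary#" + emp_id, {"amount": 90}, other_version)

# Client A's business logic: a promotion changes both title and salary.
employee_ok = store.update("employee#" + emp_id, {"title": "team lead"}, employee_version)
salary_ok = store.update("salary#" + emp_id, {"amount": 150}, salary_version)
```

After running it, `employee_ok` is `True` and `salary_ok` is `False`: the employee is now a team lead but still has the salary client B wrote, and nothing in the API lets client A undo its first update safely.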

We need a better solution, one that builds on top of the existing API and gives us a way to handle this properly.

I will touch on that in a future post.

posted @ Saturday, July 19, 2008 5:37 PM

Friday, July 18, 2008

Googling is a requirement

Here is another thing that annoys me: some people ping me with questions that are trivially answerable by a Google search. If you want to ask me something, consider Googling it first; not doing so shows a lack of respect for my time.

posted @ Friday, July 18, 2008 3:12 PM

The purpose of Rhino Commons

Nathan has posted about utility libraries and he includes Rhino Commons there as well.

I don't see Rhino Commons as a utility library. At least not anymore. It certainly started its life as such, but it has grown since then.

Rhino Commons represents my default architecture. It is my base when I am building applications. It has some utility classes, sure, but it contains far more foundation and infrastructure components than anything else.

posted @ Friday, July 18, 2008 11:55 AM

Tuesday, July 15, 2008

Why I will not code review your code...

Like this post, this post is here to serve as a statement of intention and to clarify my position on the matter. Recently I have started to get quite a few requests for reviewing applications, frameworks and components. They all come from well-meaning people, who often have very interesting code that they would like a second opinion on.

That is wonderful, except that I am most probably not going to be able to do this code review.

There are several reasons for that. I just don't have enough hours in the day. And, to be frank, if you want me to review your code base, there needs to be a good reason for it. If the source is publicly available and interesting (as defined by yours truly, not by the author of the code), I would be more than happy to dig into it. I do that quite often, just to make sure that I keep an open mind and learn from other people.

If the source isn't freely available, or if I don't find it interesting, then I would suggest contacting me for a consulting engagement, in which case I would be more than happy to go over any code base (unless it is written in Ook#, which I refuse to touch).

posted @ Tuesday, July 15, 2008 1:46 AM

Monday, July 14, 2008

Obsolete in Isolation

The question of how to deal with a brown field project came up on the ALT.Net mailing list. Here is the description of the project:

You've inherited a software project that has deeply rooted dependencies, business logic embedded in the presentation layer (and elsewhere), 'unit' testing that tests databases with hard-coded strings for primary keys, no actual unit testing but contrived integration/functional tests...

Sounds like one of my projects, from several years back, as a matter of fact. Oh, the things we did to System.Web.UI.GridView...

Oh, I was supposed to talk about how to deal with such a project, not reminisce about old projects.

My approach for such projects is called: Obsolete in Isolation.

Basically, I say that the existing software is there, and presumably it is valuable to the business. However, continuing development using the existing approach would cost too much time and effort. Typically, maintaining such a project is a big pain, and costs more than keeping my old car running (it was older than me, so take a guess).

Trying to restructure it is possible, but again, costly. As such, I declare the whole thing obsolete and decide that this is a green field project. Now, having successfully escaped a lynching by the customer for suggesting such a thing as throwing away "Working Code" because "I am being uppity", let me try to explain what I mean.

The code is here and it is working. It is not written in a way that makes maintaining and extending it easy. We need to maintain and extend the code, and we would like to avoid the pain during the process. Throwing the code away is a bad decision in most scenarios, so we will keep it, but we will not continue developing on it.

Instead, we will do all development on a new project, which is built to be maintainable and easily extendible. The old project will stand as it is. Any new features go to the new project; minor bug fixes are done in the old project, using the old "tested with F5" approach if nothing else is viable. Major bug fixes mean porting the code to the new approach.

Over time, what will happen is that we move all the parts of the application that are unstable and buggy to the new approach, which will allow us to test it properly. Any new features will be in the new application as well. The stable parts of the old application are going to remain there, and they are going to keep on working. We aren't going to lose the existing investment in them.

There are issues with this approach, of course. For example, there is a cost of integrating the systems, but it is generally a minor one. And you need to introduce new developers to both systems, the old and the new.

Even with those issues, I find that this is a good way to deal with legacy projects without too much pain.

posted @ Monday, July 14, 2008 9:16 PM

ALT.NET Israel registration is open

The registration for ALT.NET Israel is now open. You will need an OpenID (I use myopenid.com) to register. We are limited to 50 seats for the first event, first come, first served. Worst case scenario, you go onto the waiting list.

Please only register if you are serious about attending.

The event takes place in two parts: Thursday evening from 18:30 to 20:30, and then the full day on Friday (09:00-17:00). More details on the site.

posted @ Monday, July 14, 2008 12:33 PM

ALT.Net Israel

Yeah!

We are going to have an ALT.Net conference in Israel in a few weeks. More specifically:

Thursday 7th, at 18:30-20:30: planning meeting, followed by a walk to a nearby pub or coffee shop to socialise.

Friday 8th, at 09:30-16:30: sessions.

The conference will be held at the SQLink offices in Ramat Gan.

Ken Egozi was kind enough not only to prod me & Roy into moving, but also to arrange the location.

Agenda:

That is really up to the people who attend.  We will be following an open spaces format, similar to the other alt.net conferences in the UK, USA and Canada, where the agenda is decided by the conference participants.  Anyone can lead sessions on particular topics of interest, participate as an attendee or just hang around and chat with interesting people.

Our sponsors:

Registration is not open yet, I'll post again when it is.

posted @ Monday, July 14, 2008 5:28 AM

Sunday, July 13, 2008

Putting the container to work: Refactoring NServiceBus configuration

One of the most common issues when people build frameworks and applications that rely on a container is that they don't give the container enough to do. Basically, they use the container to create some components, but do a lot of things outside of the container that the container could do for them.

Note: Code for this post can be found in the Scratch Pad.

NServiceBus and Mass Transit are good examples of that. I detailed some of the issues that I had with Mass Transit a while ago. Udi and I talked about this situation with NServiceBus a few days ago, and this is my attempt to figure out a better model for configuring NSB. Let us start from what we have right now.

We have XML configuration in app.config:

<MsmqTransportConfig InputQueue="messagebus" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5"
/>

<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="">
  <MessageEndpointMappings>
      <add Messages="Messages" Endpoint="messagebus" />
  </MessageEndpointMappings>
</UnicastBusConfig>

<MsmqSubscriptionStorageConfig Queue="subscriptions" />

And we have code to initialize the bus:

new ConfigMsmqSubscriptionStorage(builder);
NServiceBus.Serializers.Configure.BinarySerializer.With(builder);
new ConfigMsmqTransport(builder)
	.IsTransactional(true)
	.PurgeOnStartup(false);
new ConfigUnicastBus(builder)
	.ImpersonateSender(false)
	.SetMessageHandlersFromAssembliesInOrder(
		typeof(RequestDataMessageHandler).Assembly
		);
IBus bus = builder.Build<IBus>();
bus.Start();

The ConfigXyz objects are there to configure the bus itself inside the builder (the container used in the sample).

My first step was to move this to Windsor, with no XML config, which gave me this:

// configure bus
container.Register(
	Component.For<IBuilder>()
		.ImplementedBy<WindsorBuilderAdapter>(),
	Component.For<IMessageSerializer>()
		.ImplementedBy<BinaryMessageSerializer>(),
	Component.For<ITransport>()
		.ImplementedBy<MsmqTransport>()
		.DependsOn(new
		{
			InputQueue = "messagebus",
			NumberOfWorkerThreads = 1,
			ErrorQueue = "error",
			MaxRetries = 5,
			PurgeOnStartup = false,
			IsTransactional = true
		}),
	Component.For<ISubscriptionStorage>()
		.ImplementedBy<MsmqSubscriptionStorage>()
		.DependsOn(
		new
		{
			Queue = "subscriptions"
		}
		),
	Component.For<IBus>()
		.ImplementedBy<UnicastBus>()
		.DependsOn(new
		{
			ImpersonateSender = false,
			MessageOwners = new Hashtable
			{
				{"Messages", "messagebus"}
			},
			MessageHandlerAssemblies = new[]
			{
				typeof (RequestDataMessageHandler).Assembly
			}
		})
	);

// configure handlers
container.Register(
	// yuck, we are registering concrete type!
	Component.For<RequestDataMessageHandler>()
	);


var bus = container.Resolve<IBus>();
bus.Start();

There are a few things you will notice here: we have a lot more code, we hard code the configuration all over the place, you either need to understand Windsor or you have to copy/paste this, and there is no XML (yeah!).

Some of this is good (no XML); the rest... needs some work. If we consider that this is more or less standard bus configuration (configure the bus on top of MSMQ), we will see that there is quite a lot we can do to encapsulate the entire mess into a mechanism that is much easier to work with.

In Windsor, packaging functionality is done using facilities. A facility is an extension to the container that contains certain behavior. It can be as simple as packaging up registration for several components into a single facility or it can be as complex as proxies and runtime component selections.

Let us start with the least common denominator: a Windsor facility with XML configuration. The configuration I came up with is:

<configuration>
	<facilities>
		<facility
		   id="NServiceBusFacility"
		   type="Windsor.Infrastructure.NServiceBusFacility, Windsor.Infrastructure"
		   useBinarySerialization="true"
		   subsciptionQueue="subscriptions">
			<transport inputQueue="messagebus" errorQueue ="error"/>
			<bus impersonateSender="false">
				<message name="Messages" destination="messagebus"/>
				<handler name="Server"/>
			</bus>
		</facility>
	</facilities>

	<components>
		<component id="RequestDataMessageHandler" 
			type="Server.RequestDataMessageHandler, Server"/>
	</components>

</configuration>

And the code to make this happen is here (minus utility methods that I am not showing):

public class NServiceBusFacility : AbstractFacility
{
	public bool UseXmlSerialization { get; set; }
	public bool UseBinarySerialization { get; set; }

	protected override void Init()
	{
		UseXmlSerialization = FacilityConfig.Value("UseXmlSerialization");
		UseBinarySerialization = FacilityConfig.Value("UseBinarySerialization");

		Kernel.Register(
			Component.For<IBuilder>()
				.ImplementedBy<WindsorBuilderAdapter>()
			);


		RegisterTransport();
		RegisterSerializer();
		RegisterSubscription();
		RegisterBus();
	}

	private void RegisterBus()
	{
		var bus = FacilityConfig.Children["bus"];
		if (bus == null)
			throw new InvalidOperationException("bus is a mandatory element");
		var messageOwners = new Hashtable();
		var assemblies = new List<Assembly>();
		foreach (var element in bus.Children)
		{
			if (element.Name == "message")
			{
				AddMessageDestination(element, messageOwners);
			}
			else if (element.Name == "handler")
			{
				AddHandlerAssebmly(element, assemblies);
			}
			else
			{
				throw new InvalidOperationException("Unknown element in bus: " + element.Name);
			}
		}
		Kernel.Register(
			Component.For<IBus>()
				.ImplementedBy<UnicastBus>()
				.DependsOn(new
				{
					MessageOwners = messageOwners,
					MessageHandlerAssemblies = assemblies
				})
			);
	}

	private static void AddHandlerAssebmly(IConfiguration handler, ICollection<Assembly> assemblies)
	{
		string assemblyString = handler.Attributes["name"];
		if (string.IsNullOrEmpty(assemblyString))
			throw new InvalidOperationException("name attribute is mandatory in handler element");
		assemblies.Add(Assembly.Load(assemblyString));
	}

	private static void AddMessageDestination(IConfiguration message, IDictionary messageOwners)
	{
		string messageName = message.Attributes["name"];
		if (string.IsNullOrEmpty(messageName))
			throw new InvalidOperationException("message must have a name");
		string destination = message.Attributes["destination"];
		if (string.IsNullOrEmpty(destination))
			throw new InvalidOperationException("message must have a destination");

		messageOwners[messageName] = destination;
	}

	private void RegisterSubscription()
	{
		string attribute = FacilityConfig.Attributes["subsciptionQueue"];
		if (attribute == null)
			throw new InvalidOperationException("subsciptionQueue is a mandatory attribute");
		Kernel.Register(
			Component.For<ISubscriptionStorage>()
				.ImplementedBy<MsmqSubscriptionStorage>()
				.Parameters(
				Parameter.ForKey("Queue").Eq(attribute)
				)
			);
	}

	private void RegisterTransport()
	{
		IConfiguration transport = FacilityConfig.Children["transport"];
		if (transport == null)
			throw new InvalidOperationException("transport is mandatory element");
		Kernel.Register(
			Component.For<ITransport>()
				.ImplementedBy<MsmqTransport>()
				.Parameters(

				// mandatory
				transport.Parameter("InputQueue"),
				transport.Parameter("ErrorQueue"),

				// optional
				transport.Parameter("NumberOfWorkerThreads", "1"),
				transport.Parameter("MaxRetries", "5"),
				transport.Parameter("PurgeOnStartup", "false"),
				transport.Parameter("IsTransactional", "false")

				)
			);
	}

	private void RegisterSerializer()
	{
		AssertValidSerializationSettings();

		if (UseBinarySerialization)
		{
			Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<BinaryMessageSerializer>()
				);
		}

		if (UseXmlSerialization)
		{
			Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<XmlMessageSerializer>()
				);
		}
	}

	private void AssertValidSerializationSettings()
	{
		if ((UseXmlSerialization && UseBinarySerialization) || (!UseXmlSerialization && !UseBinarySerialization))
		{
			throw new InvalidOperationException("Must define exactly one of XML or Binary serialization.");
		}
	}
}

I am not happy with this yet: the facility has a lot of code, and we still have XML. But we have very little configuration, and the code to use it is now:

IWindsorContainer container = new WindsorContainer("windsor.config");

var bus = container.Resolve<IBus>();
bus.Start();

Which is much better than either of the previous versions. It also doesn't require me to recompile to modify the configuration.

Hold the presses, what about administrator configuration?!

One of the major emphases of the NServiceBus configuration API is the explicit distinction it makes between developer-level configuration (dependencies, which transport you are using, transactions, who handles what, etc.) and administrator-level configuration (queue names, mostly).

In the configuration above we have no such separation. Problem, isn't it?

Again, we can use the container itself as a way to deal with this. First, we will define a configuration file, which will contain the following text:

<configuration>
	<properties>
		<subscriptionsQueue>subscriptions</subscriptionsQueue>
		<inputQueue>messagebus</inputQueue>
		<errorQueue>error</errorQueue>
	</properties>
</configuration>

And now, in the main configuration file, we include this file and refer to those values:

<configuration>
	<include uri="file://Configuration.config"/>
	<facilities>
		<facility
		    id="NServiceBusFacility"
		    type="Windsor.Infrastructure.NServiceBusFacility, Windsor.Infrastructure"
		    useBinarySerialization="true"
		    subsciptionQueue="#{subscriptionsQueue}">
			<transport inputQueue="#{inputQueue}" errorQueue ="#{errorQueue}"/>
			<bus impersonateSender="false">
				<message name="Messages" destination="messagebus"/>
				<handler name="Server"/>
			</bus>
		</facility>
	</facilities>

	<components>
		<component id="RequestDataMessageHandler" 
			 type="Server.RequestDataMessageHandler, Server"/>
	</components>

</configuration>

And just like that, we got ourselves a nice dual configuration, one for the administrators and one for the developers.
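The `#{...}` references boil down to substituting the administrator's values into the developer's configuration. This Python sketch only mimics that idea; it is not how Windsor implements properties, and `expand_properties` is a hypothetical helper. Failing on an unknown name is a deliberate choice here, so that a typo in the admin file surfaces at startup rather than as a queue named `#{inputQeue}`.

```python
import re
from typing import Dict

# Matches #{name}, capturing the property name.
_PROPERTY = re.compile(r"#\{(\w+)\}")


def expand_properties(text: str, properties: Dict[str, str]) -> str:
    """Replace every #{name} in text with the admin-supplied value; unknown names raise."""
    def lookup(match: re.Match) -> str:
        name = match.group(1)
        if name not in properties:
            raise KeyError(f"undefined property: {name}")
        return properties[name]

    return _PROPERTY.sub(lookup, text)
```

The developer file keeps the structure (which transport, which serializer, who handles what), while the admin file only supplies the values, so either can change without touching the other.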

I am still not happy with this, because I have XML and a lot of code to deal with this XML nonsense, but we did get down to a very simple XML configuration in very little time.

Let us see what happens if we use Binsor...

Since we don't want to replicate the XML syntax (we can do much better without the limitations of XML), we will start from scratch and define a new facility.

Note: While writing this several improvements to Binsor itself occurred to me, so this is certainly something that can be improved.

The approach for building a configuration language using Binsor is fairly simple: create an object graph that represents your configuration, and just call it from the Binsor script.

We start by defining the configuration model:

public enum SerializationFormat
{
	Xml,
	Binary
}

public class Transport
{
	public Transport()
	{
		//default values for optional params
		NumberOfWorkerThreads = 1;
		MaxRetries = 5;
		PurgeOnStartup = false;
		IsTransactional = false;
	}
	public string InputQueue { get; set; }
	public string ErrorQueue { get; set; }
	public int NumberOfWorkerThreads { get; set; }
	public int MaxRetries { get; set; }
	public bool PurgeOnStartup { get; set; }
	public bool IsTransactional { get; set; }
}

public class Bus
{
	public IDictionary MessageOwners { get; set; }
	public Assembly[] MessageHandlerAssemblies { get; set; }
}

public class UnicastBus : Bus { }

As you can see, this is about as simple as it can get.

Then we define the facility itself:

public class NServiceBusFacility_Binsor : AbstractFacility
{
	public SerializationFormat SerializationFormat { get; set; }
	public string SubsciptionQueue { get; set; }
	public Transport Transport { get; set; }
	public Bus Bus { get; set; }

	public NServiceBusFacility_Binsor(
		SerializationFormat serializationFormat,
		string subsciptionQueue,
		Transport transport,
		Bus bus)
	{
		SerializationFormat = serializationFormat;
		SubsciptionQueue = subsciptionQueue;
		Transport = transport;
		Bus = bus;
	}

	protected override void Init()
	{
		Kernel.Register(
			Component.For<IBuilder>()
				.ImplementedBy<WindsorBuilderAdapter>()
			);

		Kernel.Register(
			Component.For<ITransport>()
				.ImplementedBy<MsmqTransport>()
				.DependsOn(Transport)
			);

		switch (SerializationFormat)
		{
			case SerializationFormat.Binary:
				Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<BinaryMessageSerializer>()
				);
				break;
			case SerializationFormat.Xml:
				Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<XmlMessageSerializer>()
				);
				break;
			default:
				throw new NotSupportedException("Serialization format " + SerializationFormat +
					" is not supported");
		}

		Kernel.Register(
			Component.For<ISubscriptionStorage>()
				.ImplementedBy<MsmqSubscriptionStorage>()
				.Parameters(
					Parameter.ForKey("Queue").Eq(SubsciptionQueue)
				)
			);

		Kernel.Register(
			Component.For<IBus>()
				.ImplementedBy<NServiceBus.Unicast.UnicastBus>()
				.DependsOn(Bus)
			);
	}
}

It is significantly simpler than the XML configuration based one. And now we can get to the configuration itself:

import System.Reflection
import Windsor.Infrastructure
import Server

facility NServiceBusFacility_Binsor:
	serializationFormat = SerializationFormat.Binary
	transport = Transport ( 
		InputQueue: "messagebus",
		ErrorQueue: "error"
	)
	subsciptionQueue = "subscriptions"
	bus = UnicastBus (
			MessageOwners : {
				"Messages" : "messagebus"
			},
			MessageHandlerAssemblies : ( Assembly.Load("Server"), )
	)
	
component RequestDataMessageHandler

Wait! What about administrator configuration? Now that we are using a script to configure our application, it is even more important to separate the administrative configuration from the application configuration.

We will take the exact same approach as before: create a separate file for administration purposes. To do so in a way that gives the admin a nice configuration syntax, we will define a configuration model:

public class MyConfiguration
{
	public static string InputQueue { get; set; }
	public static string ErrorQueue { get; set; }
	public static string SubscriptionsQueue { get; set; }
}

Now we can create the default AdminConfiguration.boo file:

import Windsor.Infrastructure # namespace of MyConfiguration

MyConfiguration.SubscriptionsQueue = "subscriptions"
MyConfiguration.InputQueue = "messagebus"
MyConfiguration.ErrorQueue  = "error"

And our Windsor.boo file is now:

import System.Reflection
import Windsor.Infrastructure
import Server
import file from AdminConfiguration.boo

AdminConfiguration().Run() # execute admin configuration

facility NServiceBusFacility_Binsor:
	serializationFormat = SerializationFormat.Binary
	transport = Transport ( 
		InputQueue: MyConfiguration.InputQueue,
		ErrorQueue: MyConfiguration.ErrorQueue
	)
	subsciptionQueue = MyConfiguration.SubscriptionsQueue
	bus = UnicastBus (
			MessageOwners : {
				"Messages" : "messagebus"
			},
			MessageHandlerAssemblies : ( Assembly.Load("Server"), )
	)
	
component RequestDataMessageHandler

Now we don't have any XML involved, but the format we have is suspiciously similar to the way we worked with XML. So, except for a small reduction in the configuration complexity, what did we gain?

We have a full fledged programming language for our configuration purposes. We can now apply rules to our configuration, make logic based decisions, etc.

As a simple example, instead of having to hard code the message owners and handlers, we can scan the application directory for matching assemblies. Want to add a new handler? Drop it into the directory, done. This is a really powerful concept, and I am using it extensively in my applications.
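The scanning idea is simple to sketch. This is a Python illustration rather than Binsor code, and both the `*.Handlers.dll` naming convention and the `discover_handler_assemblies` function are assumptions; in a Binsor script you would pass each discovered name to `Assembly.Load`, the same way the script above loads "Server".

```python
from pathlib import Path
from typing import List


def discover_handler_assemblies(app_dir: str, suffix: str = ".Handlers.dll") -> List[str]:
    """Return assembly names (file name minus '.dll') for every handler assembly in app_dir."""
    return sorted(path.name[: -len(".dll")]
                  for path in Path(app_dir).iterdir()
                  if path.is_file() and path.name.endswith(suffix))
```

Because the list is computed at startup, adding a handler really is a matter of copying a file into the directory; the trade-off is that a stray file matching the convention gets picked up too.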

Note: Code for this post can be found in the Scratch Pad.

posted @ Sunday, July 13, 2008 12:33 PM

Notes on versioning Domain Specific Languages

These are just a few topics that I feel are important to discuss when talking about versioning DSLs:

  • Different behavior at runtime
  • API vs. Syntax
  • Different dialects
  • Backward and forward Compatibility
  • Pros:
    • Keeping existing assets
    • Training
    • Knowledge
    • The Test of Fire
  • Cons:
    • Increased costs
    • Harder to change
  • Preparing for versioning:
    • Syntax Documentation
    • Closed world - control what you can access
    • Limit to a scenario
  • Versioning strategies:
    • The Holy Compatibility
    • Build & abandon
    • Version marker
    • The Big Upgrade

Thoughts?

posted @ Sunday, July 13, 2008 12:29 PM | Feedback (5)

Integrating NHibernate and Active Record

On the face of it, this is a nonsense post. What do I mean, integrating NHibernate and Active Record? Active Record is based on NHibernate, after all. What kind of integration do you need? Well, sometimes you want to be able to use NHibernate entities in an Active Record project, and that tended to be very hard. (The other direction was extremely easy: just tell Active Record to generate the mapping and move on from there.)

A week or so ago I added support for the hard direction: adding POCO NHibernate entities into the Active Record model.

Here is the test:

[Test]
public void CanIntegrateNHibernateAndActiveRecord()
{
	ActiveRecordStarter.ModelsValidated += delegate
	{
		new ActiveRecordModelBuilder().CreateDummyModelFor(typeof(NHibernateClass));
	};
	ActiveRecordStarter.Initialize(
		GetConfigSource(),
		typeof(ActiveRecordClass),
		typeof(NHibernateClass));

	Recreate();

	using (TransactionScope tx = new TransactionScope())
	{
		ActiveRecordClass ar = new ActiveRecordClass();
		ar.Friend = new NHibernateClass();
		ActiveRecordMediator.Save(ar.Friend);
		ActiveRecordMediator.Save(ar);
		tx.VoteCommit();
	}

	using (TransactionScope tx = new TransactionScope())
	{
		ActiveRecordClass first = ActiveRecordMediator<ActiveRecordClass>.FindFirst();
		Assert.IsNotNull(first);
		Assert.IsNotNull(first.Friend);
	}
}

Note that I would reserve this for advanced scenarios only; in most cases, it is better to stick to a single, consistent approach.

posted @ Sunday, July 13, 2008 12:06 PM | Feedback (1)

Beautiful (nontrivial) Code - Rhino Mocks 3.5's AssertWasCalled

Beautiful code is not easy to define. I think of it as code that is extremely elegant, that solves a hard problem in a way that isn't brute force. I think that the way Rhino Mocks implements the AssertWasCalled functionality is elegant, and I would like to point it out.

I know of at least one contributor to Rhino Mocks who considers that piece of code scary, by the way, so it is not cut and dried.

Here is the actual method call:

public static void AssertWasCalled<T>(this T mock, Action<T> action, 
	Action<IMethodOptions<object>> setupConstraints)
{
	ExpectationVerificationInformation verificationInformation = 
		GetExpectationsToVerify(mock, action, setupConstraints);

	foreach (var args in verificationInformation.ArgumentsForAllCalls)
	{
		if (verificationInformation.Expected.IsExpected(args))
		{
			verificationInformation.Expected.AddActualCall();
		}
	}
	if (verificationInformation.Expected.ExpectationSatisfied)
		return;
	throw new ExpectationViolationException(
		verificationInformation.Expected.BuildVerificationFailureMessage());
}

We will get to GetExpectationsToVerify in a bit, but broadly, it gets the expectation that should have been called and then executes the same logic that it would have in the Record/Replay model. In fact, it is an exact reversal of the Record/Replay model: now we record all the actual calls first, and only then create an expectation and try to match it against the calls that were actually made on the object.
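To make the reversal concrete, here is a minimal Python sketch of verifying after the fact: a spy records every call, and an expectation is matched against those recorded calls only when the assertion runs. The names (`Spy`, `Expectation`, `assert_was_called`) are hypothetical and only loosely mirror `IsExpected`, `AddActualCall` and `ExpectationSatisfied`; this is not Rhino Mocks' implementation, and treating "at least N matching calls" as satisfied is a simplification.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


class Spy:
    """Records every method call made on it; nothing is checked at call time."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    def __getattr__(self, name: str):
        if name.startswith("__"):  # leave Python's special-method lookups alone
            raise AttributeError(name)

        def method(*args):
            self.calls.append((name, args))

        return method


@dataclass
class Expectation:
    """Hypothetical expectation: method name, argument constraint, minimum call count."""
    method: str
    matches: Callable[[tuple], bool]
    expected_count: int = 1
    actual_count: int = 0

    def is_expected(self, name: str, args: tuple) -> bool:
        return name == self.method and self.matches(args)

    def add_actual_call(self) -> None:
        self.actual_count += 1

    @property
    def satisfied(self) -> bool:
        # Simplification: "at least expected_count matching calls".
        return self.actual_count >= self.expected_count


def assert_was_called(spy: Spy, expected: Expectation) -> None:
    # Replay the already-recorded calls against the expectation.
    for name, args in spy.calls:
        if expected.is_expected(name, args):
            expected.add_actual_call()
    if expected.satisfied:
        return
    raise AssertionError(
        f"Expected {expected.method} to be called {expected.expected_count} time(s), "
        f"but matched {expected.actual_count}"
    )
```

The key point is the ordering: the object under test runs freely against the spy, and the expectation is only built and checked afterwards.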

Of even more interest is how we get the expectation that we are verifying:

private static ExpectationVerificationInformation GetExpectationsToVerify<T>(T mock, Action<T> action,
		Action<IMethodOptions<object>> setupConstraints)
{
	IMockedObject mockedObject = MockRepository.GetMockedObject(mock);
	MockRepository mocks = mockedObject.Repository;

	if (mocks.IsInReplayMode(mockedObject) == false)
	{
		throw new InvalidOperationException(
			"Cannot assert on an object that is not in replay mode." +
			" Did you forget to call ReplayAll() ?");
	}

	var mockToRecordExpectation = (T)mocks.DynamicMock(
		mockedObject.ImplementedTypes[0], 
		mockedObject.ConstructorArguments);

	action(mockToRecordExpectation);

	AssertExactlySingleExpectaton(mocks, mockToRecordExpectation);

	IMethodOptions<object> lastMethodCall = mocks.LastMethodCall<object>(mockToRecordExpectation);
	lastMethodCall.TentativeReturn();
	if (setupConstraints != null)
	{
		setupConstraints(lastMethodCall);
	}
	ExpectationsList expectationsToVerify = 
		mocks.Replayer.GetAllExpectationsForProxy(mockToRecordExpectation);
	if (expectationsToVerify.Count == 0)
	{
		throw new InvalidOperationException(
			"The expectation was removed from the waiting expectations list,"+
			" did you call Repeat.Any() ? This is not supported in AssertWasCalled()");
	}
	IExpectation expected = expectationsToVerify[0];
	ICollection<object[]> argumentsForAllCalls = mockedObject.GetCallArgumentsFor(expected.Method);
	return new ExpectationVerificationInformation
			{
				ArgumentsForAllCalls = new List<object[]>(argumentsForAllCalls),
				Expected = expected
			};
}

This is even more interesting. We create a new mock object and execute the action against it in record mode, which records the expectation that we wish to verify. We then extract that expectation from the newly created mock and hand it back to the AssertWasCalled method, which verifies it against the actual calls made on the original object.
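The trick of recording the expectation on a second, freshly created mock can be sketched the same way. In this hypothetical Python version, `assert_was_called` runs the lambda against a new `Recorder`, insists that it made exactly one call (in the spirit of `AssertExactlySingleExpectaton`), and then looks for that call among the ones recorded on the real object. Argument constraints and repeat counts are left out to keep it short; none of these names come from Rhino Mocks.

```python
from typing import Any, Callable, List, Tuple

Call = Tuple[str, tuple, dict]


class Recorder:
    """Hypothetical stand-in mock: every method call is stored as (name, args, kwargs)."""

    def __init__(self) -> None:
        self.calls: List[Call] = []

    def __getattr__(self, name: str):
        if name.startswith("__"):
            raise AttributeError(name)

        def method(*args: Any, **kwargs: Any) -> None:
            self.calls.append((name, args, kwargs))

        return method


def assert_was_called(mock: Recorder, action: Callable[[Recorder], None]) -> None:
    # Run the action against a fresh recorder: this turns the lambda into an expectation.
    probe = Recorder()
    action(probe)
    if len(probe.calls) != 1:
        raise ValueError("the action must make exactly one call")
    expected = probe.calls[0]
    # Verify the captured expectation against the calls made on the real object.
    if expected not in mock.calls:
        name, args, kwargs = expected
        raise AssertionError(f"Expected call to {name} with {args!r} {kwargs!r} was not made")
```

For example, after `mock.send("hello", priority=1)`, the assertion `assert_was_called(mock, lambda m: m.send("hello", priority=1))` passes, while `lambda m: m.send("bye")` raises. The same recording machinery serves both the real object and the probe, which is the "use the library to extend itself" idea in miniature.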

What I find elegant in the whole thing is not just the reversal of the record/replay model, but the use of Rhino Mocks to extend Rhino Mocks.

posted @ Sunday, July 13, 2008 12:02 PM | Feedback (6)

Friday, July 11, 2008 #

Observations on Embedded databases

I spent significant parts of the last two weeks dealing with embedded databases. I have used SQL CE, SQLite, FireBird, db4o, and Berkeley DB.

My requirements were really simple, or so I thought. I just wanted thread safety and support for transactions.

Let me go over them in order:

SQL CE

This is a really nice DB. Its syntax is comparable to SQL Server's, which makes a lot of things simpler. It has transaction support and is supposed to be safe for multithreaded use.

It is not.

It is very easy to get SQL CE into situations where it hangs, usually when it is attempting to open the database. Oh, and there is no lock timeout for this issue, so it literally hangs.

Result: overruled.

SQLite

I just love this DB. It is simple, straightforward, and, aside from its crazy date manipulation support, just works. It supports both transactions and multithreading.

However, multithreading support is achieved by locking the entire database. This is acceptable behavior in most scenarios, but for my needs, it meant that threads stepped on each other all too often, and that wasn't acceptable.
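The database-wide lock is easy to observe with Python's built-in `sqlite3` module. In this sketch (the table names and the `second_writer_blocked` helper are made up for illustration), one connection holds an open write transaction on one table, and a second connection, configured not to wait, fails to write to a completely different table with "database is locked". Whatever the journal mode, SQLite allows only one writer at a time.

```python
import os
import sqlite3
import tempfile


def second_writer_blocked() -> bool:
    """Return True if a second connection cannot write while the first holds a write lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None puts the connection in autocommit mode,
        # so the transaction boundaries below are explicit.
        setup = sqlite3.connect(path, isolation_level=None)
        setup.execute("CREATE TABLE messages (id INTEGER)")
        setup.execute("CREATE TABLE audit (id INTEGER)")
        setup.close()

        writer = sqlite3.connect(path, isolation_level=None)
        other = sqlite3.connect(path, isolation_level=None, timeout=0)  # fail fast, don't wait
        try:
            writer.execute("BEGIN IMMEDIATE")  # take the write lock for the whole file
            writer.execute("INSERT INTO messages VALUES (1)")
            try:
                # A different table, yet it still conflicts: the lock is per database.
                other.execute("INSERT INTO audit VALUES (1)")
                return False
            except sqlite3.OperationalError as e:
                return "locked" in str(e)
        finally:
            writer.execute("ROLLBACK")
            writer.close()
            other.close()
```

With the default timeout, the second writer would simply wait for the first one to finish instead of failing, which is exactly the "threads stepping on each other" effect under load.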

FireBird

I got some recommendations for it, so I decided to go for it, but I had a really hard time getting it to work correctly.

It doesn't handle transaction isolation, so I never actually got to the point of testing multi threading behavior.

db4o

This was my first foray into using db4o, so take anything that I say with a grain of salt.

It looked like it would be a good solution. However, I couldn't figure out how to get two threads to share the same file. Each connection locks the file, so they cannot be used concurrently. Using the server version might solve this, but I am looking for an embedded solution, not a server solution.

Berkeley DB

I spent the most time here, and it is almost there.

I had one critical issue that I managed to overcome, but then BDB totally killed itself when it hung in my tests. After tearing out a lot of hair, I found out that there is a known bug in version 4.5 (the latest version that has a .NET binding) that can cause this.

Conclusion

I started using a Dictionary as my data store. Just don't restart the server.

posted @ Friday, July 11, 2008 1:54 AM | Feedback (40)

Thursday, July 10, 2008 #

Looking for server backup

I have a VPS server that I host some personal stuff on. I would like to have backups for it, since I know that not having them will bite me in the end.

I am talking about backing up SQL Server (very small) and a few directories. Can anyone recommend a good online backup solution? Preferably one that I can install and forget about?

I am already using Mozy for backing up local machines, and I tried using that on the server. Unfortunately it doesn't work on virtual machines, which really annoyed me.

Any recommendations?

posted @ Thursday, July 10, 2008 10:15 AM | Feedback (22)

Wednesday, July 09, 2008 #

Boo Migration DSL

Nathan Stott is doing some really interesting things with Rhino DSL and Boo. His latest post outlines how to create this syntax:

CreateTable "Cats":
    Int32 "Id", { "identity" : true, "primary" : true }
    String "Name", { "length" : 50 }
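To give a feel for what a migration DSL like this ultimately produces, here is a hypothetical Python equivalent that turns the same table description into a `CREATE TABLE` statement. The function names and the SQL Server style type mapping (`INT`, `NVARCHAR`, `IDENTITY`) are assumptions for illustration, not how Nathan's DSL works; the option dictionaries mirror the Boo hash literals.

```python
from typing import Dict, List, Optional, Tuple

# (column name, base SQL type, options) -- hypothetical representation
Column = Tuple[str, str, Dict[str, object]]


def int32(name: str, options: Optional[Dict[str, object]] = None) -> Column:
    return (name, "INT", options or {})


def string(name: str, options: Optional[Dict[str, object]] = None) -> Column:
    return (name, "NVARCHAR", options or {})


def create_table(table: str, *columns: Column) -> str:
    """Render the column descriptions as a single CREATE TABLE statement."""
    parts: List[str] = []
    for name, sql_type, opts in columns:
        if "length" in opts:
            sql_type = f"{sql_type}({opts['length']})"
        column = f"{name} {sql_type}"
        if opts.get("identity"):
            column += " IDENTITY"
        if opts.get("primary"):
            column += " PRIMARY KEY"
        parts.append(column)
    return f"CREATE TABLE {table} ({', '.join(parts)})"
```

Calling `create_table("Cats", int32("Id", {"identity": True, "primary": True}), string("Name", {"length": 50}))` returns `CREATE TABLE Cats (Id INT IDENTITY PRIMARY KEY, Name NVARCHAR(50))`. The Boo version gets to drop the parentheses and commas, which is much of its appeal.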

I like it.

posted @ Wednesday, July 09, 2008 9:39 AM | Feedback (10)

Monday, July 07, 2008 #

Dealing with time in tests

One of the more annoying things to test is time sensitive code.

I just spent five minutes trying to figure out why this code is failing:

repository.ResetFailures(failedMsgs);
var msgs = repository.GetAllReadyMessages();
Assert.AreEqual(2, msgs.Length);

ResetFailures sets the retry time of the failed messages to 2 seconds in the future. GetAllReadyMessages only gets messages that are ready now.

Trying to test that can be a real pain. One solution that I have adopted across all my projects is introducing a separate concept of time:

public static class SystemTime
{
	public static Func<DateTime> Now = () => DateTime.Now;
}

Now, instead of calling DateTime.Now, I make the call to SystemTime.Now(), and get the same thing. This means that I can now test the code above easily, using:

SystemTime.Now = () => new DateTime(2000,1,1);
repository.ResetFailures(failedMsgs); 
SystemTime.Now = () => new DateTime(2000,1,2);
var msgs = repository.GetAllReadyMessages(); 
Assert.AreEqual(2, msgs.Length);

This is a really painless way to deal with this issue.
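The same pattern carries over directly to other languages. Here is a minimal Python sketch; the `MessageRepository` and `Message` types are hypothetical stand-ins that mimic the ResetFailures / GetAllReadyMessages behavior described above (failed messages become ready again 2 seconds later), not the actual repository.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, List, Optional


class SystemTime:
    """Swappable clock: production code calls SystemTime.now() instead of datetime.now()."""

    now: Callable[[], datetime] = datetime.now


@dataclass
class Message:
    id: int
    retry_at: Optional[datetime] = None  # None means "ready right away"


class MessageRepository:
    """Hypothetical in-memory repository used only to illustrate the clock swap."""

    RETRY_DELAY = timedelta(seconds=2)

    def __init__(self, messages: List[Message]) -> None:
        self.messages = list(messages)

    def reset_failures(self, failed: List[Message]) -> None:
        # Failed messages become ready again RETRY_DELAY after "now".
        retry_at = SystemTime.now() + self.RETRY_DELAY
        for msg in failed:
            msg.retry_at = retry_at

    def get_all_ready_messages(self) -> List[Message]:
        now = SystemTime.now()
        return [m for m in self.messages if m.retry_at is None or m.retry_at <= now]
```

A test then freezes the clock with `SystemTime.now = lambda: datetime(2000, 1, 1)`, calls `reset_failures`, moves the clock forward a day, and checks that both messages are ready. Restoring `SystemTime.now = datetime.now` afterwards (for example in a teardown) keeps one test's frozen clock from leaking into the next.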

posted @ Monday, July 07, 2008 6:39 AM | Feedback (13)

Why I don't like FireBird: Part II

Take a look at this code. It is supposed to give me the earliest message, ensuring that I'll get each message once and only once.

It doesn't work. I am getting some messages twice. The same code (well, simplified) just works on SQLite.

public QueueMessage GetEarliestMessage()
{
	byte[] data = null;
	bool done = false;
	while (done == false)
	{
		Transaction(cmd =>
		{
			/* 
				SELECT FIRST 1 Id, Data FROM IncomingMessages
				ORDER BY InsertedAt ASC
			*/
			cmd.CommandText = Queries.GetEarliestMessageFromIncomingQueue;
			string id;
			using (var reader = cmd.ExecuteReader())
			{
				if (reader.Read() == false)
				{
					done = true;
					return;
				}
				id = reader.GetString(0);
				data = (byte[])reader[1];
			}
			/* DELETE FROM IncomingMessages WHERE Id = @Id	*/
			cmd.CommandText = Queries.DeleteMessageFromIncomingQueue;
			cmd.Parameters.Add("@Id", id);
			try
			{
				var rowAffected = cmd.ExecuteNonQuery();
				// someone else already grabbed and deleted this row, 
				// so we will try again with another one
				if (rowAffected != 1)
					return; // same as continue in this case
			}
			catch (FbException e)
			{
				// yuck! it would have been better to compare the error code,
				// but FB doesn't expose it
				if (e.Message == "cannot update erased record")
				{
					return;// same as continue
				}
				throw; // any other error is a real failure, don't swallow it
			}
			done = true;// same as break from the loop
		});
	}
	if (data == null)
		return null;
	return Deserialize(data);
}

protected void Transaction(Action<FbCommand> action)
{
	using (var connection = new FbConnection(connectionString))
	using (var cmd = connection.CreateCommand())
	{
		connection.Open();
		using (var tx = connection.BeginTransaction(IsolationLevel.Serializable))
		{
			cmd.Transaction = tx;
			action(cmd);
			tx.Commit();
		}
	}
}
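For comparison, the claim-by-delete idea behind GetEarliestMessage can be sketched against SQLite with Python's `sqlite3` module. The table layout follows the queries in the comments above; the function itself is a hypothetical illustration, not the SQLite code mentioned in the post. A message counts as ours only if our DELETE removed exactly one row, and the payload returned always comes from that successful attempt.

```python
import sqlite3
from typing import Optional


def get_earliest_message(conn: sqlite3.Connection) -> Optional[bytes]:
    """Dequeue the oldest message, or return None if the queue is empty."""
    while True:
        with conn:  # commits on normal exit (including return), rolls back on exception
            row = conn.execute(
                "SELECT Id, Data FROM IncomingMessages ORDER BY InsertedAt ASC LIMIT 1"
            ).fetchone()
            if row is None:
                return None
            msg_id, data = row
            cur = conn.execute("DELETE FROM IncomingMessages WHERE Id = ?", (msg_id,))
            if cur.rowcount == 1:
                return data  # our DELETE won, so this message is ours alone
        # Another consumer deleted the row first: loop and try the next one.
```

Keeping `data` local to each attempt means a lost race can never hand back a payload that another consumer already claimed.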


posted @ Monday, July 07, 2008 12:37 AM | Feedback (10)

Why I don't like Firebird: Part I

I mentioned before that my main objection to this DB is its utter lack of tooling. I mean, even SQLite has some tools. For Firebird, there seems to be nothing that works on my machine.

I needed to debug something there, and this was the easiest solution.

[image]

posted @ Monday, July 07, 2008 12:32 AM | Feedback (8)

Sunday, July 06, 2008 #

Things you should never do

That should be pretty high on the list, I think:

[image]

[image]

posted @ Sunday, July 06, 2008 3:05 AM | Feedback (14)

Saturday, July 05, 2008 #

Challenge: Find the deadlock

Okay, there isn't much of a challenge here, but it is worth pointing out nevertheless:

[image] [image]

posted @ Saturday, July 05, 2008 9:36 AM | Feedback (2)

A TDD Dilemma

I am currently modifying some core parts of the system, changing it from using a SQLite DB to using Berkeley DB. The problem is that it is causing... issues.

I have things fairly well isolated, but I need to write code that makes this test pass:

[image]

As you can see, this is a test for the repository, and it is