Pipes and filtersThe IEnumerable appraoch

time to read 5 min | 879 words

Pipes are very common in computing. It is a very good way to turn a complex problem to a set of small problems. You are probably familiar with the pattern, even if not explicitly.

  • The ASP.Net Http Pipeline (Begin_Request, Authorize_Request, etc
  • Compiler Pipelines (Parse, ProcessTypes, SaveAssembly, etc)
  • Command Line piping (ps -ax | grep Finder)

What I wanted to talk about today was how to implement this in code. I did several implementation of pipes and filters in the past, and they all were overly complex. I took this weekend to look at the problem again, and I came up with a ridiculously simple solution.

In a nutshell, here it is:

image

We have a pipeline, that is composed of operations. Each operation accepts an input and return an output. The use of IEnumerable<T> means that we can streamline the entire process without any effort whatsoever.

Most problems that calls for the pipeline approach are fairly complex, so picking a simple example means that it is trivial to implement it otherwise. Let us go to the really trivial sample of printing all the processes whose working set is greater than 50 MB.

We have three stages in the pipeline, the first, get processes:

public class GetAllProcesses : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        return Process.GetProcesses();
    }
}

The second, limit by working set size:

public class LimitByWorkingSetSize : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        int maxSizeBytes = 50 * 1024 * 1024;
        foreach (Process process in input)
        {
            if (process.WorkingSet64 > maxSizeBytes)
                yield return process;
        }
    }
}

The third, print process name:

public class PrintProcessName : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        foreach (Process process in input)
        {
            System.Console.WriteLine(process.ProcessName);
        }
        yield break;
    }
}

All of those are very trivial implementation. You can see that the GetAllProcesses class doesn't care about its input, it is the source. The LimitByWorkingSetSize iterate over the input and use the "yield return" keywords to stream the results to the next step, PrintProcessesName. Since this step is the final one, we use the "yield break" keywords to make it compile without returning anything. (We could return null, but that would be rude).

It is important to note that the second stage uses the if to control what get pass downstream.

Now we only have to bring them together, right?

public class TrivialProcessesPipeline : Pipeline<Process>
{
    public TrivialProcessesPipeline()
    {
        Register(new GetAllProcesses());
        Register(new LimitByWorkingSetSize());
        Register(new PrintProcessName());
    }
}

Now, executing this pipeline will execute all three steps, in a streaming fashion.

Okay, this is a lot of code that we can replace with the following snippet:

int maxSizeBytes = 50 * 1024 * 1024;
foreach (Process process in Process.GetProcesses())
{
     if (process.WorkingSet64 > maxSizeBytes)
         System.Console.WriteLine(process.ProcessName);
}

What are we getting from this?

Composability and streaming. When we execute the pipeline, we are not executing each step in turn, we are executing them all in parallel. (Well, not in parallel, but together.)

Hey, I didn't show you how the Pipeline<T> was implemented, right?

public class Pipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public Pipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

I'll leave you to ponder that.

More posts in "Pipes and filters" series:

  1. (06 Jan 2008) The multi threaded version
  2. (05 Jan 2008) The IEnumerable appraoch