A minimal actor framework, part 2
Originally posted at 4/29/2011
Yesterday I introduced a very minimal actor “framework”, and I noted that while it was very simple, it wasn’t a very good one. The major problems in that implementation are:
- No consideration of errors
- No consideration of async operations
The first one seems obvious, but what about the second? How can an actor framework not consider async operations?
Well, the answer to that is quite simple: our actor framework assumed that every action would execute synchronously. That isn’t going to work if we need to do things like async IO.
As it happened, that is precisely what I had in mind for this code, so I wrote this:
```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Func<TState, Task>> actions = new ConcurrentQueue<Func<TState, Task>>();
    private Task activeTask;

    public void Act(Func<TState, Task> action)
    {
        actions.Enqueue(action);
        lock (this)
        {
            // A worker is already running; it will pick up the new action.
            if (activeTask != null)
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    public event EventHandler<UnhandledExceptionEventArgs> OnError;

    private void InvokeOnError(UnhandledExceptionEventArgs e)
    {
        var handler = OnError;
        if (handler == null)
            throw new InvalidOperationException("An error was raised for an actor with no error handling capabilities");
        handler(this, e);
    }

    private void ExecuteActions()
    {
        Func<TState, Task> func;
        if (actions.TryDequeue(out func))
        {
            // Run the action; once its task completes, report a fault
            // or continue with the next queued action.
            func(State)
                .ContinueWith(x =>
                {
                    if (x.Exception != null)
                    {
                        InvokeOnError(new UnhandledExceptionEventArgs(x.Exception, false));
                        return;
                    }
                    ExecuteActions();
                });
            return;
        }
        // The queue is empty: mark the actor as idle.
        lock (this)
        {
            activeTask = null;
        }
    }
}
```
Thoughts?
Comments
The recursive task invocation is a nice trick. I like the Task-factory solution very much.
It might be a problem that, if the actor faults, the queue will grow without bound. Setting a fault flag or nulling the actions field would solve this.
Euh, when you add two time-consuming tasks to the actor, it seems that the second task is never assigned to activeTask, as only Act sets the field.
Shouldn't the ContinueWith clause also set the field? After the first run of ExecuteActions, activeTask is set to null, and the recursive ExecuteActions call doesn't set it again. So when you add a task after the first task has already finished (and activeTask is null), Act will start a parallel run, as far as I can tell.
Isn't there a race here?
1. ExecuteActions detects that there are no pending actions.
2. Act adds a new action.
3. Act enters the lock and observes a non-null activeTask.
4. Act exits.
5. ExecuteActions enters the lock and sets activeTask to null.
At that point, the action will sit waiting until another call to Act occurs.
I suppose Damien is right. And the previous version had better performance thanks to double-checked locking.
It would be nice to have the InvalidOperationException include the original exception as its inner exception.
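A minimal sketch of that suggestion, assuming the OnError/InvokeOnError shape from the post. The event arguments' ExceptionObject (in the actor, the faulted task's AggregateException) is passed as the inner exception. ErrorReportingSketch is a hypothetical class that keeps only the error plumbing.

```csharp
using System;

// Hypothetical stand-in for the actor's error plumbing.
public class ErrorReportingSketch
{
    public event EventHandler<UnhandledExceptionEventArgs> OnError;

    public void InvokeOnError(UnhandledExceptionEventArgs e)
    {
        var handler = OnError;
        // With no subscriber, keep the original failure reachable via InnerException.
        if (handler == null)
            throw new InvalidOperationException(
                "An error was raised for an actor with no error handling capabilities",
                e.ExceptionObject as Exception);
        handler(this, e);
    }
}
```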
Shouldn't the next action still be executed when the exception can be handled through the OnError event? In other words, shouldn't the return in ContinueWith be removed?
And the race condition Damien talks about is solved with:
```csharp
lock(this)
{
    if (actions.Any())
    {
        ExecuteActions();
    }
    else
    {
        activeTask = null;
    }
}
```
When the lock is acquired, we should not reset activeTask if there is more work available.
// Ryan
Oops, the lock should be released before calling ExecuteActions again...
me ducks ...
// Ryan
quick fix ...
```csharp
var callExecuteActions = false;
lock(this)
{
    if (actions.Any())
    {
        callExecuteActions = true;
    }
    else
    {
        activeTask = null;
    }
}
if (callExecuteActions)
    ExecuteActions();
```
I probably want to refactor the whole method with a while loop now ...
// Ryan
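One way the while-loop refactoring could look, sketched with C# 5 async/await (which postdates this post) in a hypothetical LoopingActor class. Enqueueing, the empty check and the switch to idle all happen under one lock, so the interleaving Damien describes cannot strand an action, and the lock is never held while an action runs. The OnError callback is an assumption, a simpler stand-in for the event in the post.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class LoopingActor<TState>
{
    private readonly object gate = new object();
    private readonly Queue<Func<TState, Task>> actions = new Queue<Func<TState, Task>>();
    private bool running;

    public TState State { get; set; }

    // Hypothetical error callback, invoked for each failed action.
    public Action<Exception> OnError { get; set; }

    public void Act(Func<TState, Task> action)
    {
        lock (gate)
        {
            actions.Enqueue(action);
            if (running)
                return;          // the active loop will pick this action up
            running = true;
        }
        Task.Run(() => ExecuteActionsAsync());
    }

    private async Task ExecuteActionsAsync()
    {
        while (true)
        {
            Func<TState, Task> func;
            lock (gate)
            {
                if (actions.Count == 0)
                {
                    running = false; // go idle only while holding the lock
                    return;
                }
                func = actions.Dequeue();
            }
            try
            {
                await func(State);   // also catches synchronous throws
            }
            catch (Exception ex)
            {
                OnError?.Invoke(ex); // report and keep processing
            }
        }
    }
}
```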
Dave,
We continue executing on the same task (or rather, its continuation; note the ExecuteActions call there).
Yes, there's a race condition here. Instead of solving it, I would suggest investing 5 seconds in an explicit lock root object. This will force you to give it a name, which in turn will force you to think about what shared resource you are really protecting with the lock. That resource is the worker thread (aka activeTask). What you're trying to do is make sure that exactly one worker thread is auto-started and auto-stopped when needed. I believe this is the wrong problem to solve. Instead, you should start the thread in the constructor and dispose of it in IDisposable.Dispose (and in the finalizer). Then you should make sure your task is not spinning when there is no work.
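A sketch of the design this comment proposes, assuming synchronous actions (which gives up the async IO the post is after). The hypothetical ThreadActor starts one dedicated thread in its constructor, and BlockingCollection makes that thread block, rather than spin, while there is no work. Dispose stops accepting work, lets the queue drain and joins the thread; the finalizer the comment mentions is left out, and OnError is a hypothetical callback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class ThreadActor<TState> : IDisposable
{
    private readonly BlockingCollection<Action<TState>> actions = new BlockingCollection<Action<TState>>();
    private readonly Thread worker;

    public TState State { get; set; }

    // Hypothetical error callback, invoked for each failed action.
    public Action<Exception> OnError { get; set; }

    public ThreadActor(TState state)
    {
        State = state;
        worker = new Thread(Run) { IsBackground = true };
        worker.Start();
    }

    public void Act(Action<TState> action)
    {
        actions.Add(action); // fails once Dispose has been called
    }

    private void Run()
    {
        // Blocks while the queue is empty; ends after CompleteAdding once drained.
        foreach (var action in actions.GetConsumingEnumerable())
        {
            try
            {
                action(State);
            }
            catch (Exception ex)
            {
                var onError = OnError;
                if (onError != null) onError(ex);
            }
        }
    }

    public void Dispose()
    {
        actions.CompleteAdding(); // no new work
        worker.Join();            // let already-queued actions finish
        actions.Dispose();
    }
}
```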
You can get rid of the lock (and the race): https://gist.github.com/957391
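The gist's contents aren't reproduced here. As a separate, hypothetical illustration of a lock-free alternative, an Interlocked counter of pending actions can decide who runs the processing loop: the caller that moves the counter from 0 to 1 starts the loop, and the loop exits only when its own decrement brings the counter back to 0, so exactly one loop runs while work remains. CountingActor and its OnError callback are assumptions, and the sketch uses C# 5 async/await.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class CountingActor<TState>
{
    private readonly ConcurrentQueue<Func<TState, Task>> actions = new ConcurrentQueue<Func<TState, Task>>();
    private int pending; // queued but not yet finished

    public TState State { get; set; }

    // Hypothetical error callback, invoked for each failed action.
    public Action<Exception> OnError { get; set; }

    public void Act(Func<TState, Task> action)
    {
        actions.Enqueue(action);                     // enqueue first...
        if (Interlocked.Increment(ref pending) == 1) // ...then count it
            Task.Run(() => ProcessAsync());
    }

    private async Task ProcessAsync()
    {
        do
        {
            // Every increment follows its enqueue, so while 'pending'
            // is positive the queue holds at least one action.
            Func<TState, Task> func;
            actions.TryDequeue(out func);
            try
            {
                await func(State);
            }
            catch (Exception ex)
            {
                OnError?.Invoke(ex);
            }
        }
        while (Interlocked.Decrement(ref pending) > 0); // stop only when nothing is left
    }
}
```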
When the action / func (please make up your mind) itself throws an exception, the actor will grind to a halt. activeTask will catch the exception and save it for you, but you never try to access it.
Also, in ExecuteActions I would either invert the if, or at least use an else block for the lock statement, in which case you could also remove the return. On my first pass over the code I thought you were calling ExecuteActions recursively and setting activeTask to null.
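For the synchronous-throw case, a minimal sketch of one fix with a hypothetical SafeInvoke helper: catch the throw and return it as a faulted task, so the existing ContinueWith branch reports it through OnError like any asynchronous failure. TaskCompletionSource is used because it is available in .NET 4.0; Task.FromException came later.

```csharp
using System;
using System.Threading.Tasks;

public static class SafeInvoke
{
    // Hypothetical helper: a synchronous throw from 'func' becomes a faulted Task.
    public static Task Invoke<TState>(Func<TState, Task> func, TState state)
    {
        try
        {
            return func(state);
        }
        catch (Exception ex)
        {
            var tcs = new TaskCompletionSource<object>();
            tcs.SetException(ex);
            return tcs.Task;
        }
    }
}
```

In ExecuteActions, func(State) would become SafeInvoke.Invoke(func, State).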
@Patrick,
Isn't any exception saved by activeTask being accessed via the continuation:
.ContinueWith(x =>
@Harry
There you continue on the task that gets returned by the func. But what happens if the func itself throws an exception?
Using the RX Framework (Subject class) as the basis for this is a way to get started. I have done this before. Just a suggestion.
@Patrick,
I think I understand now, but correct me if I'm wrong. If it is currently executing a recursive call to ExecuteActions and the func throws, the exception is stored in the previous func's task and its continuation will handle it. But if it is executing the first call to ExecuteActions, the one fired by Task.Factory.StartNew, and the func throws, the exception is saved by activeTask and never handled, just silently ignored. Correct?
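Well, ContinueWith returns a new task, so on recursive calls that ignored task will contain the exception. In that case, when it is garbage collected, the task will scream bloody murder about no one observing the exception and crash the application (the .NET 4.0 behavior; starting with .NET 4.5, unobserved task exceptions no longer crash the process by default).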
In the non-recursive case, not only will the exception be silently ignored, which is 'frowned upon', but its existence will actually prevent the actor from ever dequeuing the next func, because activeTask is never reset to null.
What about taking it one step further and doing away with the Queue altogether? If you track the last task added, you can add a continuation to it directly effectively creating a linked list of asynchronous tasks.
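A sketch of that idea with a hypothetical ChainedActor class: each new action is attached as a continuation of the previous tail, and Unwrap makes the next link wait for the task the action returns, not merely for the action to be called. A lock guards only the swap of the tail reference, so additions are serialized while the actions themselves run outside the lock, one after another. Because ContinueWith runs however the previous task ended, a faulted action does not stop the chain; its failure surfaces in the task Act returns.

```csharp
using System;
using System.Threading.Tasks;

public class ChainedActor<TState>
{
    private readonly object gate = new object();
    private Task tail = Task.CompletedTask; // most recently added action

    public TState State { get; set; }

    // Returns the task for this particular action so callers can observe it.
    public Task Act(Func<TState, Task> action)
    {
        lock (gate)
        {
            // Runs after the previous link completes (successfully or not);
            // Unwrap waits for the Task returned by the action itself.
            tail = tail.ContinueWith(_ => action(State), TaskScheduler.Default).Unwrap();
            return tail;
        }
    }
}
```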
Also, what happens if a task blocks and never returns? It's kind of a problem with any actor, but you may want to consider a timeout and task cancellation.
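A sketch of such a timeout, assuming actions accept a CancellationToken (the actor in the post passes only the state). The hypothetical RunWithTimeout races the action against Task.Delay, cancels the token if the delay wins, and reports whether the action finished in time. Cancellation is cooperative, so an action that ignores its token keeps running in the background; the actor merely stops waiting for it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Timeouts
{
    // Hypothetical helper: returns false if 'action' did not finish within 'timeout'.
    public static async Task<bool> RunWithTimeout(Func<CancellationToken, Task> action, TimeSpan timeout)
    {
        var cts = new CancellationTokenSource();
        Task work = action(cts.Token);
        Task winner = await Task.WhenAny(work, Task.Delay(timeout));
        if (winner != work)
        {
            cts.Cancel();   // ask the stuck action to stop
            return false;   // the actor can move on to the next action
        }
        cts.Dispose();
        await work;         // rethrow the action's own exception, if any
        return true;
    }
}
```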
Sam, I really wanted to do that, but the problem is: how do you do it with concurrent task addition without allowing concurrent task execution?
I'm going to reiterate Anthony's comment in the Part 1 post. Retlang could fit your need. It has matured significantly since your initial review of it several years ago: http://code.google.com/p/retlang/. From the site:
"The library is intended for use in message based concurrency similar to event based actors in Scala.
Retlang relies upon four abstractions: IFiber, IQueue, IExecutor, and IChannel. An IFiber is an abstraction for the context of execution (in most cases a thread). An IQueue is an abstraction for the data structure that holds the actions until the IFiber is ready for more actions. The default implementation, DefaultQueue, is an unbounded storage that uses standard locking to notify when it has actions to execute. An IExecutor performs the actual execution. It is useful as an injection point to achieve fault tolerance, performance profiling, etc. The default implementation, DefaultExecutor, simply executes actions. An IChannel is an abstraction for the conduit through which two or more IFibers communicate (pass messages).
All messages to a particular IFiber are delivered sequentially. Components can easily keep state without synchronizing data access or worrying about thread races."
Using a custom IExecutor with Retlang may solve your issue of "concurrent task addition without concurrent tasks execution"
just my 2 cents...
This is what I was thinking. Let me know if I've missed anything.
An example using Retlang: