Streamlining .NET Asynchronous Operations - Part II
Roger Torres - February 15th, 2009
In Part I of this series I questioned the use of traditional .NET asynchronous patterns when executing a large number of asynchronous operations in a predetermined sequence, showing how the advantages in performance are overshadowed by the complexity in the implementation.
Thanks to the functional language capabilities of C#, we can easily streamline our code by following the guidelines I’m going to discuss here today. The whole idea is based on coroutines, where unlike regular methods (subroutines), we have multiple entry points to resume execution. C# doesn’t support coroutines natively (I hope this is something we will have in future versions), but we can simulate some of that behavior using generators (best known as iterators in C# terms).
The key is to return an enumeration of asynchronous actions that must be executed in sequence by yielding each individual action to the iterator… and as usual, there are many ways to accomplish this goal. I’m going to present a solution that I found well balanced in terms of complexity and functionality.
The Solution
The following class diagram depicts the components of my solution:
Note that all actions have a common base abstract class AsyncAction, so we can nest actions to create compositions of parallel and serial sequences as long as we want. Today we are going to work with an enhanced version of the example discussed in part I, this time downloading the contents of several web pages as shown in the following diagram:
We will see how this mechanism uses ThreadPool threads very efficiently, only starting new threads when necessary.
AsyncAction
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | using System; using System.Threading; namespace CoNatural.Threading.Async { public abstract class AsyncAction { public string Name { get; private set; } public bool RaiseEvents { get; private set; } protected AsyncAction(string name, bool raiseEvents) { Name = name; RaiseEvents = raiseEvents; } internal void Complete(bool cancelled, AsyncActionException error) { Context.OnAsyncActionCompleted(this, cancelled, error); } internal void ChangeProgress(int progressPercentage) { Context.OnAsyncActionProgressChanged(this, progressPercentage); } internal void Trace(string message) { Context.OnAsyncActionTraced(this, message); } internal AsyncContext Context { get; set; } public virtual void Start(AsyncContext context) { Context = context.OnStart(); ThreadPool.QueueUserWorkItem((state) => { BeginInvoke(this, () => context.OnCompleted()); }); } internal abstract void BeginInvoke(AsyncAction source, Action nextAction); } } |
This is the base (abstract) class representing any type of asynchronous action returned by the generator. All our actions are instantiated with a name (to facilitate tracing and debugging), and a flag used by the operation context to raise events generated by the action (Completed, ProgressChanged, Traced). Raising events is something that must be reserved only for actions that you want to debug or display their progress report and termination feedback to the GUI.
We can define an asynchronous operation as the execution of one or many nested asynchronous actions. To start an operation, the user invokes the Start method of the root action, passing an execution context that is reserved by the engine until the operation is completed. Note that the root’s BeginInvoke method is queued in the ThreadPool, so the main calling thread is not blocked by the operation.
BeginInvoke is the method that binds our nested actions together. Each concrete action implements BeginInvoke according to their specific compositional properties. When the asynchronous operation is started, the engine binds each executing action to the execution context (AsyncContext) and starts calling each action’s BeginInvoke in order until the operation is completed. We will see below how the AsyncContext takes care of posting individual AsyncAction events (and its own Completed event) to the current synchronization context.
AsyncAction<T>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | using System; using System.Threading; namespace CoNatural.Threading.Async { public class AsyncAction<T> : AsyncAction { private Func<AsyncCallback, object, IAsyncResult> _beginXxx; private Func<IAsyncResult, T> _endXxx; public T Result { get; private set; } public AsyncAction(string name, Func<AsyncCallback, object, IAsyncResult> beginXxx, Func<IAsyncResult, T> endXxx) : base(name, false) { _beginXxx = beginXxx; _endXxx = endXxx; } internal override void BeginInvoke(AsyncAction source, Action nextAction) { Context = source.Context; try { _beginXxx( delegate(IAsyncResult asyncResult) { try { Result = _endXxx(asyncResult); Complete(false, null); } catch (Exception ex) { Complete(false, new AsyncActionException(this, string.Empty, ex)); } nextAction(); }, null ); } catch (Exception ex) { Complete(false, new AsyncActionException(this, string.Empty, ex)); nextAction(); } } } } |
This is our first and most important concrete AsyncAction. Its main function is to wrap all .NET APIs using the BeginXxx-EndXxx asynchronous pattern, returning a result of type T (in most cases). The constructor accepts the parameters of the base class plus delegates to the BeginXxx-EndXxx methods.
Note how BeginInvoke wraps the .NET asynchronous pattern, effectively blocking the sequence until the action is completed by EndXxx. In case of success, the result of the action is saved and the Completed event is raised. In case of errors, the Completed event is also raised, but this time with the exception returned in the Error property of the event’s arguments. Finally, the sequence is resumed by invoking the delegate of the next action.
We are achieving our first goal of executing multiple asynchronous actions in a predetermined serial sequence, but we need a way to encapsulate the specific details of every .NET API in a nicer form. Now is when the C# 3.0 extension methods come to the rescue. The code below shows one way to extend System.Net.WebRequest and System.IO.Stream by wrapping their BeginXxx-EndXxx .NET asynchronous operations inside AsyncAction<T> actions.
1 2 3 4 5 6 7 8 9 10 11 | using System; using System.Net; namespace CoNatural.Threading.Async.Extensions { public static class WebRequestExtensions { public static AsyncAction<WebResponse> GetResponseAsync(this WebRequest request) { return new AsyncAction<WebResponse>(request.RequestUri.AbsoluteUri, request.BeginGetResponse, request.EndGetResponse); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | using System; using System.Collections.Generic; using System.IO; namespace CoNatural.Threading.Async.Extensions { public static class StreamExtensions { public static AsyncAction<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count) { return new AsyncAction<int>("ReadAsync", (callback, state) => stream.BeginRead(buffer, offset, count, callback, state), stream.EndRead); } public static AsyncAction<int> WriteAsync(this Stream stream, byte[] buffer, int offset, int count) { return new AsyncAction<int>("WriteAsync", (callback, state) => stream.BeginWrite(buffer, offset, count, callback, state), (asyncResult) => { stream.EndWrite(asyncResult); return 0; }); } public static IEnumerable<AsyncAction> CopyAsync(this Stream stream, Stream copyTo) { long total = stream.CanSeek ? stream.Length : -1; int read = -1; byte[] buffer = new byte[1024]; while (read != 0) { AsyncAction<int> count = stream.ReadAsync(buffer, 0, 1024); yield return count; yield return copyTo.WriteAsync(buffer, 0, count.Result); if (total > 0) yield return new Progress((int)(((double)count.Result / (double)total) * 100.0)); else yield return new Trace(count.Result.ToString()); read = count.Result; } } } } |
We will see later how we can use these extension methods to implement our asynchronous operations.
Serial
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | using System; using System.Collections.Generic; namespace CoNatural.Threading.Async { public class Serial : AsyncAction { protected IEnumerable<AsyncAction> _actions; public Serial(string name, bool raiseEvents, IEnumerable<AsyncAction> actions) : base(name, raiseEvents) { _actions = actions; } internal override void BeginInvoke(AsyncAction source, Action nextAction) { Context = source.Context; MoveNext(_actions.GetEnumerator(), nextAction); } private void MoveNext(IEnumerator<AsyncAction> actionEnumerator, Action nextAction) { try { // stop when cancellation requested in context if (Context.CancelRequested) { Complete(true, null); nextAction(); } // advance iteration else if (actionEnumerator.MoveNext()) { actionEnumerator.Current.BeginInvoke(this, () => MoveNext(actionEnumerator, nextAction)); } // we are done else { Complete(false, null); nextAction(); } } catch (AsyncActionException aex) { Complete(false, aex); nextAction(); } catch (Exception ex) { Complete(false, new AsyncActionException(this, string.Empty, ex)); nextAction(); } } } } |
Going back to our first goal, we needed serial actions to synchronize our sequences. Our Serial class takes an IEnumerable list of AsyncAction actions that must be invoked in order.
BeginInvoke is where the fun begins, in this case we save the execution context as usual and start the enumeration of actions by calling MoveNext. Here we check if the context has received a cancellation request to stop the execution, otherwise we just need to advance the iteration and call BeginInvoke again, passing a delegate to MoveNext (this is the recursive part) to guarantee the order of the sequence. Once we reach the end of the enumeration, the serial operation is completed and we can stop the recursion. Of course, in case of exceptions, we also stop and report the error in the completion event.
Serial<T>
1 2 3 4 5 6 7 8 9 10 11 | using System; using System.Collections.Generic; namespace CoNatural.Threading.Async { public class Serial<T> : Serial { public T Result { get; internal set; } public Serial(string name, bool raiseEvents, IEnumerable<AsyncAction> actions) : base(name, raiseEvents, actions) { } } } |
In some cases our sequence will return a result of type T, so we need a special Serial<T> class with a property to store the result.
Parallel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | using System; using System.Collections.Generic; using System.Threading; namespace CoNatural.Threading.Async { /// <summary> /// Executes multiple asynchronous actions in parallel. /// </summary> public class Parallel : AsyncAction { public AsyncAction[] Branches { get; private set; } public Parallel(string name, bool raiseEvents, params AsyncAction[] branches) : base(name, raiseEvents) { Branches = branches; } public override void Start(AsyncContext context) { Context = context.OnStart(); BeginInvoke(this, () => context.OnCompleted()); } internal override void BeginInvoke(AsyncAction source, Action nextAction) { Context = source.Context; // parallel actions start in their own managed threads because they must wait // for all branches to complete before resuming ThreadPool.QueueUserWorkItem( (state) => { long activeBranches = Branches.Length; ManualResetEvent yield = new ManualResetEvent(false); foreach (AsyncAction branch in Branches) branch.BeginInvoke( this, () => { // this branch is done, decrement counter long pending = Interlocked.Decrement(ref activeBranches); // release parallel action when all branches are completed if (pending == 0) yield.Set(); } ); // wait for all branches to complete yield.WaitOne(); // we are done Complete(false, null); nextAction(); } ); } } } |
Parallel actions complete our compositional requirements. Here we take a list of actions that must be executed concurrently (called branches), and schedule a thread to invoke each one of them individually. After all branches are invoked, the thread controlling the parallel operation is blocked, waiting for all branches to complete. Note the lambda expression that’s assigned as the “next action” to complete a branch: The counter of pending branches is decremented, and the ManuelResetEvent is signaled to release the parallel thread when the counter = 0.
Return<T>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | using System; namespace CoNatural.Threading.Async { /// <summary> /// Returns Serial of T action results. /// </summary> /// <typeparam name="T">Result type.</typeparam> public class Return<T> : AsyncAction { public T Value { get; private set; } public Return(T value) : base(string.Empty, false) { Value = value; } internal override void BeginInvoke(AsyncAction source, Action nextAction) { if (source is Serial<T>) { ((Serial<T>)source).Result = Value; nextAction(); } else throw new InvalidAsyncActionException(source, "Source of Return<T> must be Serial<T>."); } } } |
Since we are emulating coroutines with C# iterators, we must “yield” all results to the engine as AsyncAction actions, including action results, progress and trace events. The Return<T> action is a helper class to assign the result of a Serial<T> action. Progress and Trace actions are helpers to report progress and events to the execution context. We will see all these classes “in action” later when we implement our example.
Progress
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | using System; namespace CoNatural.Threading.Async { public class Progress : AsyncAction { public int ProgressPercentage { get; private set; } public Progress(int progressPercentage) : base(string.Empty, false) { ProgressPercentage = progressPercentage; } internal override void BeginInvoke(AsyncAction source, Action nextAction) { source.ChangeProgress(ProgressPercentage); nextAction(); } } } |
Trace
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | using System; namespace CoNatural.Threading.Async { public class Trace : AsyncAction { public string Message { get; private set; } public Trace(string message) : base(string.Empty, false) { Message = message; } internal override void BeginInvoke(AsyncAction source, Action nextAction) { source.Trace(Message); nextAction(); } } } |
AsyncContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | using System; using System.Threading; using System.Collections.Generic; using System.ComponentModel; namespace CoNatural.Threading.Async { public class AsyncContext { public event EventHandler<AsyncCompletedEventArgs> Completed; public event EventHandler<AsyncActionCompletedEventArgs> AsyncActionCompleted; public event EventHandler<AsyncActionProgressChangedEventArgs> AsyncActionProgressChanged; public event EventHandler<AsyncActionTracedEventArgs> AsyncActionTraced; private SynchronizationContext _syncContext; public AsyncContext() { _syncContext = SynchronizationContext.Current ?? new SynchronizationContext(); } private volatile bool _busy; public bool Busy { get { return _busy; } } private volatile bool _cancelRequested; private volatile bool _cancelled; internal bool CancelRequested { get { return _cancelRequested; } } public void Cancel() { if (_busy) _cancelRequested = true; } private List<AsyncActionException> _errors = new List<AsyncActionException>(); internal AsyncContext OnStart() { lock (this) { if (_busy) throw new Exception("Context is busy running other actions."); _busy = true; _cancelRequested = false; _cancelled = false; _errors.Clear(); return this; } } internal void OnCompleted() { _busy = false; if (Completed != null) { AsyncContextException error = (_errors.Count > 0 ? new AsyncContextException(_errors.ToArray()) : null); _syncContext.Post((state) => Completed(this, new AsyncCompletedEventArgs(error, _cancelled, state)), null); } } internal void OnAsyncActionCompleted(AsyncAction action, bool cancelled, AsyncActionException error) { lock (this) { if (error != null) { _errors.Add(error); // request cancellation after errors found _cancelRequested = true; } // at least one action was cancelled if (cancelled) { _cancelRequested = true; _cancelled = true; } } if (action.RaiseEvents && AsyncActionCompleted != null) _syncContext.Post((state) => AsyncActionCompleted(action, new AsyncActionCompletedEventArgs(action, Thread.CurrentThread.ManagedThreadId, cancelled, error)), null); } internal void OnAsyncActionProgressChanged(AsyncAction action, int progressPercentage) { if (action.RaiseEvents && AsyncActionProgressChanged != null) { progressPercentage = Math.Max(0, progressPercentage); progressPercentage = Math.Max(100, progressPercentage); _syncContext.Post((state) => AsyncActionProgressChanged(action, new AsyncActionProgressChangedEventArgs(action, Thread.CurrentThread.ManagedThreadId, progressPercentage)), null); } } internal void OnAsyncActionTraced(AsyncAction action, string message) { if (action.RaiseEvents && AsyncActionTraced != null) _syncContext.Post((state) => AsyncActionTraced(action, new AsyncActionTracedEventArgs(action, Thread.CurrentThread.ManagedThreadId, message)), null); } } } |
The execution context is controlled by one AsyncContext instance. We can subscribe for the entire operation’s completion event, or for individual action events (only the actions that are flagged to raise events). The context can receive cancellation requests that will stop all the executing actions. Note that in case of exceptions, the cancellation flag is also set to stop the operation, but I’ll probably reconsider this point later to allow other independent actions to complete.
Putting it all together
Let’s now review our initial example. We need to implement a method to download the contents of a few web sites as follows:
1 2 3 4 5 6 7 8 9 10 11 | void DownloadAll() { Serial<string> microsoft = new Serial<string>("microsoft", true, DownloadAsync("http://www.microsoft.com")); Serial<string> conatural = new Serial<string>("conatural", true, DownloadAsync("http://www.conatural.com")); Serial<string> google = new Serial<string>("google", true, DownloadAsync("http://www.google.com")); Serial<string> yahoo = new Serial<string>("yahoo", true, DownloadAsync("http://www.yahoo.com")); Parallel favorite = new Parallel("Favorite Sites", true, google, yahoo); Parallel all = new Parallel("All Sites", true, microsoft, conatural, favorite); all.Start(context); } |
While the structure of our operation is controlled by the method above, the details are implemented here:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | private IEnumerable<AsyncAction> DownloadAsync(string url) { WebRequest req = HttpWebRequest.Create(url); // get contents of site AsyncAction<WebResponse> response = req.GetResponseAsync(); yield return response; // copy results to memory stream Stream responseStream = response.Result.GetResponseStream(); MemoryStream copyTo = new MemoryStream(); yield return new Serial(url, true, responseStream.CopyAsync(copyTo)); // return results copyTo.Seek(0, SeekOrigin.Begin); string html = new StreamReader(copyTo).ReadToEnd(); yield return new Return<string>(html); } |
Note how we are using the new extension methods to simplify our code.
In the last part of this series I’ll show how to implement asynchronous operations in a Windows Forms application that will:
- Allow to cancel the operation.
- Report the progress of internal actions in the GUI.
- Show how the ThreadPool threads are using efficiently.
- Show how exceptions are handled by the engine.
Stay tuned.


[...] http://blogs.conatural.com/2009/02/streamlining-net-asynchronous-operations-part-ii/ [...]