Streamlining .NET Asynchronous Operations - Part III

Roger Torres - February 21st, 2009
Comments

This is the last part of my series about .NET asynchronous operations. Here you can download:

Since we didn’t quite finish our example in part II, in this post I’m going to show you a simple Windows Forms application that will:

  • Provide a GUI to our last example.
  • Allow the user to cancel the asynchronous operation.
  • Show the managed thread id where internal actions execute, so you can understand how ThreadPool threads are created.
  • Show the code that handles operation events and errors.

The Form

Let’s start by creating a new Windows Application Project named WinFormDemo. The default Form1 will do, we just need to add a toolbar with two buttons (to start and cancel the asynchronous operation), and a list view to show the execution log - see the screenshots below.

Now we can associate the following code-behind to our form:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Text;
using System.Windows.Forms;
using System.Net;
using System.IO;
using CoNatural.Threading.Async;
using CoNatural.Extensions.Threading.Async.IO;
using CoNatural.Extensions.Threading.Async.Net;
 
namespace WinFormDemo {
   public partial class Form1 : Form {
      AsyncContext context;
 
      public Form1() {
         InitializeComponent();
 
         // create context
         context = new AsyncContext();
         context.Completed += new EventHandler<AsyncCompletedEventArgs>(Completed);
         context.AsyncActionCompleted += new EventHandler<AsyncActionCompletedEventArgs>(ActionCompleted);
         context.AsyncActionProgressChanged += new EventHandler<AsyncActionProgressChangedEventArgs>(ReportProgress);
         context.AsyncActionTraced += new EventHandler<AsyncActionTracedEventArgs>(TraceAction);
      }
 
      private void WriteLine(string message, params object[] args) {
         logListView.Items.Add(string.Format(message, args));
      }
 
      private IEnumerable<AsyncAction> DownloadAsync(string url) {
         WebRequest req = HttpWebRequest.Create(url);
 
         // get contents of site
         AsyncAction<WebResponse> response = req.GetResponseAsync();
         yield return response;
 
         // copy results to memory stream
         Stream responseStream = response.Result.GetResponseStream();
         MemoryStream copyTo = new MemoryStream();
         yield return new Serial(url, true, responseStream.CopyAsync(copyTo));
 
         // return results
         copyTo.Seek(0, SeekOrigin.Begin);
         string html = new StreamReader(copyTo).ReadToEnd();
         yield return new Return<string>(html);
      }
 
      void DownloadAll() {
         Serial<string> microsoft = new Serial<string>("microsoft", true, DownloadAsync("http://www.microsoft.com"));
         Serial<string> conatural = new Serial<string>("conatural", true, DownloadAsync("http://www.conatural.com"));
 
         Serial<string> google = new Serial<string>("google", true, DownloadAsync("http://www.google.com"));
         Serial<string> yahoo = new Serial<string>("yahoo", true, DownloadAsync("http://www.yahoo.com"));
 
         Parallel favorite = new Parallel("Favorite Sites", true, google, yahoo);
 
         Parallel all = new Parallel("All Sites", true, microsoft, conatural, favorite);
 
         all.Start(context);
      }
 
      private void Completed(object sender, AsyncCompletedEventArgs args) {
         if (args.Cancelled)
            WriteLine("CONTEXT WAS CANCELLED.");
 
         if (args.Error != null)
            foreach (Exception ex in ((AsyncContextException)args.Error).Errors)
               DisplayError(ex);
 
         WriteLine("CONTEXT COMPLETED.");
      }
 
      private void ActionCompleted(object sender, AsyncActionCompletedEventArgs args) {
         if (args.Error != null)
            DisplayError(args.Error);
 
         if (args.Cancelled)
            WriteLine(args.Action.Name + " Cancelled");
 
         if (args.Error == null && !args.Cancelled) {
            if (args.Action is Serial<string>) {
               // Form2 displays the contents of a web site in a Browser control
               //Form2 f2 = new Form2();
               //f2.SetContent(((Serial<string>)args.Action).Result);
               //f2.Text = args.Action.Name;
               //f2.Show();
            }
            else {
               WriteLine("[{0}][{1}] completed", args.ManagedThreadId, args.Action.Name);
            }
         }
      }
 
      private void ReportProgress(object sender, AsyncActionProgressChangedEventArgs args) {
         WriteLine("[{0}][{1}] {2}", args.ManagedThreadId, args.Action.Name, args.ProgressPercentage);
      }
 
      private void TraceAction(object sender, AsyncActionTracedEventArgs args) {
         WriteLine("[{0}][{1}] {2}", args.ManagedThreadId, args.Action.Name, args.Message);
      }
 
      private void DisplayError(Exception ex) {
         if (ex != null) {
            if (ex is AsyncActionException)
               WriteLine("{0} -> {1}", ((AsyncActionException)ex).Action.Name, ex.Message);
            else
               WriteLine(ex.Message);
            DisplayError(ex.InnerException);
         }
      }
 
      private void startToolStripButton_Click(object sender, EventArgs e) {
         try {
            DownloadAll();
            logListView.Items.Clear();
            WriteLine("[{0}] GUI thread is alive...", System.Threading.Thread.CurrentThread.ManagedThreadId);
         }
         catch (Exception ex) {
            WriteLine(ex.Message);
         }
      }
 
      private void cancelToolStripButton_Click(object sender, EventArgs e) {
         context.Cancel();
      }
   }
}

We are using exactly the same code from our example in part II, but we are now handling operation events. Note that we don’t need to check if the event handling methods are executing in the GUI thread since the AsyncContext class is already taking care of the details by posting the events to the current synchronization context.

The first of the following two screenshots depicts the application after successfully downloading the contents of several web sites. The log is created by handling Trace events that return the managed thread id where the action is executing, the action’s name, and total bytes downloaded so far.

The second screenshot shows what happens when the Start button is hit twice after the operation has been started - An exception is raised by a “busy” context.

Also note that when the Cancel button is hit, the entire operation stops (the completion event is raised with the Cancelled flag set to true).

Async Test Results

Async Test Results

The best way to understand what’s going on here is by playing with this code, so I encourage you to build your own prototypes… and let me know if you find any issues please ;-)

What’s next?

Now that I have a system to invoke asynchronous operations in any predetermined sequence, I think it would be a good idea to go back to the drawing board and add an asynchronous component to the CoNatural.Data DAL. The ADO.NET SQL Server provider (under System.Data.SqlClient) already supports asynchronous execution, so with a couple of extension methods I think it will be very easy to invoke asynchronous data commands that will:

  • Execute without blocking the calling thread, making our GUI more responsive.
  • Allow the user to cancel commands.
  • Invoke multiple commands in an ordered sequence or in parallel to maximize performance.
  • The commands can be executed on one or multiple databases, and under the same transaction scope.

Stay tuned.

Streamlining .NET Asynchronous Operations - Part II

Roger Torres - February 15th, 2009
1 Comment

In Part I of this series I questioned the use of traditional .NET asynchronous patterns when executing a large number of asynchronous operations in a predetermined sequence, showing how the advantages in performance are overshadowed by the complexity in the implementation.

Thanks to the functional language capabilities of C#, we can easily streamline our code by following the guidelines I’m going to discuss here today. The whole idea is based on coroutines, where unlike regular methods (subroutines), we have multiple entry points to resume execution. C# doesn’t support coroutines natively (I hope this is something we will have in future versions), but we can simulate some of that behavior using generators (best known as iterators in C# terms).

The key is to return an enumeration of asynchronous actions that must be executed in sequence by yielding each individual action to the iterator… and as usual, there are many ways to accomplish this goal. I’m going to present a solution that I found well balanced in terms of complexity and functionality.

The Solution

The following class diagram depicts the components of my solution:

CoNatural.Threading.Async Class Diagram

Note that all actions have a common base abstract class AsyncAction, so we can nest actions to create compositions of parallel and serial sequences as long as we want. Today we are going to work with an enhanced version of the example discussed in part I, this time downloading the contents of several web pages as shown in the following diagram:

Asynchronous Operation

We will see how this mechanism uses ThreadPool threads very efficiently, only starting new threads when necessary.

AsyncAction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using System;
using System.Threading;
 
namespace CoNatural.Threading.Async {
   public abstract class AsyncAction {
      public string Name { get; private set; }
      public bool RaiseEvents { get; private set; }
 
      protected AsyncAction(string name, bool raiseEvents) {
         Name = name;
         RaiseEvents = raiseEvents;
      }
 
      internal void Complete(bool cancelled, AsyncActionException error) {
         Context.OnAsyncActionCompleted(this, cancelled, error);
      }
 
      internal void ChangeProgress(int progressPercentage) {
         Context.OnAsyncActionProgressChanged(this, progressPercentage);
      }
 
      internal void Trace(string message) {
         Context.OnAsyncActionTraced(this, message);
      }
 
      internal AsyncContext Context { get; set; }
 
      public virtual void Start(AsyncContext context) {
         Context = context.OnStart();
         ThreadPool.QueueUserWorkItem((state) => { BeginInvoke(this, () => context.OnCompleted()); });
      }
 
      internal abstract void BeginInvoke(AsyncAction source, Action nextAction);
   }
}

This is the base (abstract) class representing any type of asynchronous action returned by the generator. All our actions are instantiated with a name (to facilitate tracing and debugging), and a flag used by the operation context to raise events generated by the action (Completed, ProgressChanged, Traced). Raising events is something that must be reserved only for actions that you want to debug or display their progress report and termination feedback to the GUI.

We can define an asynchronous operation as the execution of one or many nested asynchronous actions. To start an operation, the user invokes the Start method of the root action, passing an execution context that is reserved by the engine until the operation is completed. Note that the root’s BeginInvoke method is queued in the ThreadPool, so the main calling thread is not blocked by the operation.

BeginInvoke is the method that binds our nested actions together. Each concrete action implements BeginInvoke according to their specific compositional properties. When the asynchronous operation is started, the engine binds each executing action to the execution context (AsyncContext) and starts calling each action’s BeginInvoke in order until the operation is completed. We will see below how the AsyncContext takes care of posting individual AsyncAction events (and its own Completed event) to the current synchronization context.

AsyncAction<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
using System;
using System.Threading;
 
namespace CoNatural.Threading.Async {
   public class AsyncAction<T> : AsyncAction {
      private Func<AsyncCallback, object, IAsyncResult> _beginXxx;
      private Func<IAsyncResult, T> _endXxx;
      public T Result { get; private set; }
 
      public AsyncAction(string name, Func<AsyncCallback, object, IAsyncResult> beginXxx, Func<IAsyncResult, T> endXxx)
         : base(name, false) {
         _beginXxx = beginXxx;
         _endXxx = endXxx;
      }
 
      internal override void BeginInvoke(AsyncAction source, Action nextAction) {
         Context = source.Context;
 
         try {
            _beginXxx(
               delegate(IAsyncResult asyncResult) {
                  try {
                     Result = _endXxx(asyncResult);
                     Complete(false, null);
                  }
                  catch (Exception ex) {
                     Complete(false, new AsyncActionException(this, string.Empty, ex));
                  }
                  nextAction();
               },
               null
            );
         }
         catch (Exception ex) {
            Complete(false, new AsyncActionException(this, string.Empty, ex));
            nextAction();       
         }
      }
   }
}

This is our first and most important concrete AsyncAction. Its main function is to wrap all .NET APIs using the BeginXxx-EndXxx asynchronous pattern, returning a result of type T (in most cases). The constructor accepts the parameters of the base class plus delegates to the BeginXxx-EndXxx methods.

Note how BeginInvoke wraps the .NET asynchronous pattern, effectively blocking the sequence until the action is completed by EndXxx. In case of success, the result of the action is saved and the Completed event is raised. In case of errors, the Completed event is also raised, but this time with the exception returned in the Error property of the event’s arguments. Finally, the sequence is resumed by invoking the delegate of the next action.

We are achieving our first goal of executing multiple asynchronous actions in a predetermined serial sequence, but we need a way to encapsulate the specific details of every .NET API in a nicer form. Now is when the C# 3.0 extension methods come to the rescue. The code below shows one way to extend System.Net.WebRequest and System.IO.Stream by wrapping their BeginXxx-EndXxx .NET asynchronous operations inside AsyncAction<T> actions.

1
2
3
4
5
6
7
8
9
10
11
using System;
using System.Net;
 
namespace CoNatural.Threading.Async.Extensions {
   public static class WebRequestExtensions {
      public static AsyncAction<WebResponse> GetResponseAsync(this WebRequest request) {
         return new AsyncAction<WebResponse>(request.RequestUri.AbsoluteUri, 
            request.BeginGetResponse, request.EndGetResponse);
      }
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
using System;
using System.Collections.Generic;
using System.IO;
 
namespace CoNatural.Threading.Async.Extensions {
   public static class StreamExtensions {
      public static AsyncAction<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count) {
         return new AsyncAction<int>("ReadAsync",
            (callback, state) =>
               stream.BeginRead(buffer, offset, count, callback, state), stream.EndRead);
      }
 
      public static AsyncAction<int> WriteAsync(this Stream stream, byte[] buffer, int offset, int count) {
         return new AsyncAction<int>("WriteAsync",
            (callback, state) =>
               stream.BeginWrite(buffer, offset, count, callback, state),
               (asyncResult) => { stream.EndWrite(asyncResult); return 0; });
      }
 
      public static IEnumerable<AsyncAction> CopyAsync(this Stream stream, Stream copyTo) {
         long total = stream.CanSeek ? stream.Length : -1;
         int read = -1;
         byte[] buffer = new byte[1024];
         while (read != 0) {
            AsyncAction<int> count = stream.ReadAsync(buffer, 0, 1024);
            yield return count;
 
            yield return copyTo.WriteAsync(buffer, 0, count.Result);
 
            if (total > 0)
               yield return new Progress((int)(((double)count.Result / (double)total) * 100.0));
            else
               yield return new Trace(count.Result.ToString());
 
            read = count.Result;
         }
      }
   }
}

We will see later how we can use these extension methods to implement our asynchronous operations.

Serial

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
using System;
using System.Collections.Generic;
 
namespace CoNatural.Threading.Async {
   public class Serial : AsyncAction {
      protected IEnumerable<AsyncAction> _actions;
 
      public Serial(string name, bool raiseEvents, IEnumerable<AsyncAction> actions)
         : base(name, raiseEvents) {
         _actions = actions;
      }
 
      internal override void BeginInvoke(AsyncAction source, Action nextAction) {
         Context = source.Context;
         MoveNext(_actions.GetEnumerator(), nextAction);
      }
 
      private void MoveNext(IEnumerator<AsyncAction> actionEnumerator, Action nextAction) {
         try {
            // stop when cancellation requested in context
            if (Context.CancelRequested) {
               Complete(true, null);
               nextAction();
            }
            // advance iteration
            else if (actionEnumerator.MoveNext()) {
               actionEnumerator.Current.BeginInvoke(this, () => MoveNext(actionEnumerator, nextAction));
            }
            // we are done
            else {
               Complete(false, null);
               nextAction();
            }
         }
         catch (AsyncActionException aex) {
            Complete(false, aex);
            nextAction();
         }
         catch (Exception ex) {
            Complete(false, new AsyncActionException(this, string.Empty, ex));
            nextAction();
         }
      }
   }
}

Going back to our first goal, we needed serial actions to synchronize our sequences. Our Serial class takes an IEnumerable list of AsyncAction actions that must be invoked in order.

BeginInvoke is where the fun begins, in this case we save the execution context as usual and start the enumeration of actions by calling MoveNext. Here we check if the context has received a cancellation request to stop the execution, otherwise we just need to advance the iteration and call BeginInvoke again, passing a delegate to MoveNext (this is the recursive part) to guarantee the order of the sequence. Once we reach the end of the enumeration, the serial operation is completed and we can stop the recursion. Of course, in case of exceptions, we also stop and report the error in the completion event.

Serial<T>

1
2
3
4
5
6
7
8
9
10
11
using System;
using System.Collections.Generic;
 
namespace CoNatural.Threading.Async {
   public class Serial<T> : Serial {
      public T Result { get; internal set; }
 
      public Serial(string name, bool raiseEvents, IEnumerable<AsyncAction> actions)
         : base(name, raiseEvents, actions) { }
   }
}

In some cases our sequence will return a result of type T, so we need a special Serial<T> class with a property to store the result.

Parallel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
using System;
using System.Collections.Generic;
using System.Threading;
 
namespace CoNatural.Threading.Async {
   /// <summary>
   /// Executes multiple asynchronous actions in parallel.
   /// </summary>
   public class Parallel : AsyncAction {
      public AsyncAction[] Branches { get; private set; }
 
      public Parallel(string name, bool raiseEvents, params AsyncAction[] branches)
         : base(name, raiseEvents) {
         Branches = branches;
      }
 
      public override void Start(AsyncContext context) {
         Context = context.OnStart();
         BeginInvoke(this, () => context.OnCompleted());
      }
 
      internal override void BeginInvoke(AsyncAction source, Action nextAction) {
         Context = source.Context;
 
         // parallel actions start in their own managed threads because they must wait
         // for all branches to complete before resuming
         ThreadPool.QueueUserWorkItem(
            (state) => {
               long activeBranches = Branches.Length;
               ManualResetEvent yield = new ManualResetEvent(false);
 
               foreach (AsyncAction branch in Branches)
                  branch.BeginInvoke(
                     this,
                     () => {
                        // this branch is done, decrement counter
                        long pending = Interlocked.Decrement(ref activeBranches);
 
                        // release parallel action when all branches are completed
                        if (pending == 0)
                           yield.Set();
                     }
                  );
 
               // wait for all branches to complete
               yield.WaitOne();
 
               // we are done
               Complete(false, null);
               nextAction();
            }
         );
      }
   }
}

Parallel actions complete our compositional requirements. Here we take a list of actions that must be executed concurrently (called branches), and schedule a thread to invoke each one of them individually. After all branches are invoked, the thread controlling the parallel operation is blocked, waiting for all branches to complete. Note the lambda expression that’s assigned as the “next action” to complete a branch: The counter of pending branches is decremented, and the ManuelResetEvent is signaled to release the parallel thread when the counter = 0.

Return<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using System;
 
namespace CoNatural.Threading.Async {
   /// <summary>
   /// Returns Serial of T action results.
   /// </summary>
   /// <typeparam name="T">Result type.</typeparam>
   public class Return<T> : AsyncAction {
      public T Value { get; private set; }
 
      public Return(T value)
         : base(string.Empty, false) {
         Value = value;
      }
 
      internal override void BeginInvoke(AsyncAction source, Action nextAction) {
         if (source is Serial<T>) {
            ((Serial<T>)source).Result = Value;
            nextAction();
         }
         else
            throw new InvalidAsyncActionException(source, "Source of Return<T> must be Serial<T>.");
      }
   }
}

Since we are emulating coroutines with C# iterators, we must “yield” all results to the engine as AsyncAction actions, including action results, progress and trace events. The Return<T> action is a helper class to assign the result of a Serial<T> action. Progress and Trace actions are helpers to report progress and events to the execution context. We will see all these classes “in action” later when we implement our example.

Progress

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
using System;
 
namespace CoNatural.Threading.Async {
   public class Progress : AsyncAction {
      public int ProgressPercentage { get; private set; }
 
      public Progress(int progressPercentage)
         : base(string.Empty, false) {
         ProgressPercentage = progressPercentage;
      }
 
      internal override void BeginInvoke(AsyncAction source, Action nextAction) {
         source.ChangeProgress(ProgressPercentage);
         nextAction();
      }
   }
}

Trace

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
using System;
 
namespace CoNatural.Threading.Async {
   public class Trace : AsyncAction {
      public string Message { get; private set; }
 
      public Trace(string message)
         : base(string.Empty, false) {
         Message = message;
      }
 
      internal override void BeginInvoke(AsyncAction source, Action nextAction) {
         source.Trace(Message);
         nextAction();
      }
   }
}

AsyncContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
using System;
using System.Threading;
using System.Collections.Generic;
using System.ComponentModel;
 
namespace CoNatural.Threading.Async {
   public class AsyncContext {
      public event EventHandler<AsyncCompletedEventArgs> Completed;
      public event EventHandler<AsyncActionCompletedEventArgs> AsyncActionCompleted;
      public event EventHandler<AsyncActionProgressChangedEventArgs> AsyncActionProgressChanged;
      public event EventHandler<AsyncActionTracedEventArgs> AsyncActionTraced;
 
      private SynchronizationContext _syncContext;
      public AsyncContext() {
         _syncContext = SynchronizationContext.Current ?? new SynchronizationContext();
      }
 
      private volatile bool _busy;
      public bool Busy { get { return _busy; } }
 
      private volatile bool _cancelRequested;
      private volatile bool _cancelled;
      internal bool CancelRequested { get { return _cancelRequested; } }
 
      public void Cancel() {
         if (_busy)
            _cancelRequested = true;
      }
 
      private List<AsyncActionException> _errors = new List<AsyncActionException>();
 
      internal AsyncContext OnStart() {
         lock (this) {
            if (_busy)
               throw new Exception("Context is busy running other actions.");
 
            _busy = true;
            _cancelRequested = false;
            _cancelled = false;
            _errors.Clear();
 
            return this;
         }
      }
 
      internal void OnCompleted() {
         _busy = false;
 
         if (Completed != null) {
            AsyncContextException error = (_errors.Count > 0 ? new AsyncContextException(_errors.ToArray()) : null);
            _syncContext.Post((state) =>
               Completed(this, new AsyncCompletedEventArgs(error, _cancelled, state)), null);
         }
      }
 
      internal void OnAsyncActionCompleted(AsyncAction action, bool cancelled, AsyncActionException error) {
         lock (this) {
            if (error != null) {
               _errors.Add(error);
               // request cancellation after errors found
               _cancelRequested = true;
            }
 
            // at least one action was cancelled
            if (cancelled) {
               _cancelRequested = true;
               _cancelled = true;
            }
         }
 
         if (action.RaiseEvents && AsyncActionCompleted != null)
            _syncContext.Post((state) =>
               AsyncActionCompleted(action,
               new AsyncActionCompletedEventArgs(action, Thread.CurrentThread.ManagedThreadId, cancelled, error)), null);
      }
 
      internal void OnAsyncActionProgressChanged(AsyncAction action, int progressPercentage) {
         if (action.RaiseEvents && AsyncActionProgressChanged != null) {
            progressPercentage = Math.Max(0, progressPercentage);
            progressPercentage = Math.Max(100, progressPercentage);
            _syncContext.Post((state) =>
               AsyncActionProgressChanged(action,
               new AsyncActionProgressChangedEventArgs(action, Thread.CurrentThread.ManagedThreadId, progressPercentage)), null);
         }
      }
 
      internal void OnAsyncActionTraced(AsyncAction action, string message) {
         if (action.RaiseEvents && AsyncActionTraced != null)
            _syncContext.Post((state) =>
               AsyncActionTraced(action,
               new AsyncActionTracedEventArgs(action, Thread.CurrentThread.ManagedThreadId, message)), null);
      }
   }
}

The execution context is controlled by one AsyncContext instance. We can subscribe for the entire operation’s completion event, or for individual action events (only the actions that are flagged to raise events). The context can receive cancellation requests that will stop all the executing actions. Note that in case of exceptions, the cancellation flag is also set to stop the operation, but I’ll probably reconsider this point later to allow other independent actions to complete.

Putting it all together

Let’s now review our initial example. We need to implement a method to download the contents of a few web sites as follows:

1
2
3
4
5
6
7
8
9
10
11
void DownloadAll() {
   Serial<string> microsoft = new Serial<string>("microsoft", true, DownloadAsync("http://www.microsoft.com"));
   Serial<string> conatural = new Serial<string>("conatural", true, DownloadAsync("http://www.conatural.com"));
 
   Serial<string> google = new Serial<string>("google", true, DownloadAsync("http://www.google.com"));
   Serial<string> yahoo = new Serial<string>("yahoo", true, DownloadAsync("http://www.yahoo.com"));
 
   Parallel favorite = new Parallel("Favorite Sites", true, google, yahoo);
   Parallel all = new Parallel("All Sites", true, microsoft, conatural, favorite);
   all.Start(context);
}

While the structure of our operation is controlled by the method above, the details are implemented here:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private IEnumerable<AsyncAction> DownloadAsync(string url) {
   WebRequest req = HttpWebRequest.Create(url);
 
   // get contents of site
   AsyncAction<WebResponse> response = req.GetResponseAsync();
   yield return response;
 
   // copy results to memory stream
   Stream responseStream = response.Result.GetResponseStream();
   MemoryStream copyTo = new MemoryStream();
   yield return new Serial(url, true, responseStream.CopyAsync(copyTo));
 
   // return results
   copyTo.Seek(0, SeekOrigin.Begin);
   string html = new StreamReader(copyTo).ReadToEnd();
   yield return new Return<string>(html);
}

Note how we are using the new extension methods to simplify our code.

In the last part of this series I’ll show how to implement asynchronous operations in a Windows Forms application that will:

  • Allow to cancel the operation.
  • Report the progress of internal actions in the GUI.
  • Show how the ThreadPool threads are using efficiently.
  • Show how exceptions are handled by the engine.

Stay tuned.

Streamlining .NET Asynchronous Operations - Part I

Roger Torres - February 8th, 2009
Comments

I would like to get some feedback from the community about how they feel when writing asynchronous applications in C#. In the past, I have scheduled work items to the thread pool and implemented basic operations around the BackgroundWorker class like everyone else… but this month, when dealing with a “massive” number of asynchronous operations in my code, I questioned for a moment the benefits of this path over the more inefficient by cleaner synchronous ways. It’s not just that the existing .NET asynchronous patterns are not trivial, but the complexity of the resulting code when multiple asynchronous actions must be executed in a predetermined sequence is unacceptable.

I understand that imperative languages are not specifically designed to deal with these scenarios, but I learned from a group of bloggers (references can be found in my previous post) that C# has been capable of some cool functional stuff for a while, and with the introduction of lambda expressions and extension methods in version 3.0, the syntax is even cleaner.

This weekend I was finally able to streamline my code, encapsulating many of my original issues in a tool that I’m going to present in details here.

Asynchronous Operation

In short, an asynchronous operation is some code that you want to execute without blocking the calling thread. The operation will generally complete within an expected time frame, notifying the calling thread with the results (including exceptions found in the process). In order to use your computing resources at a maximum, it’s a good practice to break a process into multiple independent asynchronous tasks that can be executed in parallel, but it’s also common to require a number of asynchronous operations to execute in a predetermined sequence in order to complete a subprocess. These compositional properties are the key driver of my API, leaving all the thread scheduling and synchronization details to an internal runtime engine fueled by recursive C# generators.

The traditional patterns

We usually want to go “async” when dealing with large files, interfacing with remote computers, or querying a database… and the .NET Framework does a great job providing the basic APIs in the following two flavors:

  • Asynchronous operations that use IAsyncResult objects.
  • Asynchronous operations that use events.

We can find asynchronous methods to operate with files, streams, sockets, or web services among others. What the .NET framework doesn’t provide is a simple mechanism to glue operations together (although Microsoft has been working for a while on a special API dedicated to parallel programming and concurrent operations).

Let’s illustrate my problem with a simple but common scenario. I’m going to perform a web request to a given URL and write the results to a file. These are two sequential asynchronous actions that must be synchronized in order to achieve the desired results:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void Download(string url, string path) {
  ManualResetEvent bufferInUse = new ManualResetEvent(false);
  WebRequest r = HttpWebRequest.Create(url);
  r.Method = "GET";
  r.BeginGetResponse(delegate(IAsyncResult result1) {
     HttpWebResponse response = (HttpWebResponse)r.EndGetResponse(result1);
     using(FileStream fs = File.Create(path)) {
         byte[] buffer = new byte[1024];
         int offset = 0;
         bool done = false;
         while(!done) {
             response.GetResponseStream().BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult result2) {
                 int count = response.GetResponseStream().EndRead(result2);
                 if(count > 0) {
                    fs.BeginWrite(buffer, 0, count, delegate(IAsyncResult result3) {
                    fs.EndWrite(result3);
                    bufferInUse.Set();
                }, null);
                offset += count;
             }
             else
                done = true;
             }, null);
             bufferInUse.WaitOne();
             bufferInUse.Reset();
          };
          fs.Close();
       }
    }, null);
 }
}

Note how detached these operations look in our code, and this is just a trivial example. We are even using anonymous delegates to make the code more compact, but it’s clear that we cannot sustain this pattern for too long if more operations are required in the sequence. There are also synchronization issues we have to consider, like locking the buffer to avoid concurrent reading and writing operations… and we are not taking care of exception handling, cancellations, and progress reporting yet.

It would be really nice if we could write synchronous-like code but the actions still execute asynchronously as below:

1
2
3
4
5
6
7
8
9
10
11
12
 static void Download(string url, string path) {
   WebRequest req = HttpWebRequest.Create(url);
   WebResponse response = AsyncAction(url, req.BeginGetResponse, req.EndGetResponse);
   Stream rs = response.Result.GetResponseStream();
   FileStream fs = File.Create(path);
   int count = 1;
   byte[] buffer = new byte[1024];
   while(count > 0) {
       count = AsyncAction(rs.BeginRead(buffer, 0, buffer.Length), rs.EndRead);
       AsyncAction(fs.BeginWrite(buffer, 0, count, fs.EndWrite);
   }
}

Well, it’s possible if we follow the syntax required by C# generators, yielding partial results wrapped inside a set of classes provided by our API. This is a small price to pay considering that our code will now look cleaner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 static IEnumerable<AsyncAction> Download(string url, string path) {
   WebRequest req = HttpWebRequest.Create(url);
 
   AsyncAction<WebResponse> response = new AsyncAction<WebResponse>(url, req.BeginGetResponse, req.EndGetResponse);
   yield return response;
   Stream rs = response.Result.GetResponseStream();
 
   FileStream fs = File.Create(path);
   int count = 1;
   byte[] buffer = new byte[1024];
   while(count > 0) {
       AsyncAction<int> a_count = new AsyncAction<int>("ReadAsync", (callback, state) => 
           rs.BeginRead(buffer, 0, buffer.Length, callback, state), rs.EndRead);
       yield return a_count;
 
       AsyncAction<object> state = new AsyncAction<object>("WriteAsync", (callback, state) => 
           fs.BeginWrite(buffer, 0, a_count.Result, callback, state), 
           (asyncResult) => { fs.EndWrite(asyncResult); return asyncResult.AsyncState; });
       yield return state;
 
       count = a_count.Result;
   }
}

In the next post we will see how the actions in this example can be simplified with extensions methods. The important thing to note is that now it’s a lot easier to add more operations to the sequence.

To invoke our new generator method, we need to introduce another class (Serial) to wrap sequential operations, and a context to provide the following familiar functionality:

  • Completed and ProgressChanged events.
  • Cancellation with Cancelled flag returned by Completed event.
  • Exception handling with Error returned by Completed event.
  • Tracing events for debugging.
  • Events posted to the current synchronization context (so we can modify controls in the UI thread).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void DownloadUrl(string url, string path) {
   AsyncContext context = new AsyncContext();
   context.Completed += new EventHandler<AsyncCompletedEventArgs>(Completed);
 
   Serial op = new Serial("test", true, Download(url, path));
   op.Start(context);
}
 
private void Completed(object sender, AsyncCompletedEventArgs args) {
   if (args.Cancelled)
      Console.WriteLine("CONTEXT WAS CANCELLED.");
 
   if (args.Error != null)
      foreach (Exception ex in ((AsyncContextException)args.Error).Errors)
         DisplayError(ex);
 
   Console.WriteLine("CONTEXT COMPLETED.");
}

That’s all for now. It’s past midnight and I have work tomorrow ;-)

In my next post I’ll describe in detail the entire API, introducing the remaining pieces - parallel operations, progress reporting, tracing and cancellations.

Stay tuned.

CoNatural.Threading.dll?

Roger Torres - February 5th, 2009
Comments

Three weeks ago I started working on an email archiving system based on Microsoft Exchange Server 2003. After researching a bit about all the possible ways to interface with Exchange, I decided to try the .NET System.DirectoryServices component to query Active Directory (via LDAP) for users/mailboxes, and complement with System.Net web requests (via WebDAV) to manage exchange items (folders, mail, contacts, etc).

After a few hours struggling with the .NET asynchronous programming model, I started “googling” again for options. The problem was that I was trying to invoke multiple asynchronous operations in parallel to achieve maximum performance, but the resulting code was getting too disorganized and hard to follow. All I needed was to find a way to continue experiencing the natural sequential feeling of my imperative code without sacrificing parallelism. The ideal solution would start background threads from the thread pool automatically when required, dealing with GUI synchronization context issues as well.

I found a group of very interesting articles, all showing various ways to solve my problem by leveraging the new functional language features of C# 3.0. With a combination of lambda expressions, closures, and a hack to simulate (more or less) coroutines via C# generators (or iterators), I was able to see the light at the end of the tunnel.

Here are some links to the best articles I found:

http://msdn.microsoft.com/en-us/magazine/cc163323.aspx

http://blogs.msdn.com/michen/archive/2006/03/30/564671.aspx

http://tomasp.net/blog/csharp-async.aspx

http://msmvps.com/blogs/mihailik/archive/2005/12/26/79813.aspx

http://tirania.org/blog/archive/2003/Apr-22.html

After reading all of them (and sorting many of these brilliant ideas out), I felt compelled to write a tool that was more accessible to .NET developers, a tool that will eventually become handy to complete my mail archiving application.

Last night I finally settled for a pattern that seemed well balanced in terms of complexity and functionality. I still need to implement progress reporting and completion/cancellation events though, but I will start this series describing my line of thinking in parallel with the final coding touches ;-)

I will name and encapsulate this component inside CoNatural.Threading.dll for a change.

Stay tuned.

CoNatural.Data Source Code

Roger Torres - February 4th, 2009
Comments

You can download the [Download not found] here.

This code will compile with Visual Studio 2008. You will have to make a few modifications to compile it with VS2005 (all minor).

I’m also releasing the Visual Studio Wizard project that I described in previous posts. You can modify the template according to your needs. [Download not found]

I hope these components and/or the ideas behind them will make your coding life easier!

In the near future I’m planning to start a new couple of series about parallel programming and ASP.NET MVC.

Stay tuned.