Streamlining .NET Asynchronous Operations - Part I

Roger Torres - February 8th, 2009

I would like to get some feedback from the community about how they feel when writing asynchronous applications in C#. In the past, I have scheduled work items to the thread pool and implemented basic operations around the BackgroundWorker class like everyone else… but this month, when dealing with a “massive” number of asynchronous operations in my code, I questioned for a moment the benefits of this path over the more inefficient by cleaner synchronous ways. It’s not just that the existing .NET asynchronous patterns are not trivial, but the complexity of the resulting code when multiple asynchronous actions must be executed in a predetermined sequence is unacceptable.

I understand that imperative languages are not specifically designed to deal with these scenarios, but I learned from a group of bloggers (references can be found in my previous post) that C# has been capable of some cool functional stuff for a while, and with the introduction of lambda expressions and extension methods in version 3.0, the syntax is even cleaner.

This weekend I was finally able to streamline my code, encapsulating many of my original issues in a tool that I’m going to present in details here.

Asynchronous Operation

In short, an asynchronous operation is some code that you want to execute without blocking the calling thread. The operation will generally complete within an expected time frame, notifying the calling thread with the results (including exceptions found in the process). In order to use your computing resources at a maximum, it’s a good practice to break a process into multiple independent asynchronous tasks that can be executed in parallel, but it’s also common to require a number of asynchronous operations to execute in a predetermined sequence in order to complete a subprocess. These compositional properties are the key driver of my API, leaving all the thread scheduling and synchronization details to an internal runtime engine fueled by recursive C# generators.

The traditional patterns

We usually want to go “async” when dealing with large files, interfacing with remote computers, or querying a database… and the .NET Framework does a great job providing the basic APIs in the following two flavors:

  • Asynchronous operations that use IAsyncResult objects.
  • Asynchronous operations that use events.

We can find asynchronous methods to operate with files, streams, sockets, or web services among others. What the .NET framework doesn’t provide is a simple mechanism to glue operations together (although Microsoft has been working for a while on a special API dedicated to parallel programming and concurrent operations).

Let’s illustrate my problem with a simple but common scenario. I’m going to perform a web request to a given URL and write the results to a file. These are two sequential asynchronous actions that must be synchronized in order to achieve the desired results:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void Download(string url, string path) {
  ManualResetEvent bufferInUse = new ManualResetEvent(false);
  WebRequest r = HttpWebRequest.Create(url);
  r.Method = "GET";
  r.BeginGetResponse(delegate(IAsyncResult result1) {
     HttpWebResponse response = (HttpWebResponse)r.EndGetResponse(result1);
     using(FileStream fs = File.Create(path)) {
         byte[] buffer = new byte[1024];
         int offset = 0;
         bool done = false;
         while(!done) {
             response.GetResponseStream().BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult result2) {
                 int count = response.GetResponseStream().EndRead(result2);
                 if(count > 0) {
                    fs.BeginWrite(buffer, 0, count, delegate(IAsyncResult result3) {
                    fs.EndWrite(result3);
                    bufferInUse.Set();
                }, null);
                offset += count;
             }
             else
                done = true;
             }, null);
             bufferInUse.WaitOne();
             bufferInUse.Reset();
          };
          fs.Close();
       }
    }, null);
 }
}

Note how detached these operations look in our code, and this is just a trivial example. We are even using anonymous delegates to make the code more compact, but it’s clear that we cannot sustain this pattern for too long if more operations are required in the sequence. There are also synchronization issues we have to consider, like locking the buffer to avoid concurrent reading and writing operations… and we are not taking care of exception handling, cancellations, and progress reporting yet.

It would be really nice if we could write synchronous-like code but the actions still execute asynchronously as below:

1
2
3
4
5
6
7
8
9
10
11
12
 static void Download(string url, string path) {
   WebRequest req = HttpWebRequest.Create(url);
   WebResponse response = AsyncAction(url, req.BeginGetResponse, req.EndGetResponse);
   Stream rs = response.Result.GetResponseStream();
   FileStream fs = File.Create(path);
   int count = 1;
   byte[] buffer = new byte[1024];
   while(count > 0) {
       count = AsyncAction(rs.BeginRead(buffer, 0, buffer.Length), rs.EndRead);
       AsyncAction(fs.BeginWrite(buffer, 0, count, fs.EndWrite);
   }
}

Well, it’s possible if we follow the syntax required by C# generators, yielding partial results wrapped inside a set of classes provided by our API. This is a small price to pay considering that our code will now look cleaner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 static IEnumerable<AsyncAction> Download(string url, string path) {
   WebRequest req = HttpWebRequest.Create(url);
 
   AsyncAction<WebResponse> response = new AsyncAction<WebResponse>(url, req.BeginGetResponse, req.EndGetResponse);
   yield return response;
   Stream rs = response.Result.GetResponseStream();
 
   FileStream fs = File.Create(path);
   int count = 1;
   byte[] buffer = new byte[1024];
   while(count > 0) {
       AsyncAction<int> a_count = new AsyncAction<int>("ReadAsync", (callback, state) => 
           rs.BeginRead(buffer, 0, buffer.Length, callback, state), rs.EndRead);
       yield return a_count;
 
       AsyncAction<object> state = new AsyncAction<object>("WriteAsync", (callback, state) => 
           fs.BeginWrite(buffer, 0, a_count.Result, callback, state), 
           (asyncResult) => { fs.EndWrite(asyncResult); return asyncResult.AsyncState; });
       yield return state;
 
       count = a_count.Result;
   }
}

In the next post we will see how the actions in this example can be simplified with extensions methods. The important thing to note is that now it’s a lot easier to add more operations to the sequence.

To invoke our new generator method, we need to introduce another class (Serial) to wrap sequential operations, and a context to provide the following familiar functionality:

  • Completed and ProgressChanged events.
  • Cancellation with Cancelled flag returned by Completed event.
  • Exception handling with Error returned by Completed event.
  • Tracing events for debugging.
  • Events posted to the current synchronization context (so we can modify controls in the UI thread).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void DownloadUrl(string url, string path) {
   AsyncContext context = new AsyncContext();
   context.Completed += new EventHandler<AsyncCompletedEventArgs>(Completed);
 
   Serial op = new Serial("test", true, Download(url, path));
   op.Start(context);
}
 
private void Completed(object sender, AsyncCompletedEventArgs args) {
   if (args.Cancelled)
      Console.WriteLine("CONTEXT WAS CANCELLED.");
 
   if (args.Error != null)
      foreach (Exception ex in ((AsyncContextException)args.Error).Errors)
         DisplayError(ex);
 
   Console.WriteLine("CONTEXT COMPLETED.");
}

That’s all for now. It’s past midnight and I have work tomorrow ;-)

In my next post I’ll describe in detail the entire API, introducing the remaining pieces - parallel operations, progress reporting, tracing and cancellations.

Stay tuned.

Leave a comment