Saturday, 6 December 2008

More on Jeffrey Richter’s AsyncEnumerator and Functional Programming

If you do asynchronous programming (or have been put off it in the past by the complexity) and you haven’t already looked at this, then you really should:

http://msdn.microsoft.com/en-us/magazine/cc546608.aspx

I blogged yesterday about an idea for doing the same kind of thing but using lambdas to encapsulate the last remaining bit of complexity. Today I’ve applied the same idea, but with the goal of providing a thin layer that works along with Jeffrey Richter’s AsyncEnumerator class, essentially providing an optional new way of working with it. As I am not especially familiar with the asynchronous APIs, it would be especially stupid for me to try and reinvent the wheel instead of building on the work Jeffrey has already done.

I should point out that everything in this article is provided “as is”, but even more so than people usually mean by that phrase, because I haven’t even tried running any of this code (although it does compile, at least). I’ve merely attempted to analytically prove (by substitution and expansion) that it produces the equivalent of something that works.

These are just ideas. I don’t currently have a need for this facility in my own work, so I can’t afford to invest in testing it. I just got carried away with the implications of an idea today, and here are the results of that.

I’ll start by explaining the nuts and bolts. Using pure functional programming techniques, I’ve defined a handful of static functions, mostly extension methods on pre-existing types, making them very easy to discover and use. To begin with, I’ll leave out a detail to do with exception handling, and then add it in as an afterthought, as it makes things a little messier.

Jeffrey’s AsyncEnumerator class is effectively a consumer of a sequence of integers. The sequence is generated by a function that the user writes, taking advantage of the yield return keyword. They yield a count of 1 for each asynchronous operation that they start:

stream.BeginWrite(outputData, 0, outputData.Length, ae.End(), null);
yield return 1;
stream.EndWrite(ae.DequeueAsyncResult());

In the above snippet (taken from Jeffrey’s TcpClient example), a single write is made. Characteristically, there are three steps:

  • Call a BeginXXX API to launch the asynchronous operation. Such operations always require a callback function, which is provided by calling AsyncEnumerator.End.
  • Yield the value 1 (because only one operation has been launched in this case).
  • Call an EndXXX API to finish the operation. AsyncEnumerator.DequeueAsyncResult supplies the IAsyncResult that the EndXXX call needs.

The integers so yielded are interpreted as requests to wait for that number of asynchronous operations to complete. So to do three operations in parallel, you would begin three asynchronous calls, and then yield return 3. When your function got back control, the three calls would all have completed. However, they may of course complete in any order, so when the results are dequeued some care may need to be taken in matching the results up with their corresponding EndXXX functions, because they may be of different types (e.g. a stream operation and a WebRequest).

Using Linq’s Select function, we can easily convert between sequence types. This means that we can provide a way for the user to write a function that yields a sequence of some other kind, such that it can still be plugged into AsyncEnumerator.

The elements of the new kind of sequence are called activities. Although most users need not be aware of this, they are in fact functions, defined by this delegate:

public delegate int Activity(AsyncEnumerator asyncEnum);

An activity launches one or more asynchronous operations, and returns how many it has launched, making it perfect to fit AsyncEnumerator’s requirements. So we can easily convert a sequence of these activities into the kind of sequence that AsyncEnumerator likes to get:

public static void Execute(this AsyncEnumerator asyncEnum, 
                                   IEnumerable<Activity> activities)
{
    asyncEnum.Execute(activities.Select(activity => activity(asyncEnum))
                                .GetEnumerator());
}

In other words, we simply produce a sequence of integers by calling each activity in the input sequence. Then, as a final step, we get the IEnumerator<int> of the sequence and pass it to the intrinsic AsyncEnumerator.Execute function. By providing this as an extension method, it’s as convenient to use as its intrinsic cousin.
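One property worth spelling out is that Select is lazy: no activity runs until the consumer asks for the next integer. The following is a minimal sketch of that behaviour, not part of the original source file. FakeAsyncEnumerator and LazySelectDemo are hypothetical names; the former is an empty stand-in for the real AsyncEnumerator so that the example runs on its own, and the while loop plays the role of the consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, empty stand-in for AsyncEnumerator (an assumption for this sketch).
public class FakeAsyncEnumerator { }

// Same shape as the Activity delegate, but taking the stand-in type.
public delegate int Activity(FakeAsyncEnumerator asyncEnum);

public static class LazySelectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var ae = new FakeAsyncEnumerator();

        var activities = new Activity[]
        {
            e => { log.Add("activity 1 started"); return 1; },
            e => { log.Add("activity 2 started"); return 1; },
        };

        // Building the enumerator does not call any activity yet.
        IEnumerator<int> counts = activities.Select(a => a(ae)).GetEnumerator();
        log.Add("enumerator created");

        // Each MoveNext call invokes exactly one activity.
        while (counts.MoveNext())
            log.Add("consumer asked to wait for " + counts.Current);

        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);
    }
}
```

The log starts with "enumerator created", and each "started" line appears only just before the matching "wait" line, so the second activity is not launched until the consumer comes back for more.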

How can we compose a set of activities to make a single activity that causes all the activities in the set to execute in parallel? By defining a higher-order function that does it for us:

public static Activity Parallel(params Activity[] list)
{
    return asyncEnum => list.Sum(activity => activity(asyncEnum));
}

Again, the implementation is trivial thanks to Linq. Our composite activity has to return the sum of the return values of the activities on the input list, so it’s the perfect fit for the Sum function, which adds together the values produced by a function executed for each item in a sequence.
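Because Parallel both takes and returns activities, composites can be nested. Here is a small sketch of that, assuming a hypothetical FakeAsyncEnumerator that merely counts launches and a hypothetical Single helper that pretends to start one operation; neither is part of AsyncEnumerator or of the source file.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for AsyncEnumerator; it only counts launches.
public class FakeAsyncEnumerator
{
    public int Launched;
}

public delegate int Activity(FakeAsyncEnumerator asyncEnum);

public static class ParallelDemo
{
    // Same body as the Parallel function above.
    public static Activity Parallel(params Activity[] list)
    {
        return asyncEnum => list.Sum(activity => activity(asyncEnum));
    }

    // Pretend operation (hypothetical): records one launch and reports a count of 1.
    public static Activity Single()
    {
        return asyncEnum => { asyncEnum.Launched++; return 1; };
    }

    // Nests one composite inside another and invokes the result once.
    public static int RunNested(FakeAsyncEnumerator ae)
    {
        Activity composite = Parallel(Parallel(Single(), Single()), Single());
        return composite(ae);
    }

    public static void Main()
    {
        var ae = new FakeAsyncEnumerator();
        Console.WriteLine(RunNested(ae)); // prints 3
        Console.WriteLine(ae.Launched);   // prints 3
    }
}
```

The nested composite reports 3, which is exactly the number that would be yielded to ask for a wait on all three operations.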

Finally, we need a higher-order function to allow us to easily define an activity. This one’s a little more mind-bending, but still pretty short. Crucially, it is much easier to use than it is to fully understand, and even then most users do not need to use it directly.

public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end)
{
    return asyncEnum => // action accepts an AsyncEnumerator 
    {
        begin(result => // callback accepts a result
           {
               end(result); // pass on to the user's handler
               asyncEnum.End()(result); // and to AsyncEnumerator
               asyncEnum.DequeueAsyncResult(); // we don't need this
           });
 
        return 1;
    };
}

It pairs together two functions. The first one, begin, is responsible for launching the asynchronous call. To do this, it needs a callback function that it can pass to whatever asynchronous API it calls. The second one, end, will run when the call completes, and (as you will see if you look up the definition of the standard AsyncCallback delegate) accepts an IAsyncResult object.

So the Activity function returned by Pair will call the begin function, and pass it a function to serve as the callback for the asynchronous API. That function is defined by the inner lambda. It calls on to three things:

  • Firstly, the end function (whatever that may be) so that the result of the call can be interpreted.
  • Secondly, the callback supplied by AsyncEnumerator. This is crucial, as it ensures that the thread pool is asked to execute another step through the sequence of integers.
  • Finally, the AsyncEnumerator.DequeueAsyncResult function, although the result is discarded. This is because we have already passed the result to the end function. But we should still call this function once for each call that is made, so that the queue doesn’t needlessly grow in length.
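The three steps in that callback can be traced with a toy model. This is only a sketch under stated assumptions: FakeAsyncEnumerator is a hypothetical stand-in whose End() callback enqueues the result and counts a resume (my guess at the relevant behaviour, not the real implementation), CompletedResult is a hypothetical IAsyncResult, and the fake begin function completes immediately on the calling thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for AsyncEnumerator: an inbox of results plus a resume counter.
public class FakeAsyncEnumerator
{
    private readonly Queue<IAsyncResult> inbox = new Queue<IAsyncResult>();
    public int Resumes;

    // Returns the callback an asynchronous API would invoke on completion.
    public AsyncCallback End()
    {
        return result => { lock (inbox) { inbox.Enqueue(result); } Resumes++; };
    }

    public IAsyncResult DequeueAsyncResult()
    {
        lock (inbox) { return inbox.Dequeue(); }
    }

    public int InboxCount { get { lock (inbox) { return inbox.Count; } } }
}

// Bare-bones IAsyncResult for an operation that has already finished (hypothetical).
public class CompletedResult : IAsyncResult
{
    public object AsyncState { get { return null; } }
    public WaitHandle AsyncWaitHandle { get { throw new NotSupportedException(); } }
    public bool CompletedSynchronously { get { return true; } }
    public bool IsCompleted { get { return true; } }
}

public delegate int Activity(FakeAsyncEnumerator asyncEnum);

public static class PairDemo
{
    // Same body as Pair above, written against the stand-in type.
    public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end)
    {
        return asyncEnum =>
        {
            begin(result =>
            {
                end(result);                    // user's handler
                asyncEnum.End()(result);        // notify the enumerator
                asyncEnum.DequeueAsyncResult(); // keep the inbox from growing
            });
            return 1;
        };
    }

    public static List<string> Run(FakeAsyncEnumerator ae)
    {
        var log = new List<string>();
        Activity activity = Pair(
            callback => { log.Add("begin"); callback(new CompletedResult()); },
            result => log.Add("end"));
        log.Add("launched " + activity(ae));
        return log;
    }

    public static void Main()
    {
        var ae = new FakeAsyncEnumerator();
        Console.WriteLine(string.Join(", ", Run(ae).ToArray()));
        Console.WriteLine("resumes: " + ae.Resumes + ", inbox: " + ae.InboxCount);
    }
}
```

In this model the end handler runs before the enumerator is notified, one resume is requested per operation, and the inbox is empty again afterwards.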

That’s all we need as a basis. But for maximum convenience for most users, we can add extension methods for the most commonly used asynchronous call types. For example, writing to a stream:

public static Activity RequestWrite(this Stream stream, byte[] buffer, 
                                    int offset, int size)
{
    return Pair(
        callback => stream.BeginWrite(buffer, offset, size, callback, null),
        result => stream.EndWrite(result));
}

This is a higher-order function – it makes an Activity function, using the handy Pair function to tie together the two halves of the operation. But it can be called directly on a Stream object. If so called, it does not actually do anything; the return value must be passed on via yield return.
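To make the "does not actually do anything" point concrete, here is a sketch that calls RequestWrite on a MemoryStream and checks the stream length before and after the activity is invoked. The Pair used here is a simplified, hypothetical variant: instead of talking to AsyncEnumerator it signals an event on a stand-in FakeAsyncEnumerator, purely so that the example can run on its own.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical stand-in for AsyncEnumerator: it only signals when the operation has finished.
public class FakeAsyncEnumerator
{
    public readonly ManualResetEvent Finished = new ManualResetEvent(false);
}

public delegate int Activity(FakeAsyncEnumerator asyncEnum);

public static class DeferredDemo
{
    // Simplified Pair (an assumption for this sketch): signals the stand-in
    // where the real version would call on to AsyncEnumerator.
    public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end)
    {
        return asyncEnum =>
        {
            begin(result => { end(result); asyncEnum.Finished.Set(); });
            return 1;
        };
    }

    public static Activity RequestWrite(this Stream stream, byte[] buffer,
                                        int offset, int size)
    {
        return Pair(
            callback => stream.BeginWrite(buffer, offset, size, callback, null),
            result => stream.EndWrite(result));
    }

    // Returns the stream length before and after the activity is invoked.
    public static long[] Run()
    {
        var stream = new MemoryStream();
        byte[] data = { 1, 2, 3, 4 };

        Activity write = stream.RequestWrite(data, 0, data.Length);
        long before = stream.Length;   // still empty: only a function was built

        var ae = new FakeAsyncEnumerator();
        write(ae);                     // what Execute would do on our behalf
        ae.Finished.WaitOne();         // wait until EndWrite has run
        long after = stream.Length;

        return new[] { before, after };
    }

    public static void Main()
    {
        long[] lengths = Run();
        Console.WriteLine("before: " + lengths[0] + ", after: " + lengths[1]);
    }
}
```

The length is 0 after the call to RequestWrite and 4 only once the returned activity has been invoked and has completed.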

So how does code look using this whole technique? Let’s look back at that real example again. The function starts with this signature:

private static IEnumerator<Int32> Process(AsyncEnumerator ae, 
                                          String server, 
                                          String message) 
{

Then in the body of the function there are triplets of lines like this:

stream.BeginWrite(outputData, 0, outputData.Length, ae.End(), null);
yield return 1;
stream.EndWrite(ae.DequeueAsyncResult());

By contrast, the alternative way of working enabled by these new functions begins by declaring the function like so:

private static IEnumerable<Activity> Process(String server, 
                                             String message)
{

The first thing to note (aside from the obvious change to the return type) is that there is no need to pass in an AsyncEnumerator object, even though it interoperates with one automatically. The code to write to the stream looks like this:

yield return stream.RequestWrite(outputData, 0, outputData.Length);

The triplet of lines has been boiled down to a single line, which appears just like a non-asynchronous call but with yield return in front of it.

Because the helper extensions are higher-order functions, the user doesn’t even need to be aware that they’re using functional programming, or that an Activity is a function. They just make a method call that appears to do what they want, although they prefix it with yield return.

What about parallel activities running at the same time? Loosely inspired by another of Jeffrey’s examples, this function obtains results from two websites simultaneously:

private static IEnumerable<Activity> ProcessAllAndEachOps()
{
   yield return Async.Parallel(
 
            WebRequest.Create("http://www.google.com").RequestResponse(true, response =>
                Console.WriteLine("Bytes from Google: " + response.ContentLength)),
 
            WebRequest.Create("http://www.microsoft.com").RequestResponse(true, response =>
                Console.WriteLine("Bytes from Microsoft: " + response.ContentLength))
       );
 
   Console.WriteLine("All the operations completed.");
}

The Parallel function we saw defined above takes care of composing a set of activities into a new activity that will launch all of them to run in parallel.

The end result appears so simple that it is worth going through an exercise of “expansion” to see how this alternative way of working must produce the same results as the original way. Hold onto your hats…

Here’s the nice and easy way it looks to the typical user:

yield return stream.RequestWrite(outputData, 0, outputData.Length);

Let’s substitute in the definition of RequestWrite:

yield return Async.Pair(
    callback => stream.BeginWrite(outputData, 0, outputData.Length, callback, null),
    result => stream.EndWrite(result));

Then the definition of Pair, in two stages to make it easy to follow. Firstly, keeping the definitions of begin and end separated out:

yield return asyncEnum => // action accepts an AsyncEnumerator 
{
    Action<AsyncCallback> begin =
              callback => stream.BeginWrite(outputData, 0, 
                            outputData.Length, callback, null);
 
    AsyncCallback end = result => stream.EndWrite(result);
    
    begin(result => // callback accepts a result
    {
        end(result); // pass on to the user's handler
        asyncEnum.End()(result); // and to AsyncEnumerator
        asyncEnum.DequeueAsyncResult(); // we don't need this
    });
    
    return 1;
};

Then finally substituting those definitions into their places:

yield return asyncEnum => // action accepts an AsyncEnumerator 
{
    stream.BeginWrite(outputData, 0, outputData.Length, 
        result => // callback accepts a result
        {
          stream.EndWrite(result); // pass on to the user's handler
          asyncEnum.End()(result); // and to AsyncEnumerator
          asyncEnum.DequeueAsyncResult(); // we don't need this
        }, null);
 
    return 1;
};

Now we can see what’s really happening. The statement yields a function that calls BeginWrite on the stream and then returns 1. BeginWrite requires a callback, and we build one for it. In the normal way of using AsyncEnumerator as seen in Jeffrey’s example, the callback is provided by calling AsyncEnumerator.End, but here we effectively wrap that in a lambda so we can do other things as well. First we call EndWrite on the stream (which may throw an exception, but I’ll deal with that in a moment), then we call on to the callback returned by AsyncEnumerator.End, and finally we do the housekeeping of discarding one IAsyncResult instance from AsyncEnumerator’s internal inbox.

Then this whole function will be executed by the Execute extension method. Some more substitution will make this clearer. Here’s the expression we use to generate the IEnumerable<int> whose enumerator is passed to the intrinsic Execute method:

activities.Select(activity => activity(asyncEnum))

So each activity is called in turn to produce integers, and the result is the same as if the return value of the activity was being yielded directly. In other words, continuing our expansion (now supposing we are in a function that is yielding integers having been passed the parameter asyncEnum):

stream.BeginWrite(outputData, 0, outputData.Length,
    result => // callback accepts a result
    {
        stream.EndWrite(result); // pass on to the user's handler
        asyncEnum.End()(result); // and to AsyncEnumerator
        asyncEnum.DequeueAsyncResult(); // we don't need this
    }, null);
 
yield return 1;

It is now clear what the only actual differences are. Compare with Jeffrey’s original code (I’ll repeat it here one more time to save you scrolling up and down the page).

stream.BeginWrite(outputData, 0, outputData.Length, ae.End(), null);
yield return 1;
stream.EndWrite(ae.DequeueAsyncResult());

There, the End() callback causes the function to resume executing after the operation finishes, and the next line of code is the call to stream.EndWrite. Working the new way, stream.EndWrite is called from within the callback itself. It also has the correct IAsyncResult ready to use.

Also, immediately after calling the function returned by End(), the result is removed from the inbox by calling DequeueAsyncResult. This places an additional thread-safety constraint on AsyncEnumerator, because in the original pattern it looks as though the inbox is only accessed by one thread at a time. But by looking at the code of AsyncEnumerator in Reflector, I can see that it takes out a lock before accessing the queue, so this should be fine.

So what are the limitations of this new approach? The main one is the lack of flexibility in exception handling. With the original raw approach, you are free to place a try/catch around either the BeginXXX or EndXXX calls (although you can’t place a single handler around both calls, because C# does not allow yield return inside a try block that has a catch clause).
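For reference, this is the shape the raw approach forces on you: one handler per half, with the yield return sitting between them. It is a self-contained sketch in which EndOperation is a hypothetical helper that pretends to be a failing EndXXX call, and the while loop in Run stands in for whatever drives the iterator.

```csharp
using System;
using System.Collections.Generic;

public static class TryCatchDemo
{
    // Pretend EndXXX call that fails on demand (hypothetical helper).
    private static void EndOperation(bool fail)
    {
        if (fail) throw new InvalidOperationException("end failed");
    }

    public static IEnumerator<int> Process(List<string> log, bool failOnEnd)
    {
        try
        {
            log.Add("begin"); // stands in for a BeginXXX call
        }
        catch (InvalidOperationException e)
        {
            log.Add("begin failed: " + e.Message);
        }

        // The compiler rejects yield return inside a try block that has
        // a catch clause, so it has to sit between the two handlers.
        yield return 1;

        try
        {
            EndOperation(failOnEnd);
            log.Add("end");
        }
        catch (InvalidOperationException e)
        {
            log.Add("end failed: " + e.Message);
        }
    }

    // Drives the iterator to completion and returns what was logged.
    public static List<string> Run(bool failOnEnd)
    {
        var log = new List<string>();
        IEnumerator<int> steps = Process(log, failOnEnd);
        while (steps.MoveNext())
            log.Add("waiting for " + steps.Current);
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(true).ToArray()));
    }
}
```

With failOnEnd set to true, the log reads "begin", "waiting for 1", "end failed: end failed": the failure is caught locally, right next to the call that caused it.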

In the new approach, the best we can do is to allow any exceptions to propagate out of the IEnumerator<int> sequence generator. In other words, if the BeginXXX or EndXXX calls throw an exception, then it is just as if they were to throw uncaught exceptions in the original approach.

To achieve this, we need to make a further change. The reason is that the EndXXX call is made in the context of a thread that is running due to the fact that an asynchronous call has completed. We do not control that thread. Rather than bothering it with an exception, we need to transfer the exception into the context of whichever thread is executing our function.

To achieve this, first we have to catch the exception. The user’s callback is executed in the inner lambda in Pair, so that’s where we need a try/catch block:

public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end)
{
    return asyncEnum => // action accepts an AsyncEnumerator 
    {
        begin(result => // callback accepts a result
        {
            try
            {
                end(result); // pass on to the user's handler
            }
            catch (Exception e)
            {
                asyncEnum.Cancel(e);
            }
            asyncEnum.End()(result); // and to AsyncEnumerator
            asyncEnum.DequeueAsyncResult(); // we don't need this
        });
 
        return 1;
    };
}

Note that we make use of a handy feature of AsyncEnumerator where we stash the exception inside it by calling Cancel. We then continue as normal, calling on to AsyncEnumerator’s callback, which means that the iteration will resume. What does this mean?

It means that when iteration resumes, the first thing we need to do is check for the cancellation, retrieve the exception and throw it. Unfortunately, this means we need to mess up our nice simple Execute function. If you refer to where I defined it above, you’ll see that I originally used Select to do all the looping and yielding. But this doesn’t allow us to perform the exception check immediately after resuming from the yield return. We need to write out the equivalent of Select in “long hand” so we can insert the extra code:

public static void Execute(this AsyncEnumerator asyncEnum, 
                           IEnumerable<Activity> activities)
{
    asyncEnum.Execute(AdaptEnumerator(asyncEnum, activities));
}
 
private static IEnumerator<int> AdaptEnumerator(
       AsyncEnumerator asyncEnum, IEnumerable<Activity> activities)
{
    foreach (Activity activity in activities)
    {
        yield return activity(asyncEnum);
 
        object x;
        if (asyncEnum.IsCanceled(out x) && (x is Exception))
            throw (Exception)x;
    }
}

The private AdaptEnumerator function serves as the equivalent of Select, except that it checks for an exception to throw immediately following resumption after the yield return. This means that after the asynchronous action completes with an error, no further code executes in the underlying function that yields the Activities.
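The control flow can be checked in isolation with a toy model. In this sketch FakeAsyncEnumerator is a hypothetical stand-in that only implements a Cancel/IsCanceled pair with the shapes used above, and the first activity simulates a failed end handler by calling Cancel synchronously; in real use that call would come from the completion callback on another thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for AsyncEnumerator's Cancel/IsCanceled pair.
public class FakeAsyncEnumerator
{
    private bool canceled;
    private object cancelValue;

    public void Cancel(object value) { canceled = true; cancelValue = value; }

    public bool IsCanceled(out object value)
    {
        value = cancelValue;
        return canceled;
    }
}

public delegate int Activity(FakeAsyncEnumerator asyncEnum);

public static class AdaptDemo
{
    // Same loop as AdaptEnumerator above, written against the stand-in type.
    public static IEnumerator<int> AdaptEnumerator(
        FakeAsyncEnumerator asyncEnum, IEnumerable<Activity> activities)
    {
        foreach (Activity activity in activities)
        {
            yield return activity(asyncEnum);

            object x;
            if (asyncEnum.IsCanceled(out x) && (x is Exception))
                throw (Exception)x;
        }
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var ae = new FakeAsyncEnumerator();
        var activities = new Activity[]
        {
            e =>
            {
                log.Add("first launched");
                // Simulates the end handler failing and stashing its exception.
                e.Cancel(new InvalidOperationException("write failed"));
                return 1;
            },
            e => { log.Add("second launched"); return 1; },
        };

        IEnumerator<int> steps = AdaptEnumerator(ae, activities);
        try
        {
            while (steps.MoveNext())
                log.Add("waiting for " + steps.Current);
        }
        catch (InvalidOperationException ex)
        {
            log.Add("rethrown: " + ex.Message);
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run().ToArray()));
    }
}
```

The log reads "first launched", "waiting for 1", "rethrown: write failed". The second activity is never launched, which is the behaviour described above.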

Like I say, I have only written this and done the above analysis on it, not really tested much. If you want to give it a try, be my guest:

http://www.earwicker.com/downloads/async.zip

Please let me know if you find any ridiculous bugs or design flaws, or even if it works. It’s just a source file containing all the functions (just 150 or so lines of code), so it could be added to any project already using AsyncEnumerator, or built into a separate class library.

4 comments:

Charles Prakash Dasari said...

This article is way too cool... I use Jeffrey's AE all over the place and I am having trouble beautifying my exception handling around Begin/End methods. I like the idea of the pair ... Great ideas, thanks for that.

Daniel said...

Glad you found it interesting. I should repeat the same warnings: I still haven't found an excuse to use this stuff at work, so it remains totally untested, and almost certainly contains bugs, or mistaken assumptions about the right way to use various features of AsyncEnumerator. That said, I would be very interested to know how you get on with it.

Charles Prakash Dasari said...

I haven't yet fully incorporated this technique in my code. However, I am thinking about ProcessAllAndEachOps -- in Jeffrey's example we can see that when we launch all the operations in parallel, we can still keep yielding a return value of 1 (after all the begins have actually been executed) until we reach the count of the begins, processing the ends as they complete. That was very helpful to let the thread pool maximize the use of the CPU - instead of waiting for all the operations to complete.

I have been mulling over how to accomplish the same with your suggested ideas in this article. Do you have any ideas off the top of your head?

Daniel said...

If you look at my take on ProcessAllAndEachOps, each of those web requests has a lambda that writes to the console when the request is done. Those lambdas execute as soon as the operation completes, so you don't need to do anything else.

However, this is almost certainly a very bad weakness in my implementation!

It means that the lambdas may execute in parallel with each other, which is bad. The good thing about AsyncEnumerator is that it follows a really simple pattern that ensures only one piece of your code is executing at a time.

It would probably be easier to implement that properly here without using AsyncEnumerator as the basis, as there will be little left for it to take care of and it's not open source so it's hard to second-guess the precise semantics of it.

You just need a thread-safe queue of end-handler delegates that need executing, and whenever an async operation completes, you check a bool flag called IsRunning. If it's false, set it to true and then loop through that queue executing the handlers until there are none left, and then go back into the IEnumerator to execute the next step. Otherwise, if IsRunning is true, that loop is already running in another thread, so you just push the new end-handler onto the queue so the other thread will run it soon. You need to lock access to IsRunning and the queue very carefully. All this could be written in one (not too long) method; it wouldn't need to be a class.

If I have some time to look at this, I'll write an implementation, but I may not!