If you do asynchronous programming (or have been put off it in the past by the complexity) and you haven’t already looked at this, then you really should:
I blogged yesterday about an idea for doing the same kind of thing but using lambdas to encapsulate the last remaining bit of complexity. Today I’ve applied the same idea, but with the goal of providing a thin layer that works along with Jeffrey Richter’s AsyncEnumerator class, essentially providing an optional new way of working with it. As I am not especially familiar with the asynchronous APIs, it would be especially stupid for me to try and reinvent the wheel instead of building on the work Jeffrey has already done.
I should point out that everything in this article is provided “as is”, but even more so than people usually mean by that phrase, because I haven’t even tried running any of this code (although it does compile, at least). I’ve merely attempted to analytically prove (by substitution and expansion) that it produces the equivalent of something that works.
These are just ideas. I don’t currently have a need for this facility in my own work, so I can’t afford to invest in testing it. I just got carried away with the implications of an idea today, and here are the results of that.
I’ll start by explaining the nuts and bolts. Using pure functional programming techniques, I’ve defined a handful of static functions, mostly extension methods on pre-existing types, making them very easy to discover and use. To begin with, I’ll leave out a detail to do with exception handling, and then add it in as an afterthought, as it makes things a little messier.
Jeffrey’s AsyncEnumerator class is effectively a consumer of a sequence of integers. The sequence is generated by a function that the user writes, taking advantage of the yield return keyword. They yield a count of 1 for each asynchronous operation that they start:
In the above snippet (taken from Jeffrey’s TcpClient example), a single write is made. Characteristically, there are three steps:
- Call a BeginXXX API to launch the asynchronous operation. Such operations always require a callback function, which is provided by calling AsyncEnumerator.End.
- Yield the value 1 (because only one operation has been launched in this case).
- Call an EndXXX API to finish the operation. AsyncEnumerator.DequeueAsyncResult is used to obtain the IAsyncResult that the EndXXX call needs.
The integers so yielded are interpreted as requests to wait for that number of asynchronous operations to complete. So to do three operations in parallel, you would begin three asynchronous calls, and then yield return 3. When your function gets control back, the three calls will all have completed. However, they may of course complete in any order, so when the results are dequeued some care may need to be taken in matching the results up with their corresponding EndXXX functions, because they may be of different types (e.g. a stream operation and a WebRequest).
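As a rough sketch of those three steps (not Jeffrey's original snippet), here is the pattern written against a tiny stand-in class. StubAsyncEnumerator is hypothetical: it mimics only End, DequeueAsyncResult and Execute, and its Execute simply blocks the calling thread until the yielded number of operations have completed, rather than resuming the iterator from the thread pool.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical stand-in for AsyncEnumerator (not the real class): it has only the
// members the article relies on, and Execute simply blocks the calling thread.
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);

    // Callback to hand to a BeginXXX call: queues the result and signals one completion.
    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public IAsyncResult DequeueAsyncResult() =>
        inbox.TryDequeue(out var result) ? result : throw new InvalidOperationException("Inbox is empty.");

    // After each yield, wait until that many operations have completed.
    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

public static class RawPatternDemo
{
    static IEnumerator<int> WriteOnce(StubAsyncEnumerator asyncEnum, Stream stream, byte[] data)
    {
        stream.BeginWrite(data, 0, data.Length, asyncEnum.End(), null); // 1. launch
        yield return 1;                                                  // 2. wait for one completion
        stream.EndWrite(asyncEnum.DequeueAsyncResult());                 // 3. finish
    }

    // Writes three bytes through the pattern and reports the stream length.
    public static long Run()
    {
        var stream = new MemoryStream();
        var enumerator = new StubAsyncEnumerator();
        enumerator.Execute(WriteOnce(enumerator, stream, new byte[] { 1, 2, 3 }));
        return stream.Length;
    }
}
```

To launch three operations at once, the iterator would make three BeginXXX calls, yield return 3, and then dequeue three results.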
Using Linq’s Select function, we can easily convert between sequence types. This means that we can provide a way for the user to write a function that yields a sequence of some other kind, such that it can still be plugged into AsyncEnumerator.
The elements of the new kind of sequence are called activities. Although most users need not be aware of this, they are in fact functions, defined by this delegate:
An activity launches one or more asynchronous operations, and returns how many it has launched, making it perfect to fit AsyncEnumerator’s requirements. So we can easily convert a sequence of these activities into the kind of sequence that AsyncEnumerator likes to get:
In other words, we simply produce a sequence of integers by calling each activity in the input sequence. Then as a final step, get the IEnumerator<int> of the sequence and pass it to the intrinsic AsyncEnumerator.Execute function. By providing this as an extension method, it’s as convenient to use as its intrinsic cousin.
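A minimal sketch of the delegate and the Execute extension might look like this. It assumes a hypothetical StubAsyncEnumerator in place of the real class, so the parameter type and the blocking Execute are my stand-ins; the Log activity is likewise invented purely to have something to run.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for AsyncEnumerator (not the real class).
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);

    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

// An activity launches asynchronous operations and returns how many it launched.
public delegate int Activity(StubAsyncEnumerator asyncEnum);

public static class ActivityExtensions
{
    // Calls each activity in turn to produce the integers the intrinsic Execute expects.
    public static void Execute(this StubAsyncEnumerator asyncEnum, IEnumerable<Activity> activities)
    {
        asyncEnum.Execute(activities.Select(activity => activity(asyncEnum)).GetEnumerator());
    }
}

public static class ExecuteDemo
{
    // Hypothetical activity: logs a name on a pool thread, then signals completion.
    static Activity Log(List<string> log, string name) => asyncEnum =>
    {
        Task.Run(() => { lock (log) log.Add(name); })
            .ContinueWith(task => asyncEnum.End()(task));
        return 1;
    };

    static IEnumerable<Activity> Steps(List<string> log)
    {
        yield return Log(log, "first");
        yield return Log(log, "second");
    }

    public static string Run()
    {
        var log = new List<string>();
        new StubAsyncEnumerator().Execute(Steps(log));
        return string.Join(",", log);
    }
}
```

Because Select is lazy, each activity is only invoked when the driver asks for the next integer, so the second activity does not start until the first has completed.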
How can we compose a set of activities to make a single activity that causes all the activities in the set to execute in parallel? By defining a higher-order function that does it for us:
Again, the implementation is trivial thanks to Linq. Our composite activity has to return the sum of the return values of the activities on the input list, so it’s the perfect fit for the Sum function, which adds together the values produced by a function executed for each item in a sequence.
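Here is one way the composition could be written, again against a hypothetical StubAsyncEnumerator rather than the real class. The params signature is an assumption on my part; the essential point is only the call to Sum.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for AsyncEnumerator (not the real class).
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);

    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

public delegate int Activity(StubAsyncEnumerator asyncEnum);

public static class Activities
{
    // The composite launches every activity and reports the total number of operations.
    public static Activity Parallel(params Activity[] list) =>
        asyncEnum => list.Sum(activity => activity(asyncEnum));
}

public static class ParallelDemo
{
    static IEnumerator<int> Steps(StubAsyncEnumerator asyncEnum, Activity activity)
    {
        yield return activity(asyncEnum);
    }

    // Runs three copies of a trivial activity in parallel and counts how many finished.
    public static int Run()
    {
        int finished = 0;
        Activity work = asyncEnum =>
        {
            Task.Run(() => { Interlocked.Increment(ref finished); })
                .ContinueWith(task => asyncEnum.End()(task));
            return 1;
        };
        var enumerator = new StubAsyncEnumerator();
        enumerator.Execute(Steps(enumerator, Activities.Parallel(work, work, work)));
        return finished;
    }
}
```

The composite returns 3 here, so the driver waits for all three completions before moving on.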
Finally, we need a higher-order function to allow us to easily define an activity. This one’s a little more mind-bending, but still pretty short. Crucially, it is much easier to use than it is to fully understand, and even then most users do not need to use it directly.
It pairs together two functions. The first one, begin, is responsible for launching the asynchronous call. To do this, it needs a callback function that it can pass to whatever asynchronous API it calls. The second one, end, will run when the call completes, and (as you will see if you look up the definition of the standard AsyncCallback delegate) accepts an IAsyncResult object.
So the Activity function returned by Pair will call the begin function, and pass it a function to serve as the callback for the asynchronous API. That function is defined by the inner lambda. It calls on to three things:
- Firstly, the end function (whatever that may be) so that the result of the call can be interpreted.
- Secondly, the callback supplied by AsyncEnumerator. This is crucial, as it ensures that the thread pool is asked to execute another step through the sequence of integers.
- Finally, the AsyncEnumerator.DequeueAsyncResult function, although the result is discarded. This is because we have already passed the result to the end function. But we should still call this function once for each call that is made, so that the queue doesn’t needlessly grow in length.
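The three calls described above can be sketched as follows. The parameter types of Pair (an Action taking the callback, plus an AsyncCallback) are my reading of the description, and StubAsyncEnumerator is a hypothetical stand-in for the real class.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical stand-in for AsyncEnumerator (not the real class).
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);

    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public IAsyncResult DequeueAsyncResult() =>
        inbox.TryDequeue(out var result) ? result : throw new InvalidOperationException("Inbox is empty.");

    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

public delegate int Activity(StubAsyncEnumerator asyncEnum);

public static class Activities
{
    // Ties the two halves of one asynchronous call together into a single activity.
    public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end) =>
        asyncEnum =>
        {
            begin(result =>
            {
                end(result);                     // interpret the result of the call
                asyncEnum.End()(result);         // let the enumerator step the sequence
                asyncEnum.DequeueAsyncResult();  // discard the queued result
            });
            return 1;                            // exactly one operation was launched
        };
}

public static class PairDemo
{
    static IEnumerator<int> Steps(StubAsyncEnumerator asyncEnum, Activity activity)
    {
        yield return activity(asyncEnum);
    }

    public static long Run()
    {
        var stream = new MemoryStream();
        var data = new byte[] { 1, 2, 3, 4 };
        Activity write = Activities.Pair(
            callback => stream.BeginWrite(data, 0, data.Length, callback, null),
            result => stream.EndWrite(result));
        var enumerator = new StubAsyncEnumerator();
        enumerator.Execute(Steps(enumerator, write));
        return stream.Length;
    }
}
```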
That’s all we need as a basis. But for maximum convenience for most users, we can add extension methods for the most commonly used asynchronous call types. For example, writing to a stream:
This is a higher order function – it makes an Activity function, using the handy Pair function to tie together the two halves of the operation. But it can be called directly on a Stream object. If so called, it does not actually do anything; the return value must be passed on via yield return.
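A sketch of such a helper, together with an iterator that uses it, is shown below. RequestWrite is the article's name, but its parameter list here is assumed to mirror Stream.BeginWrite, and StubAsyncEnumerator is a hypothetical stand-in for the real class.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

// Hypothetical stand-in for AsyncEnumerator (not the real class).
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);

    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public IAsyncResult DequeueAsyncResult() =>
        inbox.TryDequeue(out var result) ? result : throw new InvalidOperationException("Inbox is empty.");

    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

public delegate int Activity(StubAsyncEnumerator asyncEnum);

public static class Activities
{
    public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end) =>
        asyncEnum =>
        {
            begin(result =>
            {
                end(result);
                asyncEnum.End()(result);
                asyncEnum.DequeueAsyncResult();
            });
            return 1;
        };

    public static void Execute(this StubAsyncEnumerator asyncEnum, IEnumerable<Activity> activities)
    {
        asyncEnum.Execute(activities.Select(activity => activity(asyncEnum)).GetEnumerator());
    }

    // Builds the activity but launches nothing until the activity itself is invoked.
    public static Activity RequestWrite(this Stream stream, byte[] buffer, int offset, int count) =>
        Pair(callback => stream.BeginWrite(buffer, offset, count, callback, null),
             result => stream.EndWrite(result));
}

public static class RequestWriteDemo
{
    static IEnumerable<Activity> Send(Stream stream)
    {
        var head = new byte[] { 1, 2 };
        yield return stream.RequestWrite(head, 0, head.Length);
        var tail = new byte[] { 3, 4, 5 };
        yield return stream.RequestWrite(tail, 0, tail.Length);
    }

    public static long Run()
    {
        var stream = new MemoryStream();
        new StubAsyncEnumerator().Execute(Send(stream));
        return stream.Length;
    }
}
```

Note that Send never sees the enumerator object at all; it only yields activities.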
So how does code look using this whole technique? Let’s look back at that real example again. The function starts with this signature:
Then in the body of the function there are triplets of lines like this:
By contrast, the alternative way of working enabled by these new functions begins by declaring the function like so:
The first thing to note (aside from the obvious change to the return type) is that there is no need to pass in an AsyncEnumerator object, even though it interoperates with one automatically. The code to write to the stream looks like this:
The triplet of lines has been boiled down to a single line, which appears just like a non-asynchronous call but with yield return in front of it.
Because the helper extensions are higher order functions, the user doesn’t need to even be aware that they’re using functional programming, or that an Activity is a function. They just make a method call that appears to do what they want, although they prefix it with yield return.
What about parallel activities running at the same time? Loosely inspired by another of Jeffrey’s examples, this function obtains results from two websites simultaneously:
The Parallel function we saw defined above takes care of composing a set of activities into a new activity that will launch all of them to run in parallel.
The end result appears so simple that it is worth going through an exercise of “expansion” to see how this alternative way of working must produce the same results as the original way. Hold onto your hats…
Here’s the nice and easy way it looks to the typical user:
Let’s substitute in the definition of RequestWrite:
Then the definition of Pair, in two stages to make it easy to follow. Firstly, keeping the definitions of begin and end separated out:
Then finally substituting those definitions into their places:
Now we can see what’s really happening. The statement yields a function that calls BeginWrite on the stream and then returns 1. BeginWrite requires a callback, and we build one for it. In the normal way of using AsyncEnumerator as seen in Jeffrey’s example, the callback is provided by calling AsyncEnumerator.End, but here we effectively wrap that in a lambda so we can do other things as well. First we call EndWrite on the stream (which may throw an exception, but I’ll deal with that in a moment), then we call on to the callback returned by AsyncEnumerator.End, and finally we do the housekeeping of discarding one IAsyncResult instance from AsyncEnumerator’s internal inbox.
Then this whole function will be executed by the Execute extension method. Some more substitution will make this clearer. Here’s the expression we use to generate the kind of IEnumerable<int> that the intrinsic Execute method requires:
So each activity is called in turn to produce integers, and the result is the same as if the return value of the activity was being yielded directly. In other words, continuing our expansion (now supposing we are in a function that is yielding integers having been passed the parameter asyncEnum):
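Written out as a sketch, the fully expanded form might look like this. It runs against a hypothetical StubAsyncEnumerator rather than the real class, and the buffer arguments are illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical stand-in for AsyncEnumerator (not the real class).
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);

    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public IAsyncResult DequeueAsyncResult() =>
        inbox.TryDequeue(out var result) ? result : throw new InvalidOperationException("Inbox is empty.");

    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

public static class ExpansionDemo
{
    // What a single "yield return stream.RequestWrite(...)" amounts to once every
    // helper has been substituted away: an iterator that yields integers directly.
    static IEnumerator<int> Write(StubAsyncEnumerator asyncEnum, Stream stream, byte[] buffer)
    {
        stream.BeginWrite(buffer, 0, buffer.Length, result =>
        {
            stream.EndWrite(result);         // the end half runs inside the callback
            asyncEnum.End()(result);         // then the usual enumerator callback
            asyncEnum.DequeueAsyncResult();  // housekeeping: drop the queued result
        }, null);
        yield return 1;
    }

    public static long Run()
    {
        var stream = new MemoryStream();
        var enumerator = new StubAsyncEnumerator();
        enumerator.Execute(Write(enumerator, stream, new byte[] { 9, 9 }));
        return stream.Length;
    }
}
```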
It is now clear what the only actual differences are. Compare with Jeffrey’s original code (I’ll repeat it here one more time to save you scrolling up and down the page).
There, the End() callback causes the function to resume executing after the operation finishes, and the next line of code is the call to stream.EndWrite. Working the new way, stream.EndWrite is called from within the callback itself. It also has the correct IAsyncResult ready to use.
Also, immediately after calling the function returned by End(), the result is removed from the inbox by calling DequeueAsyncResult. This places an additional thread-safety constraint on AsyncEnumerator, because previously it looked as though the inbox was only accessed by one thread at a time. But by looking at the code of AsyncEnumerator in Reflector, I can see that it takes out a lock before accessing the queue, so this should be fine.
So what are the limitations of this new approach? The main one is the lack of flexibility in exception handling. With the original raw approach, you are free to place a try/catch around either the BeginXXX or EndXXX calls (although you can’t place one such handler around both calls, due to a limitation of the yield return implementation.)
In the new approach, the best we can do is to allow any exceptions to propagate out of the IEnumerator<int> sequence generator. In other words, if the BeginXXX or EndXXX calls throw an exception, then it is just as if they were to throw uncaught exceptions in the original approach.
To achieve this, we need to make a further change. The reason is that the EndXXX call is made in the context of a thread that is running due to the fact that an asynchronous call has completed. We do not control that thread. Rather than bothering it with an exception, we need to transfer the exception into the context of whichever thread is executing our function.
The first step is to catch the exception. The user’s callback is executed in the inner lambda in Pair, so that’s where we need a try/catch block:
Note that we make use of a handy feature of AsyncEnumerator where we stash the exception inside it by calling Cancel. We then continue as normal, calling on to AsyncEnumerator’s callback, which means that the iteration will resume. What does this mean?
It means that when iteration resumes, the first thing we need to do is check for the cancellation, retrieve the exception and throw it. Unfortunately, this means we need to mess up our nice simple Execute function. If you refer to where I defined it above, you’ll see that I originally used Select to do all the looping and yielding. But this doesn’t allow us to perform the exception check immediately after resuming from the yield return. We need to write out the equivalent of Select in “long hand” so we can insert the extra code:
The private AdaptEnumerator function serves as the equivalent of Select, except that it checks for an exception to throw immediately following resumption after the yield return. This means that after the asynchronous action completes with an error, no further code executes in the underlying function that yields the Activities.
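Putting the two changes together, a sketch could look like the following. The Cancel and IsCanceled members on the hypothetical StubAsyncEnumerator are modelled on the description above rather than on the real signatures, and rethrowing through ExceptionDispatchInfo is my own choice for preserving the original exception.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for AsyncEnumerator (not the real class).
public sealed class StubAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> inbox = new ConcurrentQueue<IAsyncResult>();
    private readonly SemaphoreSlim completions = new SemaphoreSlim(0);
    private readonly object gate = new object();
    private object cancelValue;
    private bool canceled;

    public AsyncCallback End() =>
        result => { inbox.Enqueue(result); completions.Release(); };

    public IAsyncResult DequeueAsyncResult() =>
        inbox.TryDequeue(out var result) ? result : throw new InvalidOperationException("Inbox is empty.");

    public void Cancel(object value)
    {
        lock (gate) { canceled = true; cancelValue = value; }
    }

    public bool IsCanceled(out object value)
    {
        lock (gate) { value = cancelValue; return canceled; }
    }

    public void Execute(IEnumerator<int> steps)
    {
        while (steps.MoveNext())
            for (int i = 0; i < steps.Current; i++)
                completions.Wait();
    }
}

public delegate int Activity(StubAsyncEnumerator asyncEnum);

public static class Activities
{
    public static Activity Pair(Action<AsyncCallback> begin, AsyncCallback end) =>
        asyncEnum =>
        {
            begin(result =>
            {
                try { end(result); }
                catch (Exception error) { asyncEnum.Cancel(error); } // stash for the iterating thread
                asyncEnum.End()(result);
                asyncEnum.DequeueAsyncResult();
            });
            return 1;
        };

    public static void Execute(this StubAsyncEnumerator asyncEnum, IEnumerable<Activity> activities)
    {
        asyncEnum.Execute(AdaptEnumerator(asyncEnum, activities));
    }

    // Long-hand Select: after each resumption, rethrow any stashed exception.
    private static IEnumerator<int> AdaptEnumerator(StubAsyncEnumerator asyncEnum, IEnumerable<Activity> activities)
    {
        foreach (Activity activity in activities)
        {
            yield return activity(asyncEnum);
            if (asyncEnum.IsCanceled(out object value) && value is Exception error)
                ExceptionDispatchInfo.Capture(error).Throw();
        }
    }
}

public static class ExceptionDemo
{
    static IEnumerable<Activity> Steps(List<string> log)
    {
        log.Add("before");
        yield return Activities.Pair(
            callback => Task.Run(() => { }).ContinueWith(task => callback(task)),
            result => { throw new IOException("write failed"); });
        log.Add("after"); // not reached: the exception surfaces first
    }

    public static string Run()
    {
        var log = new List<string>();
        try { new StubAsyncEnumerator().Execute(Steps(log)); }
        catch (IOException error) { log.Add("caught: " + error.Message); }
        return string.Join("; ", log);
    }
}
```

In this sketch the failing end half runs on a pool thread, yet the IOException surfaces on the thread that called Execute, and the line after the yield return never runs.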
Like I say, I have only written this and done the above analysis on it, not really tested much. If you want to give it a try, be my guest:
Please let me know if you find any ridiculous bugs or design flaws, or even if it works. It’s just a source file containing all the functions (just 150 or so lines of code), so it could be added to any project already using AsyncEnumerator, or built into a separate class library.