Friday, 5 December 2008

Asynchronous Sockets with Yield Return of Lambdas

Update: More in-depth stuff about this

I just watched this video of Jeffrey Richter demonstrating his AsyncEnumerator class:

http://channel9.msdn.com/posts/Charles/Jeffrey-Richter-and-his-AsyncEnumerator

The key idea is the realisation that yield return takes care of turning a single function into a continuation that can be resumed under the control of some other code, while still letting you write that function in a natural procedural style. This is just what is needed to manage asynchronous communication. Great stuff.
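To make the continuation idea concrete, here's a minimal sketch of my own (not from the video). The names ContinuationDemo, Steps and Run are made up for illustration. It records the order in which the iterator body and the calling code run, to show that nothing in the iterator executes until the caller asks for the next item.

```csharp
using System;
using System.Collections.Generic;

public static class ContinuationDemo
{
    // Hypothetical example: each yield return suspends the method at that
    // point, and the next MoveNext() resumes it on the following line.
    public static IEnumerable<string> Steps(List<string> log)
    {
        log.Add("before first yield");
        yield return "first";
        log.Add("between yields");
        yield return "second";
        log.Add("after last yield");
    }

    // Drives the iterator by hand, the way a scheduler would.
    public static List<string> Run()
    {
        var log = new List<string>();
        using (IEnumerator<string> e = Steps(log).GetEnumerator())
        {
            // Calling Steps and GetEnumerator has not run any of its body yet
            log.Add("caller: nothing has run yet");
            while (e.MoveNext())
                log.Add("caller: got " + e.Current);
        }
        return log;
    }
}
```

The caller decides when each MoveNext happens, so it could just as well happen later, from a completion callback on another thread.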

But I had an idea while watching it. His code seems to still place some burden on the caller to know about the details of asynchronous socket programming - how to specify a callback, etc.

Instead, why not hide those details? The trick is to use yield return to pass back lambda expressions containing the work to do asynchronously.

Here's a quick sample - look at this first, to see how easy it is to use. It's important you see that first, because unless you're into functional programming, the behind-the-scenes stuff may seem confusing.

/* Trivial example of a protocol - send a byte containing the payload
 * length, followed by that many bytes of data.
 * 
 * This server just sends back the same data in the same way.
 */
static IEnumerable<AsyncIOPipeAction> HandleMyClient()
{
    // Start by reading a byte from the client
    byte[] singleByte = new byte[1];
    yield return pipe => pipe.Read(singleByte, 0, 1);
 
    // That byte tells us how much to read next
    int inputPayloadLength = singleByte[0];
 
    byte[] inputPayload = new byte[inputPayloadLength];
    yield return pipe => pipe.Read(inputPayload, 0, inputPayloadLength);
 
    // Send same thing back again
    yield return pipe => pipe.Write(singleByte, 0, 1);
    yield return pipe => pipe.Write(inputPayload, 0, inputPayloadLength);
}

As you can see, this function doesn't actually get passed a socket to work on. And yet it makes these suspiciously convenient-looking calls on a pipe object. Where does that pipe object come from and what is it?

Note how the whole function returns IEnumerable<AsyncIOPipeAction>, so it produces actions to be called somehow. Those actions are delegates:

public delegate void AsyncIOPipeAction(IAsyncIOPipe pipe);

That pipe object supports this interface:

/* Example of an interface with operations that are either synchronous 
 * or asynchronous depending on the context in which they are called.
 */ 
public interface IAsyncIOPipe
{
    void Read(byte[] buffer, int offset, int size);
    void Write(byte[] buffer, int offset, int size);
}

You could follow the same pattern with any facility that supports asynchronous usage, but here I just provide two obvious operations on a socket.

So in the client-handling function, whenever you want to make use of the socket, you do a yield return to hand off some code that can be called back to. Your code must accept a pipe (which is just what you wanted). Then it can call a method on it.

The implementation of IAsyncIOPipe is then very simple indeed:

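using System;
using System.Collections.Generic;
using System.Net.Sockets;

class SocketPipe : IAsyncIOPipe
{
    private readonly Socket _socket;
    private readonly IEnumerator<AsyncIOPipeAction> _actions;

    public SocketPipe(Socket socket, 
                      IEnumerable<AsyncIOPipeAction> actions)
    {
        _socket = socket;
        _actions = actions.GetEnumerator();

        // Kick off the first action straight away
        Step();
    }

    // Resume the handler until its next yield return, then run the
    // lambda it handed back, which starts an asynchronous operation
    private void Step()
    {
        if (!_actions.MoveNext())
            return;

        _actions.Current(this);
    }

    // A BeginReceive has to be completed with EndReceive...
    private void ReceiveCallback(IAsyncResult result)
    {
        // May report fewer bytes than requested; not handled here
        _socket.EndReceive(result);
        Step();
    }

    // ...and a BeginSend with EndSend
    private void SendCallback(IAsyncResult result)
    {
        _socket.EndSend(result);
        Step();
    }

    public void Read(byte[] buffer, int offset, int size)
    {
        _socket.BeginReceive(buffer, offset, size, 
                             SocketFlags.None, ReceiveCallback, this);
    }

    public void Write(byte[] buffer, int offset, int size)
    {
        _socket.BeginSend(buffer, offset, size, 
                          SocketFlags.None, SendCallback, this);
    }
}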

The constructor is passed a bunch of actions to be carried out asynchronously. It doesn't actually care how that collection of actions is implemented. But of course we can call HandleMyClient to produce a suitable collection.
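For completeness, here's a rough sketch of how sockets might be fed to that constructor. It is not taken from the solution zip. Acceptor and AcceptClients are hypothetical names, and I'm assuming a TcpListener that the caller has already started. The helper accepts connections without blocking and hands each accepted Socket to whatever callback you give it.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class Acceptor
{
    // Hypothetical helper: accepts connections asynchronously and passes
    // each accepted Socket to onClient. The listener must already be started.
    public static void AcceptClients(TcpListener listener, Action<Socket> onClient)
    {
        listener.BeginAcceptSocket(result =>
        {
            Socket client;
            try
            {
                client = listener.EndAcceptSocket(result);
            }
            catch (ObjectDisposedException)
            {
                return; // the listener was stopped while we were waiting
            }
            catch (SocketException)
            {
                return; // the accept failed; a real server might log and retry
            }

            // Queue the next accept before handling this client
            AcceptClients(listener, onClient);
            onClient(client);
        }, null);
    }
}
```

With the types from this post in scope, the callback would be something like `socket => new SocketPipe(socket, HandleMyClient())`, so every connection gets its own enumerator and its own pipe.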

SocketPipe starts the ball rolling immediately, by starting the enumeration and then calling Step. This executes the first portion of code in the function up to the first yield return, after which we get control back. We then just execute the lambda giving it a pipe to work on - our implementation, in fact.

This means that when the lambda calls Read or Write, we can simply begin the right kind of asynchronous operation on our concealed socket, and we take care of specifying the callback. All our callback has to do is end the asynchronous operation and run the next Step.
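Because HandleMyClient only ever sees the IAsyncIOPipe interface, the same function could be driven synchronously too, which might be handy for testing a protocol without any sockets. Here's a sketch under that assumption. BlockingStreamPipe, EchoDemo and Echo are names I've invented for it, and the delegate, interface and handler are repeated from above only so the sketch compiles on its own.

```csharp
using System.Collections.Generic;
using System.IO;

// Repeated from the post so that this sketch compiles on its own
public delegate void AsyncIOPipeAction(IAsyncIOPipe pipe);

public interface IAsyncIOPipe
{
    void Read(byte[] buffer, int offset, int size);
    void Write(byte[] buffer, int offset, int size);
}

// Hypothetical blocking implementation over a pair of streams
public class BlockingStreamPipe : IAsyncIOPipe
{
    private readonly Stream _input;
    private readonly Stream _output;

    public BlockingStreamPipe(Stream input, Stream output)
    {
        _input = input;
        _output = output;
    }

    public void Read(byte[] buffer, int offset, int size)
    {
        // Keep reading until the requested number of bytes has arrived
        while (size > 0)
        {
            int n = _input.Read(buffer, offset, size);
            if (n == 0)
                throw new EndOfStreamException();
            offset += n;
            size -= n;
        }
    }

    public void Write(byte[] buffer, int offset, int size)
    {
        _output.Write(buffer, offset, size);
    }

    // Each operation finishes before it returns, so a plain loop is enough
    public void Run(IEnumerable<AsyncIOPipeAction> actions)
    {
        foreach (AsyncIOPipeAction action in actions)
            action(this);
    }
}

public static class EchoDemo
{
    // Same protocol as HandleMyClient in the post
    public static IEnumerable<AsyncIOPipeAction> HandleMyClient()
    {
        byte[] singleByte = new byte[1];
        yield return pipe => pipe.Read(singleByte, 0, 1);

        int inputPayloadLength = singleByte[0];
        byte[] inputPayload = new byte[inputPayloadLength];
        yield return pipe => pipe.Read(inputPayload, 0, inputPayloadLength);

        yield return pipe => pipe.Write(singleByte, 0, 1);
        yield return pipe => pipe.Write(inputPayload, 0, inputPayloadLength);
    }

    // Feeds a request through the handler and returns what it wrote back
    public static byte[] Echo(byte[] request)
    {
        var input = new MemoryStream(request);
        var output = new MemoryStream();
        new BlockingStreamPipe(input, output).Run(HandleMyClient());
        return output.ToArray();
    }
}
```

The handler code is unchanged. Only the object behind the pipe parameter decides whether a Read blocks or returns immediately and completes later.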

Exercises for the reader:

  • There isn't much consideration given to error handling, mainly because I never even looked at the asynchronous socket APIs until just now, so I'm not sure where it needs to go. It should be possible to put error handling into this approach without putting any mess in the application code (e.g. HandleMyClient).
  • It should be possible to launch multiple asynchronous calls on multiple objects, where that makes sense (I can't see how it does in this simple example, but it might if you have multiple sockets for some reason). The lambda would need to be passed multiple parameters, one for each object currently "in play", and the management object would need to ensure that all outstanding asynchronous calls had completed before it called Step again.
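
As a starting point for the first exercise, here's one possible shape for the error handling. It's a sketch, not a tested solution. GuardedStepper, Fail, GuardedDemo and BrokenPipe are hypothetical names, and the delegate and interface are repeated so it compiles on its own. The idea is that the stepping object catches exceptions, disposes the enumerator so that any finally blocks in the handler run, and reports the outcome to a single completion callback. It makes no attempt at thread safety.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Repeated from the post so that this sketch compiles on its own
public delegate void AsyncIOPipeAction(IAsyncIOPipe pipe);

public interface IAsyncIOPipe
{
    void Read(byte[] buffer, int offset, int size);
    void Write(byte[] buffer, int offset, int size);
}

// Hypothetical helper: steps the handler and reports how it ended
public sealed class GuardedStepper
{
    private readonly IEnumerator<AsyncIOPipeAction> _actions;
    private readonly IAsyncIOPipe _pipe;
    private readonly Action<Exception> _done; // gets null on success
    private bool _finished;

    public GuardedStepper(IEnumerable<AsyncIOPipeAction> actions,
                          IAsyncIOPipe pipe, Action<Exception> done)
    {
        _actions = actions.GetEnumerator();
        _pipe = pipe;
        _done = done;
    }

    // Call to start, and again from a callback after a successful End call
    public void Step()
    {
        try
        {
            if (_actions.MoveNext())
            {
                _actions.Current(_pipe);
                return; // wait for the pipe to call Step or Fail
            }
        }
        catch (Exception error)
        {
            Fail(error);
            return;
        }
        Finish(null);
    }

    // Call from a callback when the End call throws
    public void Fail(Exception error)
    {
        Finish(error);
    }

    private void Finish(Exception error)
    {
        if (_finished) return;
        _finished = true;
        _actions.Dispose(); // runs pending finally blocks in the handler
        _done(error);
    }
}

public static class GuardedDemo
{
    // Stand-in pipe whose reads always fail
    private sealed class BrokenPipe : IAsyncIOPipe
    {
        public void Read(byte[] buffer, int offset, int size)
        {
            throw new IOException("connection reset");
        }
        public void Write(byte[] buffer, int offset, int size) { }
    }

    private static IEnumerable<AsyncIOPipeAction> Handler(List<string> log)
    {
        try
        {
            byte[] one = new byte[1];
            yield return pipe => pipe.Read(one, 0, 1);
            log.Add("not reached");
        }
        finally
        {
            log.Add("cleanup");
        }
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var stepper = new GuardedStepper(Handler(log), new BrokenPipe(),
            error => log.Add(error == null ? "ok" : "failed: " + error.Message));
        stepper.Step();
        return log;
    }
}
```

The application code only needs an ordinary try/finally around its yield returns if it has something to clean up. In a socket-backed pipe, each completion callback would wrap its End call in a try/catch and call Fail instead of Step when it throws.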

Solution zip file is here:

http://www.earwicker.com/downloads/asyncsockets.zip

1 comment:

Simon Sanderson said...

This is an excellent post and I shall definitely be adding that to my list of useful stuff for the future.

Simon Sanderson