November 24, 2007

Dream Asynchronicity Library

In previous posts (here and here), I introduced the building blocks for asynchronous programming in MindTouch Dream. In this post, I want to introduce the Async class, which provides common methods for asynchronous programming.

Concurrency & Coordination

The first set of methods helps create concurrent tasks. They are reminiscent of the methods found in the Thread and ThreadPool classes. In fact, they are wrappers for the latter. However, they add a bit of code so that a forked task gets a copy of the current task environment (more on tasks in a future post; for now consider them synonymous with threads). Another addition is the Result parameter, which makes it easy to synchronize with the outcome of the forked task.

Result Async.Fork(VoidHandler handler, Result result)
Result<T> Async.Fork<T>(ResultHandler<T> handler, Result<T> result)

For example, consider the following code that uses a forked task to complete an operation in parallel:

int i = 10;
Result<int> result = Async.Fork(delegate() {
    return i + 42;
}, new Result<int>());
Console.WriteLine("Result: " + result.Value); // this will write 'Result: 52'
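
To make the "wrapper around ThreadPool plus a Result" idea concrete, here is a minimal sketch built only on the standard .NET classes. MiniResult<T> and MiniAsync are hypothetical names invented for illustration; this is not Dream's actual implementation, and it leaves out the copying of the task environment described above.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a result object; not Dream's Result<T>.
class MiniResult<T> {
    private readonly ManualResetEvent _done = new ManualResetEvent(false);
    private T _value;
    private Exception _exception;

    // Called by the forked work to publish a value and wake any waiter.
    public void Return(T value) { _value = value; _done.Set(); }

    // Called by the forked work to publish a failure and wake any waiter.
    public void Throw(Exception e) { _exception = e; _done.Set(); }

    // Blocks until the forked work has finished, then yields its outcome.
    public T Value {
        get {
            _done.WaitOne();
            if(_exception != null) {
                throw new InvalidOperationException("forked work failed", _exception);
            }
            return _value;
        }
    }
}

static class MiniAsync {

    // Queues the handler on the thread pool and routes its outcome into result.
    public static MiniResult<T> Fork<T>(Func<T> handler, MiniResult<T> result) {
        ThreadPool.QueueUserWorkItem(delegate(object state) {
            try {
                result.Return(handler());
            } catch(Exception e) {
                result.Throw(e);
            }
        });
        return result;
    }
}
```

With this sketch, MiniAsync.Fork(delegate() { return 10 + 42; }, new MiniResult<int>()).Value evaluates to 52 once the pooled thread has run the handler.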

When working with multiple, concurrent tasks, it’s important to be able to synchronize on their outcome. The WhenAllDone() method serves this purpose.

Result<ResultBase[]> Async.WhenAllDone(ResultBase[] list, Result<ResultBase[]> result)

For example, we might want to do several concurrent operations at once and synchronize on their outcome:

List<Result> list = new List<Result>();
for(int i = 0; i < 10; ++i) {
    list.Add(Async.Fork(delegate() { // fork a task and add the result instance to the list
        Console.WriteLine("Hello"); // write 'Hello'
    }, new Result()));
}
Async.WhenAllDone(list.ToArray(), new Result<ResultBase[]>()).Wait();
Console.WriteLine("Bye"); // this will write 'Bye' after the 10 'Hello' lines

As you can see, the Fork() methods make it a snap to create tasks and synchronize with them.

Asynchronous I/O

The most common use for asynchronous operations is input/output. Here, the Async class provides three methods to make life easier when working with streams.

Result<int> Async.Read(Stream stream, byte[] buffer, int offset, int count, Result<int> result)
Result Async.Write(Stream stream, byte[] buffer, int offset, int count, Result result)
Result<long> Async.CopyStream(Stream source, Stream target, long length, Result<long> result)

The first two are the usual read/write operations. The following example shows how the Write() method can be used to write to a stream asynchronously and close it when done.

void WriteAndForget(Stream stream, byte[] buffer) {
    Async.Write(stream, buffer, 0, buffer.Length, new Result()).WhenDone(delegate(Result result) {
        stream.Close();
        if(result.HasException) {
            Console.WriteLine("An error occurred: " + result.Exception);  // write the exception
        }
    });
}

The third method enables simultaneous read and write operations on two streams. The CopyStream() method reads data in 4KB chunks from the source stream and then writes them to the target stream. However, in good asynchronous fashion, it starts the next read operation before the previous write operation completes. This enables greater throughput when copying files or streaming data. For example, consider the following code to copy a file asynchronously in the background:

void CopyFileAndForget(string from, string to) {
    Stream source = File.OpenRead(from);
    Stream target = File.OpenWrite(to);
    Async.CopyStream(source, target, -1, new Result<long>(TimeSpan.MaxValue)).WhenDone(delegate(Result<long> result) {
        source.Close();
        target.Close();
        if(result.HasException) {
            Console.WriteLine("An error occurred: " + result.Exception);  // write the exception
        }
    });
    // note: this method will exit before copying has completed!
}
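
The overlap between reading and writing is the interesting part, so here is a rough sketch of that technique using two buffers. It relies on the Task-based stream methods from later .NET versions rather than Dream's Result, and OverlappedCopy and CopyAsync are hypothetical names; treat it as an illustration of the idea, not as the code behind CopyStream().

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class OverlappedCopy {

    // Copies source to target in fixed-size chunks and returns the byte count.
    public static async Task<long> CopyAsync(Stream source, Stream target, int chunkSize) {
        byte[] current = new byte[chunkSize];
        byte[] next = new byte[chunkSize];
        long total = 0;
        int read = await source.ReadAsync(current, 0, current.Length);
        while(read > 0) {

            // start writing the chunk we have and reading the next one at the same time
            Task write = target.WriteAsync(current, 0, read);
            Task<int> nextRead = source.ReadAsync(next, 0, next.Length);
            await Task.WhenAll(write, nextRead);
            total += read;
            read = nextRead.Result;

            // swap buffers so the freshly read chunk becomes the one to write
            byte[] swap = current;
            current = next;
            next = swap;
        }
        return total;
    }
}
```

Two buffers are needed because the chunk being written must stay untouched while the next chunk is being read into memory.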

Needless to say, we make extensive use of CopyStream() in our Dream and Deki Wiki code. It’s a real lifesaver, because it lets us stream data from one network stream to another without tying up a thread to do so.

Miscellaneous

The last methods are less frequently used, but I’ll cover them for completeness.

Result Async.Sleep(TimeSpan duration)

The Sleep() method suspends execution for the given time span. However, it does not block the current thread. Instead, it triggers the returned Result once the time has elapsed. This functionality may appear superfluous since we already have the TaskTimer class, but it comes in handy for coroutines, which I’ll cover in a future post.
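
As a rough illustration of what sleeping without blocking a thread means, a timer can invoke a callback once the duration has elapsed. The sketch below uses System.Threading.Timer; NonBlockingSleep and After are hypothetical names, and I'm not claiming this is how Sleep() or TaskTimer is built.

```csharp
using System;
using System.Threading;

static class NonBlockingSleep {

    // Invokes callback on a pool thread after duration, without blocking the caller.
    public static void After(TimeSpan duration, Action callback) {
        Timer timer = null;

        // create the timer disarmed so 'timer' is assigned before it can fire
        timer = new Timer(delegate(object state) {
            timer.Dispose();
            callback();
        }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

        // arm the timer to fire exactly once
        timer.Change(duration, Timeout.InfiniteTimeSpan);
    }
}
```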

Result<WaitHandle> Async.WaitHandle(WaitHandle handle, Result<WaitHandle> result)

The WaitHandle() method bridges the usual .NET synchronization classes (e.g. Mutex, Semaphore, ManualResetEvent, …) and the Dream Result class. The Result object will be triggered when either the WaitHandle is signaled or the operation times out.
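
The standard library already offers a way to be called back when a WaitHandle is signaled or a timeout expires, and a bridge like this could plausibly sit on top of it. Here is a minimal sketch using ThreadPool.RegisterWaitForSingleObject; WaitHandleBridge and WhenSignaled are hypothetical names, and whether Dream does it this way is an assumption on my part.

```csharp
using System;
using System.Threading;

static class WaitHandleBridge {

    // Calls callback(false) when handle is signaled, or callback(true) on timeout.
    public static void WhenSignaled(WaitHandle handle, TimeSpan timeout, Action<bool> callback) {

        // executeOnlyOnce = true: the callback runs a single time on a pool thread
        // note: production code should keep the returned RegisteredWaitHandle
        //       and call Unregister() on it once the callback has run
        ThreadPool.RegisterWaitForSingleObject(handle, delegate(object state, bool timedOut) {
            callback(timedOut);
        }, null, timeout, true);
    }
}
```

No thread sits blocked on the handle while waiting; the thread pool watches it and only borrows a thread to run the callback.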

Summary

The methods in the Async class capture the most common use cases for asynchronous programming. Combined with the Result class, they make it much less daunting to write code that works well on multiprocessor machines.

To complete our tour of the asynchronous programming model in MindTouch Dream, we need to look at two more concepts: Tasks, which I alluded to at the beginning of this post, and Coroutines, which are a different way to do asynchronous programming without getting entangled in continuations.