November 24, 2007

Dream Asynchronicity Library

Steve Bjorg @ 12:50 pm

In previous posts (here and here), I introduced the building blocks for asynchronous programming in MindTouch Dream. In this post, I want to introduce the Async class, which provides common methods for asynchronous programming.

Concurrency & Coordination

The first set of methods helps create concurrent tasks. These are reminiscent of the methods found in the Thread and ThreadPool classes; in fact, they are wrappers for the latter. However, they add a bit of code so that a forked task gets a copy of the current task environment (more on tasks in a future post; for now, consider them synonymous with threads). Another addition is the Result parameter, which makes it easy to synchronize with the outcome of the forked task.

Result Async.Fork(VoidHandler handler, Result result)
Result<T> Async.Fork<T>(ResultHandler<T> handler, Result<T> result)

For example, consider the following code that uses a forked task to complete an operation in parallel:

int i = 10;
Result<int> result = Async.Fork(delegate() {
    return i + 42;
}, new Result<int>());
Console.WriteLine("Result: " + result.Value); // this will write 'Result: 52'
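To make the idea of "a ThreadPool wrapper plus a result object" concrete, here is a minimal sketch using only standard .NET types. It is not the Dream source: PendingValue<T> and ForkSketch are hypothetical names standing in for Result<T> and Async.Fork(), and the sketch leaves out the task-environment copy and timeouts entirely.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for Dream's Result<T>; only what this sketch needs.
sealed class PendingValue<T> {
    private readonly ManualResetEvent _done = new ManualResetEvent(false);
    private T _value;
    private Exception _exception;

    // Called by the forked work on success or failure.
    public void Return(T value) { _value = value; _done.Set(); }
    public void Throw(Exception e) { _exception = e; _done.Set(); }

    // Blocks until the forked work has finished, then returns its value
    // or rethrows its failure wrapped in an InvalidOperationException.
    public T Wait() {
        _done.WaitOne();
        if (_exception != null) {
            throw new InvalidOperationException("forked task failed", _exception);
        }
        return _value;
    }
}

// Hypothetical stand-in for Async.Fork(); assumes nothing about Dream internals.
static class ForkSketch {
    public static PendingValue<T> Fork<T>(Func<T> handler) {
        PendingValue<T> pending = new PendingValue<T>();
        ThreadPool.QueueUserWorkItem(delegate(object state) {
            try {
                pending.Return(handler());
            } catch (Exception e) {
                pending.Throw(e);
            }
        });
        return pending;
    }
}
```

With this sketch, ForkSketch.Fork(() => i + 42).Wait() runs the delegate on a thread-pool thread and blocks the caller only at the point where the value is actually needed.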

When working with multiple, concurrent tasks, it’s important to be able to synchronize on their outcome. The WhenAllDone() method serves this purpose.

Result<ResultBase[]> Async.WhenAllDone(ResultBase[] list, Result<ResultBase[]> result)

For example, we might want to do several concurrent operations at once and synchronize on their outcome:

List<Result> list = new List<Result>();
for(int i = 0; i < 10; ++i) {
    list.Add(Async.Fork(delegate() { // fork a task and add the result instance to the list
        Console.WriteLine("Hello"); // write 'Hello'
    }, new Result()));
}
Async.WhenAllDone(list.ToArray(), new Result<ResultBase[]>()).Wait();
Console.WriteLine("Bye"); // this will write 'Bye' after the 10 'Hello' lines

As you can see, the Fork() methods make it a snap to create tasks and synchronize with them.
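If you are curious how a join like this can work without parking a thread per task, one common approach is an atomic countdown. The following is a sketch under that assumption, not Dream's WhenAllDone() implementation; CountdownJoin and CountdownJoinDemo are hypothetical names, and the sketch ignores exceptions and timeouts.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not Dream's WhenAllDone(): runs a callback once
// Signal() has been called the expected number of times.
sealed class CountdownJoin {
    private int _remaining;
    private readonly Action _onAllDone;

    public CountdownJoin(int count, Action onAllDone) {
        if (count <= 0) throw new ArgumentOutOfRangeException("count");
        _remaining = count;
        _onAllDone = onAllDone;
    }

    // Safe to call from any thread; only the final caller observes zero
    // and therefore only that caller invokes the callback.
    public void Signal() {
        if (Interlocked.Decrement(ref _remaining) == 0) {
            _onAllDone();
        }
    }
}

static class CountdownJoinDemo {
    public static void Run() {
        ManualResetEvent allDone = new ManualResetEvent(false);
        CountdownJoin join = new CountdownJoin(10, delegate() { allDone.Set(); });
        for (int i = 0; i < 10; ++i) {
            ThreadPool.QueueUserWorkItem(delegate(object state) {
                Console.WriteLine("Hello");
                join.Signal(); // report completion of this task
            });
        }
        allDone.WaitOne();        // blocks only here, like Wait() above
        Console.WriteLine("Bye");
    }
}
```

The callback runs on whichever thread finishes last, so it should be short or hand its work off elsewhere.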

Asynchronous I/O

The most common use for asynchronous operations is input/output. Here, the Async class provides three methods to make life easier when working with streams.

Result<int> Async.Read(Stream stream, byte[] buffer, int offset, int count, Result<int> result)
Result Async.Write(Stream stream, byte[] buffer, int offset, int count, Result result)
Result<long> Async.CopyStream(Stream source, Stream target, long length, Result<long> result)

The first two are the usual read/write operations. The following example shows how the Write() method can be used to write to a stream asynchronously and close it when done.

void WriteAndForget(Stream stream, byte[] buffer) {
    Async.Write(stream, buffer, 0, buffer.Length, new Result()).WhenDone(delegate(Result result) {
        stream.Close();
        if(result.HasException) {
            Console.WriteLine("An error occurred: " + result.Exception);  // write the exception
        }
    });
}

The third method enables simultaneous read and write operations on two streams. The CopyStream() method reads data in 4 KB chunks from the source stream and writes them to the target stream. However, in good asynchronous fashion, it starts the next read operation before the previous write operation completes. This enables greater throughput when copying files or streaming data. For example, consider the following code to copy a file asynchronously in the background:

void CopyFileAndForget(string from, string to) {
    Stream source = File.OpenRead(from);
    Stream target = File.OpenWrite(to);
    Async.CopyStream(source, target, -1, new Result<long>(TimeSpan.MaxValue)).WhenDone(delegate(Result<long> result) {
        source.Close();
        target.Close();
        if(result.HasException) {
            Console.WriteLine("An error occurred: " + result.Exception);  // write the exception
        }
    });
    // note: this method will exit before copying has completed!
}
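The overlap between reading and writing can be illustrated with two buffers and the standard Stream.BeginWrite()/EndWrite() calls. This is a simplified sketch, not the CopyStream() source: OverlappedCopySketch is a hypothetical name, and unlike CopyStream() it blocks the calling thread until the copy finishes. It only shows how the next read can run while the previous write is still in flight.

```csharp
using System;
using System.IO;

// Hypothetical helper, not Dream's CopyStream().
static class OverlappedCopySketch {
    public static long Copy(Stream source, Stream target) {
        const int ChunkSize = 4096; // 4 KB chunks, as described in the text
        byte[] readBuffer = new byte[ChunkSize];
        byte[] writeBuffer = new byte[ChunkSize];
        long total = 0;

        int count = source.Read(readBuffer, 0, ChunkSize);
        while (count > 0) {
            // Swap buffers so the chunk just read becomes the chunk to write.
            byte[] tmp = readBuffer;
            readBuffer = writeBuffer;
            writeBuffer = tmp;

            // Start the write, then read the next chunk into the other
            // buffer while the write is still pending.
            IAsyncResult pendingWrite = target.BeginWrite(writeBuffer, 0, count, null, null);
            int next = source.Read(readBuffer, 0, ChunkSize);
            target.EndWrite(pendingWrite); // wait for the write to complete

            total += count;
            count = next;
        }
        return total; // number of bytes copied
    }
}
```

Two buffers are needed because the chunk being written must not be overwritten by the read that runs alongside it.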

Needless to say, we make extensive use of CopyStream() in our Dream and Deki Wiki code. It’s a real lifesaver because it lets us stream data from one network stream to another without tying up a thread to do so.

Miscellaneous

The last methods are less frequently used, but I’ll cover them for completeness.

Result Async.Sleep(TimeSpan duration)

The Sleep() method suspends execution for the given time span. However, it does not block the current thread. Instead, it triggers the returned Result once the time has elapsed. This functionality may appear superfluous since we already have the TaskTimer class, but it comes in handy for coroutines, which I’ll cover in a future post.
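For comparison, a non-blocking delay can be built on the standard System.Threading.Timer class. The sketch below assumes nothing about how Sleep() or TaskTimer are implemented in Dream; SleepSketch.After() is a hypothetical helper that takes a plain callback in place of a Result.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not Dream's Async.Sleep().
static class SleepSketch {
    // Invokes onElapsed on a thread-pool thread once the duration has
    // passed. The calling thread returns immediately.
    public static void After(TimeSpan duration, Action onElapsed) {
        Timer timer = null;
        // Create the timer disarmed so the callback cannot run before
        // the 'timer' variable has been assigned.
        timer = new Timer(delegate(object state) {
            timer.Dispose(); // one-shot: release the timer once it fires
            onElapsed();
        }, null, Timeout.Infinite, Timeout.Infinite);
        // Arm it: fire once after 'duration', never repeat (-1 ms period).
        timer.Change(duration, TimeSpan.FromMilliseconds(-1));
    }
}
```

Calling SleepSketch.After(TimeSpan.FromSeconds(1), delegate() { Console.WriteLine("done"); }) returns right away, and the message is written roughly a second later from a pool thread.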

Result<WaitHandle> Async.WaitHandle(WaitHandle handle, Result<WaitHandle> result)

The WaitHandle() method bridges the usual .NET synchronization classes (e.g. Mutex, Semaphore, ManualResetEvent, …) and the Dream Result class. The Result object will be triggered when either the WaitHandle is signaled or the operation times out.
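The standard .NET building block for this kind of bridge is ThreadPool.RegisterWaitForSingleObject(), which invokes a callback when a handle is signaled or a timeout elapses. Whether Dream uses it internally is an assumption on my part; the sketch below only shows the pattern, with WaitHandleSketch.WhenSignaled() as a hypothetical helper that reports the outcome as a bool.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not Dream's Async.WaitHandle().
static class WaitHandleSketch {
    // Invokes callback(true) when the handle is signaled, or
    // callback(false) when the timeout elapses first.
    public static void WhenSignaled(WaitHandle handle, TimeSpan timeout, Action<bool> callback) {
        object gate = new object();
        RegisteredWaitHandle registration = null;
        lock (gate) {
            registration = ThreadPool.RegisterWaitForSingleObject(
                handle,
                delegate(object state, bool timedOut) {
                    // Taking the lock guarantees 'registration' has been
                    // assigned, even if the handle was already signaled.
                    lock (gate) {
                        registration.Unregister(null);
                    }
                    callback(!timedOut);
                },
                null,
                timeout,
                true); // executeOnlyOnce: fire a single time
        }
    }
}
```

The callback runs on a thread-pool thread, so no thread sits blocked on the handle while waiting.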

Summary

The methods in the Async class capture the most common use cases for asynchronous programming. Combined with the Result class, they make it much less daunting to write code that works well on multiprocessor machines.

To complete our tour of the asynchronous programming model in MindTouch Dream, we need to look at two more concepts: Tasks, which I alluded to at the beginning of this post, and Coroutines, which are a different way to do asynchronous programming without getting entangled in continuations.

3 Comments »

  1. Async.WhenAllDone appears to have changed recently. The one in dream 1.4.1 only takes one argument (not two as above).

    Unfortunately the SVN url from the Source_Code page fails:

    c:\src>svn checkout https://dekiwiki.svn.sourceforge.net/svnroot/dream dream
    svn: PROPFIND request failed on ‘/svnroot/dream’
    svn: Could not open the requested SVN filesystem

    Comment by Vagn Johansen — November 27, 2007 @ 12:30 pm

  2. Thanks for pointing out the broken SVN link. We restructured the repository and forgot to update the links. The correct location is: https://dekiwiki.svn.sourceforge.net/svnroot/dekiwiki/public/dream

    My post above is in relation to Dream 1.5, which is currently only available via /trunk. The difference in WhenAllDone is that the old version did not take a Result object, so you couldn’t set the default timeout for joining on all outstanding operations. The parameter was added in 1.5 during a review of the API to ensure greater consistency.

    Comment by Steve Bjorg — November 27, 2007 @ 12:50 pm

  3. [...] also provides a concurrency and asynchronous execution library that enables efficient programming. Most of the complexity is encapsulated through the versatile [...]

    Pingback by Dream “Emerald” (1.5.0) released! | MindTouch Blog — February 27, 2008 @ 9:50 pm
