Benjamin van der Veen

Asynchronous Programming with the Reactive Framework and the Task Parallel Library — Part 1

Note: This is the first in a series of three articles exploring differences between Rx and the TPL. The series assumes familiarity with the .NET Asynchronous Programming Model (APM).

On the .NET HTTP Abstractions list we decided early on that we wanted to allow OWIN applications to provide some sort of asynchronous primitive through the response enumerable. The rationale is that sometimes an OWIN host might attempt to enumerate the next item in the enumerable sequence, but the application might not be ready to provide the item immediately. We considered the options .NET had built-in: IObservable<T> (the Reactive Framework) and Task<T> (the Task Parallel Library).

I had already built Kayak around the Reactive Framework, and I was pretty happy with that mechanism. The one perceived drawback was that an observable could contain multiple values, but everywhere I used it throughout my system, it was axiomatically required that every observable yield a single value and complete, or yield an exception. The fact that this wasn’t enforced by the compiler nagged at me.

I was leaning toward Task<T> for OWIN because, by definition, it yields a single value or an exception. However, I hadn’t played with the Task Parallel Library yet, so I didn’t press the issue too hard initially. Last week, I spent some time familiarizing myself with the Task Parallel Library by porting Kayak’s asynchronous architecture from the Reactive Framework to the TPL.

A Brief Introduction to the Reactive Framework

In the Reactive Framework, there are two primary interfaces: IObservable<T> and IObserver<T>.

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

IObservable<T> defines a single Subscribe method which takes an IObserver<T>. After Subscribe is called, the observable may call the methods of the observer with values and exceptions at some point in the future. The Subscribe method returns an IDisposable which, when disposed, informs the observable that the observer provided to Subscribe is no longer interested in receiving values from the observable.

Depending on the semantics of the observable, Subscribe may cause the observable to “start” whatever process generates the values that make up the observable sequence—for instance, if the observable represents a TCP listener, its values would be sockets representing accepted connections. Upon a call to Subscribe, the TCP listener would cause a new listening socket to be created and bound, and whenever a connection is accepted, it would call the OnNext method of the observer with the socket object which represents the connection.

Again, depending on the semantics of the observable, disposing the disposable returned from the Subscribe method may cause the observable to “stop” doing whatever it was doing to generate values. In our TCP listener example, the listener would shut down the listening socket, preventing any more connections from being accepted.
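To make the subscribe/dispose contract concrete, here is a minimal sketch that uses only the two interfaces as they are declared in the System namespace (where the error callback is named OnError). ManualSource<T>, RecordingObserver<T>, and ObserverDemo are hypothetical names invented for this illustration, and the source is deliberately not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: values are pushed by hand through Publish.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        // Disposing the returned object removes the observer again.
        return new Unsubscriber(() => observers.Remove(observer));
    }

    public void Publish(T value)
    {
        // Copy the list so an observer may unsubscribe while being notified.
        foreach (var o in observers.ToArray()) o.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class ObserverDemo
{
    public static List<string> Run()
    {
        var source = new ManualSource<int>();
        var observer = new RecordingObserver<int>();

        IDisposable subscription = source.Subscribe(observer);
        source.Publish(1);
        source.Publish(2);
        subscription.Dispose();   // no longer interested
        source.Publish(3);        // not delivered to the observer

        return observer.Log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Only the first two values reach the observer; the third is published after the subscription has been disposed.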

Wrapping an APM operation with IObservable<T>

In Kayak’s asynchronous architecture, I originally used IObservable<T> to represent and wrap up asynchronous operations following the .NET Asynchronous Programming Model (APM) pattern. The Reactive Framework provides Observable.FromAsyncPattern, which creates a function that, when invoked, returns an observable representing an APM operation. When an observer subscribes to this observable, the Begin method of the APM operation is invoked, and the operation is started. When it completes, the OnNext method of the observer is invoked with the result of the operation. Unsubscribing from this observable has no effect, because APM operations cannot be cancelled.

public static partial class Extensions
{
    public static Func<IObservable<int>> ReadAsyncRx_FromAsync(this Stream stream, byte[] buffer, int offset, int count)
    {
        return Observable.FromAsyncPattern<int>(
            (c, s) => stream.BeginRead(buffer, offset, count, c, s),
            iasr => stream.EndRead(iasr));
    }
}

However, this method was not adequate for Kayak. Kayak supports a “green threaded” model, so I need to very carefully control when and on what threads Kayak code gets executed. I achieve this by making sure that the AsyncCallback of any APM operation is invoked on a special thread managed by Kayak (or more specifically, the underlying IO driver). The observables created by FromAsyncPattern do not guarantee that the OnNext, OnError, and OnCompleted methods of subscribed observers are called on the same thread on which the underlying APM operation invokes the AsyncCallback provided to it. Thus, I wrote a special observable which issues callbacks in a predictable way.

public static partial class Extensions
{
    public static IObservable<int> ReadAsyncRx_ApmObservable(this Stream stream, byte[] buffer, int offset, int count)
    {
        return new ApmObservable<int>(
            (c, s) => stream.BeginRead(buffer, offset, count, c, s),
            iasr => stream.EndRead(iasr));
    }
}

// Observable.FromAsyncPattern reschedules the callback on the ThreadPoolScheduler. Lame!
// We need to allow the underlying APM operation to determine what thread the callback 
// comes in on.
// 
// And so we roll our own.
public class ApmObservable<T> : IObservable<T>
{
    Func<AsyncCallback, object, IAsyncResult> begin;
    Func<IAsyncResult, T> end;

    public ApmObservable(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, T> end)
    {
        this.begin = begin;
        this.end = end;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IAsyncResult asyncResult = null;

        AsyncCallback callback = (iasr) =>
        {
            T value = default(T);
            try
            {
                value = end(iasr);
            }
            catch (Exception e)
            {
                observer.OnError(e);
                return;
            }
            observer.OnNext(value);
            observer.OnCompleted();
        };

        try
        {
            asyncResult = begin(callback, null);
        }
        catch (Exception e)
        {
            observer.OnError(e);
        }

        return Disposable.Empty;
    }
}

Upon subscription, the asynchronous operation is initiated by a call to the APM Begin method, and an AsyncCallback delegate is provided. In typical APM fashion, the implementation of the AsyncCallback calls the APM End method to retrieve the result of the operation, and catches the exception, if any. The implementation of the AsyncCallback then calls the observer’s OnNext method with the result of the operation followed by OnCompleted, or, if the APM End method threw an exception, it calls OnError with the exception. Again, because APM operations in .NET cannot be cancelled, the IDisposable returned from Subscribe is Disposable.Empty.

A Brief Introduction to the Task Parallel Library

The core interfaces of the Reactive Framework roughly correspond to the TPL classes Task<T> and TaskCompletionSource<T>. Task<T> is a bit like IObservable<T>, in that it, by itself, represents an asynchronous operation. TaskCompletionSource<T> is a bit like IObserver<T>, in that it defines SetResult and SetException methods which accept values and exceptions.

Tasks provide values and exceptions through their Result and Exception properties, but Result is a bit freaky: if the task hasn’t completed yet, reading it blocks the calling thread until the task completes. (Exception does not block; it simply returns null until the task has faulted.) It’s a very, very rare case where blocking is what you want. If you’re doing asynchronous programming, blocked threads are often what you’re trying to avoid in the first place! Instead, you want to get a callback when the task is completed, so that you can safely access the Result or Exception properties without blocking. Task objects provide the ContinueWith method for this purpose. There are a lot of overloads of ContinueWith which introduce slight variations, but the basic idea is that it takes a delegate which is executed when the target (or in MSDN parlance, “antecedent”) task completes, and returns a new Task which represents the execution of the delegate.
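As a rough illustration of the callback style, the sketch below attaches a continuation that inspects the antecedent only after it has finished. ContinuationDemo and Describe are hypothetical names; the sketch assumes the default task scheduler and does not handle cancelled antecedents.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical helper: returns a new task that describes how the antecedent ended.
    public static Task<string> Describe(Task<int> antecedent)
    {
        return antecedent.ContinueWith(t =>
        {
            // The antecedent has already finished when this delegate runs,
            // so reading Exception or Result here does not block.
            if (t.IsFaulted)
                return "failed: " + t.Exception.InnerException.Message;
            return "result: " + t.Result;
        });
    }

    public static void Main()
    {
        Task<int> work = Task.Factory.StartNew(() => 6 * 7);
        Task<string> described = Describe(work);

        // Blocking on Result here only keeps the console program alive until the end.
        Console.WriteLine(described.Result); // prints "result: 42"
    }
}
```

Note that Describe never touches a thread itself: it only registers the delegate and hands back the task that represents it.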

Think about that for a second. Task.ContinueWith doesn’t take a task, it creates one from a delegate. Now take a look at the constructors for Task—they all take delegates and create new Tasks from them. If a delegate is sufficient to create a task, then one might infer that the task completes when the delegate is finished executing. So constructing a task with a delegate is just a fancy way of saying ThreadPool.QueueUserWorkItem, with the added plumbing of handling a return value and any exceptions and making them available through some properties.

So how can a Task represent an asynchronous operation which isn’t backed by a thread pool thread, such as a read from or a write to a network socket?

Enter TaskCompletionSource<T>. TaskCompletionSource<T> is kind of like IObserver<T> and IObservable<T> wrapped into a single package. (In the Reactive Framework, there is a type called ISubject<T> which is just that—it implements both IObservable<T> and IObserver<T>). When you construct a TaskCompletionSource<T>, it internally constructs a special task, accessible through the TaskCompletionSource’s Task property, which doesn’t require a delegate. Instead, TaskCompletionSource<T> defines two methods, SetResult and SetException, which when called, cause the TaskCompletionSource’s special Task to complete with the given result or exception.
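A small sketch may help here. It shows a task whose completion is driven entirely by calls to SetResult and SetException, with no delegate behind it. CompletionSourceDemo and Run are hypothetical names used only for this example.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSourceDemo
{
    public static string[] Run()
    {
        var tcs = new TaskCompletionSource<int>();
        Task<int> task = tcs.Task;

        // Nothing is running on any thread; the task simply is not complete yet.
        string before = "completed before SetResult: " + task.IsCompleted;

        // Continuations can be attached before the result exists.
        Task<string> continuation = task.ContinueWith(t => "got " + t.Result);

        // Whoever holds the TaskCompletionSource decides when the task completes.
        tcs.SetResult(42);
        string after = continuation.Result;

        // The same mechanism is used to report failure.
        var failing = new TaskCompletionSource<int>();
        failing.SetException(new InvalidOperationException("boom"));
        string failure = "faulted: " + failing.Task.IsFaulted
            + ", message: " + failing.Task.Exception.InnerException.Message;

        return new[] { before, after, failure };
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

In a real program the call to SetResult would typically sit inside some callback, such as an APM AsyncCallback, rather than on the line after the continuation is attached.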

Wrapping an APM operation with Task<T>

I’m now using Task<T> to wrap up APM operations in Kayak. The Task used to represent these operations is provided by a TaskCompletionSource<T>, since in Kayak, every asynchronous operation may be performing asynchronous IO somewhere and therefore cannot be expressed as a single delegate.

public static partial class Extensions
{
    public static Task<int> ReadAsyncTpl(this Stream stream, byte[] buffer, int offset, int count)
    {
        var tcs = new TaskCompletionSource<int>();

        stream.BeginRead(buffer, offset, count, iasr =>
        {
            try
            {
                tcs.SetResult(stream.EndRead(iasr));
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
        }, null);

        return tcs.Task;
    }
}

So the first step in wrapping up an asynchronous operation with Task<T> is to construct a TaskCompletionSource<T>, whose task will ultimately be returned. Next, the wrapper method invokes the APM Begin method, and provides an AsyncCallback whose implementation attempts to call the APM End method. The AsyncCallback implementation calls TaskCompletionSource<T>.SetResult with the return value of the End method, or, if the End method throws an exception, the AsyncCallback calls TaskCompletionSource<T>.SetException with the exception. When either SetResult or SetException is called, the TaskCompletionSource’s special task completes. Any threads blocked on the task’s Result property will continue, and any tasks to which the TaskCompletionSource’s task is antecedent (i.e., those tasks created by calling ContinueWith on the special TaskCompletionSource task, which is returned from this wrapper method) will be executed.

The TPL does actually provide an adapter from the APM pattern out of the box in the form of the method TaskFactory.FromAsync. This method is heavily overloaded, but there are two basic variations:

public Task<TResult> FromAsync<TResult>(Func<AsyncCallback, object, IAsyncResult> beginMethod, Func<IAsyncResult, TResult> endMethod, object state);
public Task<TResult> FromAsync<TResult>(IAsyncResult asyncResult, Func<IAsyncResult, TResult> endMethod);

Our example is equivalent to the first. The second however has a markedly different behavior. Notice that it takes IAsyncResult as an argument—when using this overload, the calling code must invoke the APM Begin method to provide an IAsyncResult, which means the FromAsync method cannot provide an AsyncCallback to the begin method. Therefore, it cannot be notified asynchronously when the APM operation is completed. Instead, it must wait on the AsyncWaitHandle property of the IAsyncResult. Beware when using overloads of FromAsync which take IAsyncResult—threads waiting is rarely the behavior you want! This subtle difference has very large implications for the flow of your program.
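For comparison, here is a sketch of the same stream read expressed with the first (delegate-taking) variation of FromAsync, called through Task<int>.Factory. The names FromAsyncDemo and ReadAsyncTpl_FromAsync are hypothetical, and a MemoryStream stands in for a real IO source.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // Hypothetical wrapper: FromAsync supplies the AsyncCallback to BeginRead itself,
    // so no thread has to wait on the IAsyncResult's wait handle.
    public static Task<int> ReadAsyncTpl_FromAsync(this Stream stream, byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(
            (callback, state) => stream.BeginRead(buffer, offset, count, callback, state),
            iasr => stream.EndRead(iasr),
            null);
    }

    public static void Main()
    {
        var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        var buffer = new byte[16];

        Task<int> read = stream.ReadAsyncTpl_FromAsync(buffer, 0, buffer.Length);

        // Blocking on Result only so the console program can print before exiting.
        Console.WriteLine(read.Result); // prints 5, the number of bytes available
    }
}
```

As with the hand-written TaskCompletionSource wrapper, BeginRead is invoked as soon as the wrapper method is called.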

What else?

An important difference between the TPL and the Reactive Framework versions of these wrapper methods is that the APM Begin method is called when the TPL wrapper method is called, whereas the Rx version defers calling the APM Begin method until the observable gets a subscription. This has important ramifications for your programs which will be covered in a subsequent article.
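The timing difference can be observed with a sketch like the following, which counts BeginRead calls on a stream. Everything here is hypothetical scaffolding: CountingStream, DeferredRead (a compact stand-in for an APM-wrapping observable that calls Begin inside Subscribe), TcsObserver, and DeferralDemo are names invented for this example.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical stream that counts BeginRead invocations.
public sealed class CountingStream : MemoryStream
{
    public int BeginReadCalls;
    public CountingStream(byte[] data) : base(data) { }

    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
                                           AsyncCallback callback, object state)
    {
        BeginReadCalls++;
        return base.BeginRead(buffer, offset, count, callback, state);
    }
}

// Compact stand-in for an APM-wrapping observable: Begin is called inside Subscribe.
public sealed class DeferredRead : IObservable<int>
{
    private readonly Stream stream;
    private readonly byte[] buffer;
    public DeferredRead(Stream stream, byte[] buffer) { this.stream = stream; this.buffer = buffer; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        stream.BeginRead(buffer, 0, buffer.Length, iasr =>
        {
            int read = 0;
            try { read = stream.EndRead(iasr); }
            catch (Exception e) { observer.OnError(e); return; }
            observer.OnNext(read);
            observer.OnCompleted();
        }, null);
        return new Noop();
    }

    private sealed class Noop : IDisposable { public void Dispose() { } }
}

// Hypothetical observer that forwards the single value to a TaskCompletionSource.
public sealed class TcsObserver : IObserver<int>
{
    private readonly TaskCompletionSource<int> tcs;
    public TcsObserver(TaskCompletionSource<int> tcs) { this.tcs = tcs; }
    public void OnNext(int value) { tcs.TrySetResult(value); }
    public void OnError(Exception error) { tcs.TrySetException(error); }
    public void OnCompleted() { }
}

public static class DeferralDemo
{
    // TPL-style wrapper: BeginRead is invoked before this method returns.
    public static Task<int> ReadEager(Stream stream, byte[] buffer)
    {
        var tcs = new TaskCompletionSource<int>();
        stream.BeginRead(buffer, 0, buffer.Length, iasr =>
        {
            try { tcs.SetResult(stream.EndRead(iasr)); }
            catch (Exception e) { tcs.SetException(e); }
        }, null);
        return tcs.Task;
    }

    // Returns the BeginRead counts observed at three points in time.
    public static int[] Run()
    {
        var eagerStream = new CountingStream(new byte[] { 1, 2, 3 });
        Task<int> task = ReadEager(eagerStream, new byte[8]);
        int afterTplCall = eagerStream.BeginReadCalls;        // already started
        task.Wait();

        var lazyStream = new CountingStream(new byte[] { 1, 2, 3 });
        var observable = new DeferredRead(lazyStream, new byte[8]);
        int beforeSubscribe = lazyStream.BeginReadCalls;      // nothing started yet
        var done = new TaskCompletionSource<int>();
        observable.Subscribe(new TcsObserver(done));
        int afterSubscribe = lazyStream.BeginReadCalls;       // Subscribe started the read
        done.Task.Wait();

        return new[] { afterTplCall, beforeSubscribe, afterSubscribe };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 0, 1"
    }
}
```

Simply holding the observable costs nothing; holding the task means the read is already in flight.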

We have seen how to invoke existing APM operations using both the Reactive Framework and the Task Parallel Library. The source code for these examples is available in a Gist.

In the next article, we will implement new APM operations in terms of both the Reactive Framework and the Task Parallel Library.


Copyright © 2015 Benjamin van der Veen.