Benjamin van der Veen

Asynchronous Programming with the Reactive Framework and the Task Parallel Library — Part 2

Note: This is the second in a series of three articles exploring differences between Rx and the TPL. The series assumes familiarity with the .NET Asynchronous Programming Model (APM).

In Part 1, we discussed how to wrap APM asynchronous operations in Task<T> and IObservable<T>. In this article we’ll take a look at doing the opposite: how to implement an APM-style asynchronous operation using the Task Parallel Library and the Reactive Framework. The first draft of the OWIN specification defined APM operations for invoking a web application and reading the body of an HTTP request. Because Kayak used both Rx and the TPL for all of its internal asynchronous logic, I needed to be able to expose that asynchronous logic as an APM operation in order to implement the spec.

A Bit About IAsyncResult

One of the nastiest things about implementing the APM pattern is the need to implement IAsyncResult. The idea behind IAsyncResult is that it gives the code that initiates an APM operation information about the identity and state of that operation. A full discussion of IAsyncResult is beyond the scope of this article, but it will suffice to say that in modern C#, IAsyncResult is little more than an opaque token used to obtain the result of an asynchronous operation from an APM End method.

Quite curiously and very unfortunately, the .NET BCL did not historically provide an implementation of IAsyncResult. For our APM pattern implementation using the Reactive Framework, we will use the implementation of IAsyncResult described in this MSDN Magazine article by Jeffrey Richter.
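
For readers without Richter's article at hand, here is a minimal sketch of what such a class might look like. It is a simplification written for illustration, not Richter's implementation: it assumes only the members the wrapper in the next section relies on (a constructor taking the callback and state, two SetAsCompleted overloads, and EndInvoke), and it allocates its wait handle eagerly for brevity.

```csharp
using System;
using System.Threading;

// Simplified, illustrative IAsyncResult implementation (an assumption of this
// sketch, not the class from the MSDN Magazine article).
public sealed class AsyncResult<TResult> : IAsyncResult
{
    readonly AsyncCallback callback;
    readonly object state;
    readonly ManualResetEvent waitHandle = new ManualResetEvent(false);
    readonly object gate = new object();
    bool completed;
    bool completedSynchronously;
    TResult result;
    Exception exception;

    public AsyncResult(AsyncCallback callback, object state)
    {
        this.callback = callback;
        this.state = state;
    }

    public object AsyncState { get { return state; } }
    public WaitHandle AsyncWaitHandle { get { return waitHandle; } }
    public bool CompletedSynchronously { get { lock (gate) return completedSynchronously; } }
    public bool IsCompleted { get { lock (gate) return completed; } }

    public void SetAsCompleted(TResult result, bool completedSynchronously)
    {
        Complete(result, null, completedSynchronously);
    }

    public void SetAsCompleted(Exception exception, bool completedSynchronously)
    {
        Complete(default(TResult), exception, completedSynchronously);
    }

    void Complete(TResult result, Exception exception, bool completedSynchronously)
    {
        lock (gate)
        {
            // an APM operation completes exactly once
            if (completed)
                throw new InvalidOperationException("The operation has already completed.");
            this.result = result;
            this.exception = exception;
            this.completedSynchronously = completedSynchronously;
            completed = true;
        }
        waitHandle.Set();          // release anyone blocked in EndInvoke
        if (callback != null)
            callback(this);        // notify the caller of BeginInvoke
    }

    public TResult EndInvoke()
    {
        waitHandle.WaitOne();      // block until the operation has completed
        if (exception != null)
            throw exception;       // note: rethrowing this way resets the stack trace
        return result;
    }
}
```

The important property is that SetAsCompleted both flips IsCompleted and invokes the AsyncCallback, which is the behavior the wrapper depends on.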

Implementing the APM Pattern with the Reactive Framework

We will create a simple wrapper class that adapts a Func<TArg, IObservable<TResult>> delegate into an APM pattern Begin/End pair. When the APM Begin method is called, our wrapper will call the delegate to retrieve an observable which represents the asynchronous operation. When the observable yields a value or an exception, our wrapper class will invoke the AsyncCallback, and the wrapper’s APM End method will return that value or throw that exception.

public class RxAsyncOperation<TArg, TResult> 
{
    Func<TArg, IObservable<TResult>> operation;
    IDisposable disposable;

    public RxAsyncOperation(Func<TArg, IObservable<TResult>> operation)
    {
        this.operation = operation;
    }

    public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state)
    {
        AsyncResult<TResult> asyncResult = new AsyncResult<TResult>(callback, state);
        disposable = operation(arg).Subscribe(
            // the boolean arguments to SetAsCompleted indicate whether the operation 
            // completed synchronously. IObservable doesn't relay that information,
            // so we assume false.
            r =>
            {
                UnsubscribeIfPossible();
                asyncResult.SetAsCompleted(r, false);
            },
            e =>
            {
                UnsubscribeIfPossible();
                asyncResult.SetAsCompleted(e, false);
            },
            () => { });

        // if the operation completed during the call to subscribe, we won't have 
        // unsubscribed yet, so do it now.
        if (asyncResult.IsCompleted)
            UnsubscribeIfPossible();

        return asyncResult;
    }

    public TResult EndInvoke(IAsyncResult asyncResult)
    {
        // the AsyncResult<T> implementation takes care of waiting and throwing exceptions for us.
        return ((AsyncResult<TResult>)asyncResult).EndInvoke();
    }

    void UnsubscribeIfPossible()
    {
        if (disposable != null)
        {
            disposable.Dispose();
            disposable = null;
        }
    }
}

In the APM Begin method, we first construct an instance of AsyncResult<T>, the implementation of IAsyncResult we’re using. We provide the APM AsyncCallback and state object to the constructor of the AsyncResult<T>—it will take care of invoking the AsyncCallback for us.

Next, we invoke the delegate to retrieve an observable, and subscribe to that observable. Notice that we are keeping a reference to the IDisposable returned by the Subscribe method of the observable. This is necessary because after the observable yields a value or an exception, we need to unsubscribe from it—inform it that we no longer want to receive notifications. This is accomplished by calling the Dispose method of the IDisposable object returned from the Subscribe method of the observable.

In the OnNext and OnError callbacks, the first thing we do is unsubscribe from the observable to prevent further callbacks. It’s important to note that an observable may yield a value or an exception before Subscribe returns, which means that the observable callbacks might get called before the value of the disposable instance variable has been set! Thus, unsubscribing from the observable might not be possible from within the OnNext and OnError callbacks. For this reason, after the Subscribe method returns, we check to see if the operation has already completed, and if so, attempt again to unsubscribe.
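
To make the timing issue concrete, here is a small self-contained sketch that uses only the IObservable<T> and IObserver<T> interfaces from the BCL. All of the type names in it (ImmediateObservable, RecordingObserver, and so on) are hypothetical and exist only for this illustration. The observable emits its value inside Subscribe, so the observer sees a null subscription when OnNext runs.

```csharp
using System;

// Hypothetical observable that yields one value synchronously, inside Subscribe.
sealed class ImmediateObservable : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(42);       // runs before Subscribe has returned
        observer.OnCompleted();
        return new NoopDisposable();
    }
}

sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

sealed class RecordingObserver : IObserver<int>
{
    // assigned by the caller only after Subscribe returns
    public IDisposable Subscription;
    public bool SubscriptionWasNullInOnNext;

    public void OnNext(int value)
    {
        // record whether the subscription handle was available yet
        SubscriptionWasNullInOnNext = Subscription == null;
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SynchronousEmissionDemo
{
    // Returns true when OnNext ran before the subscription was assigned.
    public static bool Run()
    {
        var observer = new RecordingObserver();
        observer.Subscription = new ImmediateObservable().Subscribe(observer);
        return observer.SubscriptionWasNullInOnNext;
    }
}
```

With an observable like this one, an attempt to unsubscribe from inside OnNext finds nothing to dispose, which is why the wrapper checks IsCompleted again once Subscribe has returned.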

After unsubscribing in the OnNext and OnError callbacks, we inform the AsyncResult<T> implementation that the operation has completed with a value or error by calling the SetAsCompleted method. In response to SetAsCompleted, the AsyncResult<T> object will call the AsyncCallback we provided to its constructor, and the IsCompleted property of the AsyncResult<T> will become true.

The final step in the implementation of the APM Begin method is to return the IAsyncResult instance.

The implementation of the APM End method is fairly trivial. Its job is to wait for the operation to complete, and return the result of the APM operation, or, if the operation resulted in an exception, re-throw that exception. Fortunately, the AsyncResult<T> class takes care of all of this for us with its EndInvoke method.

Implementing the APM Pattern with the Task Parallel Library

It is quite a bit simpler to implement the APM pattern with the TPL due to one convenient fact: Task<T> implements the IAsyncResult interface! In this example, we’re going to make a wrapper class which is very similar to the one we created for the Rx example—it will adapt a Func<TArg, Task<TResult>> delegate to an APM operation. When the APM Begin method is called, our wrapper will call the delegate to obtain a Task<T> which represents the operation, and when the operation completes, our wrapper will invoke the AsyncCallback delegate provided by the user, and the APM End method will provide the result value or exception.

public class TplAsyncOperation<TArg, TResult>
{
    Func<TArg, Task<TResult>> operation;

    public TplAsyncOperation(Func<TArg, Task<TResult>> operation)
    {
        this.operation = operation;
    }

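    public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state)
    {
        // note: the state argument is not carried by the task returned from the
        // delegate, so IAsyncResult.AsyncState will generally not return it.
        var task = operation(arg);
        // APM callers may pass a null callback and rely on EndInvoke alone.
        if (callback != null)
            task.ContinueWith(_ => callback(task));
        return task;
    }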

    public TResult EndInvoke(IAsyncResult result)
    {
        return ((Task<TResult>)result).Result;
    }
}

As you can see, the TPL version is quite a bit simpler than the Rx version! We simply retrieve the underlying Task<T>, register a callback to be notified of its completion using ContinueWith, and return the Task<T>! When that task completes, the delegate we provided to ContinueWith is invoked, which in turn invokes the AsyncCallback provided by the client code.

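Like the implementation of AsyncResult<T>.EndInvoke which we used for the Rx example, the implementation of Task<T>.Result takes care of waiting and throwing exceptions for us, so we can just return that value. One difference is worth knowing about: when the task faults, Result throws an AggregateException that wraps the original exception, rather than throwing the original exception itself.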
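
One thing the simple version leaves out is the APM state object: the Task<T> returned by the delegate knows nothing about it. If a caller relies on IAsyncResult.AsyncState, one possible approach (a sketch, not code from Kayak) is to hand back a task created by a TaskCompletionSource<TResult>, whose constructor accepts a state object that becomes the task's AsyncState. The class name TplAsyncOperationWithState is hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical variant of the wrapper that preserves the APM state object.
public class TplAsyncOperationWithState<TArg, TResult>
{
    readonly Func<TArg, Task<TResult>> operation;

    public TplAsyncOperationWithState(Func<TArg, Task<TResult>> operation)
    {
        this.operation = operation;
    }

    public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state)
    {
        // the state passed to the constructor is exposed as tcs.Task.AsyncState
        var tcs = new TaskCompletionSource<TResult>(state);

        operation(arg).ContinueWith(t =>
        {
            // mirror the outcome of the inner task onto the task we returned
            if (t.IsFaulted)
                tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled)
                tcs.TrySetCanceled();
            else
                tcs.TrySetResult(t.Result);

            if (callback != null)
                callback(tcs.Task);
        });

        return tcs.Task;
    }

    public TResult EndInvoke(IAsyncResult asyncResult)
    {
        // blocks until completion; failures surface as AggregateException
        return ((Task<TResult>)asyncResult).Result;
    }
}
```

The cost is one extra task and continuation per call, so this is only worth doing when callers actually read AsyncState.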

A winner?

The TPL version of the code for wrapping and implementing the APM pattern is much shorter and sweeter than the equivalent Rx code. This is not to say that the TPL is superior to the Reactive Framework, but it is clear that the TPL maps onto the APM pattern more cleanly than the Reactive Framework does. The APM and the TPL share single-value-or-exception semantics, with only the producer side defining when a value will be produced; the semantics of the Reactive Framework are more general, allowing many-values-maybe-one-exception, with either the producer or the consumer choosing how many values are transmitted and when transmission should start or stop.

In the previous article, we wrapped APM operations with the Task Parallel Library and the Reactive Framework; this time, we’ve gone the other way and wrapped the Task Parallel Library and the Reactive Framework with APM operations. The complete source code for this article is available in a Gist. In the next article, we will consider threading and the how-when-where of asynchronous callbacks in both the TPL and Rx.


Copyright © 2015 Benjamin van der Veen.