Asynchronous Programming with the Reactive Framework and the Task Parallel Library — Part 3
02 January 2011
Note: This is the last in a series of three articles exploring differences between Rx and the TPL. The series assumes familiarity with the .NET Asynchronous Programming Model (APM).
- Part 1: Wrapping APM Operations
- Part 2: Implementing APM Operations
- Part 3: Schedulers
As you may imagine, threading is very important to asynchronous programming. Where are all those callbacks coming from? What thread are they executed on and when? The answers to these questions are determined by a mechanism called a scheduler.
A scheduler?
Both the Reactive Framework and the Task Parallel Library define their own concept of a scheduler. A scheduler is like a gatekeeper to the CPU. Schedulers expose a method which allows outside code to provide a piece of work which should be performed. In many scheduler implementations, the piece of work is put into a queue, and the scheduler performs the work at some point in the future. The scheduler decides which thread performs the work and when. This is important to consider if you’re writing a program with a user interface, or if you’re writing a server which manages special worker threads.
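To make the queue-based description concrete, here is a minimal sketch of a scheduler that runs all work on one dedicated thread. The QueueScheduler class and its members are hypothetical names invented for this illustration; it is not the Rx or TPL implementation, and it assumes the queued actions do not throw.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical toy scheduler for illustration; not part of Rx or the TPL.
public sealed class QueueScheduler : IDisposable
{
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private readonly Thread worker;

    public QueueScheduler()
    {
        // The scheduler decides which thread runs the work: here, one dedicated thread.
        worker = new Thread(() =>
        {
            foreach (var action in queue.GetConsumingEnumerable())
                action();
        });
        worker.IsBackground = true;
        worker.Start();
    }

    public int WorkerThreadId { get { return worker.ManagedThreadId; } }

    // Outside code hands over a piece of work; it runs at some later point.
    public void Schedule(Action action)
    {
        queue.Add(action);
    }

    // Stop accepting work, let the queue drain, then release resources.
    // Must not be called from the worker thread itself.
    public void Dispose()
    {
        queue.CompleteAdding();
        worker.Join();
        queue.Dispose();
    }
}
```

A caller would write something like scheduler.Schedule(() => DoWork()) and never touch a thread directly, which is the whole point of the abstraction.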
Scheduling in Rx
In the Reactive Framework, there are three places where the calling thread is important: the call to IObservable.Subscribe, the call to the Dispose method of the IDisposable returned by IObservable.Subscribe, and the calls to the methods of IObserver. Because subscribing or disposing of a subscription may cause the observable to start or stop doing something, it's important to consider what thread those calls are coming in on when implementing an observable. Similarly, when subscribing to an observable, you need to be sure about the thread on which your observer is notified by the observable.
The Reactive Framework defines an interface called IScheduler, which allows consumers of the interface to provide an Action which should be invoked by the scheduler instance.
public interface IScheduler
{
    IDisposable Schedule(Action action);
    IDisposable Schedule(Action action, TimeSpan dueTime);
    DateTimeOffset Now { get; }
}
The Reactive Framework provides two extension methods for working with schedulers, both defined on IObservable.
public static class Observable
{
    public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler);
    public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler);
}
SubscribeOn returns an observable which, when subscribed to or unsubscribed from, causes subscription to and unsubscription from the source observable to occur as scheduled by the scheduler. Essentially, it creates and returns a proxy observable which, rather than immediately performing the actions, delegates them to the given scheduler.
The ObserveOn method does the same for notifications. It creates and returns a proxy observable which simply forwards subscription to the source observable, but whenever the source observable yields a value, raises an exception, or completes, the proxy schedules an action on the scheduler which in turn invokes the appropriate callback on any observer which subscribes to the proxy observable. The effect is that when you subscribe to the resulting observable, you will always get callbacks on a thread determined by the scheduler you provided to ObserveOn.
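The proxy idea behind ObserveOn can be sketched in a few dozen lines. This is an illustration only, not the actual Rx implementation: ObserveOnProxy and ObserveOnDemo are hypothetical names, and a plain Action<Action> delegate stands in for the scheduler's Schedule(Action) method so that the sketch does not depend on any Rx assembly.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of an ObserveOn-style proxy; this is not the Rx implementation.
// The "schedule" delegate stands in for a scheduler's Schedule(Action) method.
public sealed class ObserveOnProxy<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Action<Action> schedule;

    public ObserveOnProxy(IObservable<T> source, Action<Action> schedule)
    {
        this.source = source;
        this.schedule = schedule;
    }

    // Subscription is forwarded straight to the source; only notifications are rescheduled.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(new Forwarder(observer, schedule));
    }

    private sealed class Forwarder : IObserver<T>
    {
        private readonly IObserver<T> target;
        private readonly Action<Action> schedule;

        public Forwarder(IObserver<T> target, Action<Action> schedule)
        {
            this.target = target;
            this.schedule = schedule;
        }

        public void OnNext(T value) { schedule(() => target.OnNext(value)); }
        public void OnError(Exception error) { schedule(() => target.OnError(error)); }
        public void OnCompleted() { schedule(() => target.OnCompleted()); }
    }
}

public static class ObserveOnDemo
{
    private sealed class Nothing : IDisposable { public void Dispose() { } }

    // Source that yields 42 and completes synchronously inside Subscribe.
    private sealed class OneValue : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> observer)
        {
            observer.OnNext(42);
            observer.OnCompleted();
            return new Nothing();
        }
    }

    private sealed class Recorder : IObserver<int>
    {
        public readonly List<string> Log = new List<string>();
        public void OnNext(int value) { Log.Add("next " + value); }
        public void OnError(Exception error) { Log.Add("error"); }
        public void OnCompleted() { Log.Add("completed"); }
    }

    public static string Run()
    {
        var pending = new Queue<Action>(); // a "scheduler" that we drain by hand
        var recorder = new Recorder();
        new ObserveOnProxy<int>(new OneValue(), pending.Enqueue).Subscribe(recorder);

        int deliveredBeforeDrain = recorder.Log.Count;  // source has fired, nothing delivered yet
        while (pending.Count > 0) pending.Dequeue()();  // now the "scheduler" runs the callbacks

        return deliveredBeforeDrain + ": " + string.Join(", ", recorder.Log);
    }
}
```

ObserveOnDemo.Run() returns "0: next 42, completed": the source fires during Subscribe, but the observer hears nothing until the stand-in scheduler gets around to running the queued callbacks. A real implementation would also need to deal with unsubscription while notifications are still queued, which this sketch ignores.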
Scheduling in the TPL
Since the TPL is all about running user delegates, it provides a mechanism to easily configure the thread on which those delegates are run.
The scheduler abstraction in the TPL is an abstract class called TaskScheduler. The fact that it is an abstract class makes it a bit more difficult to sum up as neatly as the Reactive Framework’s IScheduler interface, but it largely performs the same function: it accepts work in the form of Task instances, and eventually performs that work.
To control the execution of tasks, several APIs in the TPL accept a TaskScheduler as an argument. The most important and commonly used ones are Task.ContinueWith, Task.Start, and TaskFactory.StartNew. The behavior of Task.Start and TaskFactory.StartNew can be considered equivalent: TaskFactory.StartNew just saves you from having to allocate a new Task yourself and then call the Start method on it.
Each of these APIs creates or starts a Task, and a TaskScheduler may be provided to control the execution of that task. If a TaskScheduler is not provided, the TPL will execute the task on the current TaskScheduler. The current scheduler is available through the static property TaskScheduler.Current. In the context of an executing Task, the current scheduler is, as you might expect, the TaskScheduler which is executing the current task. Outside the context of an executing task, the current TaskScheduler is the default TaskScheduler, which queues tasks for execution on the thread pool.
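The behavior of TaskScheduler.Current can be observed with a small custom scheduler. The sketch below is an assumption-laden illustration: ThreadPerTaskScheduler and CurrentSchedulerDemo are hypothetical names, and the scheduler is deliberately naive (one new thread per task).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler that runs every task on a new dedicated thread.
public sealed class ThreadPerTaskScheduler : TaskScheduler
{
    protected override void QueueTask(Task task)
    {
        new Thread(() => TryExecuteTask(task)) { IsBackground = true }.Start();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false; // never run inline on the calling thread
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return new Task[0]; // debugger support only; nothing is held in a queue
    }
}

public static class CurrentSchedulerDemo
{
    // Returns true when code inside a task sees the scheduler that is executing it.
    public static bool InsideTaskSeesItsScheduler()
    {
        var custom = new ThreadPerTaskScheduler();
        Task<bool> task = Task.Factory.StartNew(
            () => ReferenceEquals(TaskScheduler.Current, custom),
            CancellationToken.None, TaskCreationOptions.None, custom);
        return task.Result;
    }

    // When called from a thread that is not running a task, Current is the default scheduler.
    public static bool OutsideTaskSeesDefault()
    {
        return ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default);
    }
}
```

One practical consequence: a call to Task.Factory.StartNew made from inside a task running on a custom scheduler will, by default, queue the new task to that same custom scheduler rather than to the thread pool.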
When completing a task using TaskCompletionSource, the thread which calls SetResult or SetException does not matter. Any continuations of the Task managed by the TaskCompletionSource will execute on the scheduler provided to the ContinueWith call which created them.
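Here is a rough sketch of that claim. CompletionSourceDemo is a hypothetical name, and the exclusive scheduler of a ConcurrentExclusiveSchedulerPair (available from .NET 4.5) is used purely as a convenient stand-in for any non-default scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CompletionSourceDemo
{
    // Returns true when the continuation ran on the scheduler passed to ContinueWith,
    // even though SetResult was called from an unrelated, hand-made thread.
    public static bool ContinuationUsesGivenScheduler()
    {
        // Stand-in for any non-default scheduler (requires .NET 4.5 or later).
        TaskScheduler target = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

        var source = new TaskCompletionSource<int>();
        Task<bool> continuation = source.Task.ContinueWith(
            t => t.Result == 42 && ReferenceEquals(TaskScheduler.Current, target),
            target);

        // Complete the task from a thread the scheduler knows nothing about.
        var completer = new Thread(() => source.SetResult(42));
        completer.Start();
        completer.Join();

        return continuation.Result;
    }
}
```

In a GUI program the target scheduler would more likely come from TaskScheduler.FromCurrentSynchronizationContext(), captured on the UI thread.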
What does it all mean?
For most users of these frameworks, schedulers are a relatively opaque mechanism which addresses threading concerns. For example, if you’re writing a program with a GUI, all you have to do when you want to perform a background operation is make sure it is performed on a scheduler which works in the background. When you want to retrieve the results of that operation, just make sure that the callbacks which receive the results are performed on a scheduler which runs on the UI thread.
void RxButton_Click(...)
{
    Observable.Create<Unit>(o => {
        try
        {
            longRunningOperation();
        }
        catch (Exception e)
        {
            o.OnError(e);
            return () => { }; // an error is terminal, so don't also signal completion
        }
        o.OnCompleted();
        return () => { }; // nothing to clean up on unsubscribe
    })
    .SubscribeOn(Scheduler.ThreadPool) // subscribe operation will kick off on a thread pool thread
    .ObserveOn(Scheduler.Dispatcher) // results will come in on the UI thread
    .Subscribe(
        _ => { },
        e => Console.WriteLine("Got error."),
        () => Console.WriteLine("Success! Back on UI thread."));
}
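void TplButton_Click(...)
{
    // capture the UI scheduler while we are still on the UI thread
    var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    // the operation runs on the current scheduler (here, the thread pool)
    var task = Task.Factory.StartNew(() => longRunningOperation());
    // the continuation runs on the UI thread
    task.ContinueWith(t => {
        if (t.IsFaulted) Console.WriteLine("Got error.");
        else Console.WriteLine("Success! Back on UI thread.");
    }, uiScheduler);
}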
An important difference between the TPL and Rx is that tasks are usually passed around already-started, so stringing up callbacks with ContinueWith happens after the task has already started and possibly already completed. With Rx, observables can be created but are generally not active until callbacks are provided through a subscription.
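The task half of that difference is easy to demonstrate. In this small sketch (LateContinuationDemo is a hypothetical name), the continuation is attached only after the task has certainly finished, and it still runs.

```csharp
using System;
using System.Threading.Tasks;

public static class LateContinuationDemo
{
    public static int AttachAfterCompletion()
    {
        Task<int> task = Task.Factory.StartNew(() => 21);
        task.Wait(); // the task has definitely finished by now

        // Attaching a continuation afterwards still works: it is scheduled right away
        // and sees the stored result of the completed task.
        return task.ContinueWith(t => t.Result * 2).Result;
    }
}
```

AttachAfterCompletion() returns 42. The work behind the task ran exactly once, before anyone was listening, which is why a task can be handed around freely without worrying about who subscribes when.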
In the previous articles, we saw that the asynchronous mechanisms of the TPL, Rx, and the APM pattern are roughly equivalent, in that it is possible to implement adapters between all of them. However, the APM doesn’t provide any explicit concept of a scheduler. The thread on which the callback from an APM operation is called is up to the implementor of the operation. Most APM operations provided by the .NET BCL invoke their callbacks on thread pool threads, and in the callback you must manually marshal the result or exception of the APM End method to the desired thread using the SynchronizationContext class or a similar mechanism. Both the TPL and Rx make this much easier to do by providing a scheduling mechanism and exposing APIs which allow the user to easily provide a scheduler to indicate what thread should be used to perform work.
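For comparison, this is roughly what the manual marshaling looks like with a plain APM operation. It is a sketch under assumptions: ApmMarshalDemo and the report callback are hypothetical, and Stream.BeginRead is used only as a familiar APM example.

```csharp
using System;
using System.IO;
using System.Threading;

public static class ApmMarshalDemo
{
    // Starts an APM read and reports the outcome through the given context.
    // "report" is a hypothetical callback: (bytesRead, error), where error is null on success.
    public static void ReadThenReport(
        Stream stream, byte[] buffer, SynchronizationContext context, Action<int, Exception> report)
    {
        stream.BeginRead(buffer, 0, buffer.Length, asyncResult =>
        {
            // This callback runs wherever the stream implementation chooses.
            int bytesRead = 0;
            Exception error = null;
            try
            {
                bytesRead = stream.EndRead(asyncResult);
            }
            catch (Exception e)
            {
                error = e;
            }

            // Manually hand the outcome over to the context (for example, the UI thread).
            context.Post(_ => report(bytesRead, error), null);
        }, null);
    }
}
```

In a GUI program you would capture SynchronizationContext.Current on the UI thread and pass it in. Every APM call site has to repeat this dance, which is exactly the boilerplate that ObserveOn and the TaskScheduler argument to ContinueWith remove.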