STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

Microsoft has released a new library for building “reactive” applications. It’s full name is Reactive Extensions for .NET but is generally referred to as just “Rx”. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as Multicast delegates or Events. Multicast delegates (which Events are) however can be cumbersome to use, have a nasty interface and are difficult to compose and can not be queried. Rx looks to solve these problems.
Here I will introduce you to the building blocks and some basic types that make up Rx.

IObservable

IObservable is one of the 2 core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft are so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable as a Stream of T objects. So if a method returned an IObservable I could think of it as a stream of Prices.

IObserver

IObserver is the other one of the 2 core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don’t worry if you are not on .NET 4.0 yet as the Rx team have included these 2 interfaces in a separate assembly for .NET 3.5 users. IObserver is meant to be the “functional dual of IEnumerable”. If you want to know what that last statement meant then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable can effectively yield 3 things (the next value, an exception or the end of the sequence), so too can IObservable via IObserver’s 3 methods OnNext(T), OnError(Exception) and OnCompleted().
Interestingly, while you will be exposed to the IObservable interface a lot if you work with Rx, I find I don’t often need to concern myself with IObserver. Another interesting thing I have found with Rx is that I never actually implement these interfaces myself, Rx provides all of the implementations I need out of the box. Lets have a look at the simple ones.

Subject

If you were to create your own implementation of IObservable you may find that you need to expose method to publish items to the subscribers, throw errors and notify when the stream is complete. Hmmm they all sound like the methods on the IObserver interface. While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what subjects can do for you.  Subject is the most basic of the subjects. Effectively you can expose your Subject behind a method that returns IObservable but internally you can use the OnNext, OnError and OnCompleted methods to control the stream.
In this (awfully basic) example, I create a subject, subscribe to that subject and then publish to the stream.

using System;
using System.Collections.Generic;

namespace RxConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            var subject = new Subject<string>();

            WriteStreamToConsole(subject);

            subject.OnNext("a");
            subject.OnNext("b");
            subject.OnNext("c");
            Console.ReadKey();
        }

        private static void WriteStreamToConsole(IObservable<string> stream)
        {
            stream.Subscribe(Console.WriteLine);
        }
    }
}

Note that the WriteStreamToConsole method takes an IObservable as it only wants access to the subscribe method. Hang on, doesn’t the Subscribe method need an IObserver? Surely Console.WriteLine does not match that interface. Well not it doesn’t but the Rx team supply me with an Extension Method to IObservable that just takes an Action. The action will be executed every time an item is published. There are other overloads to the Subscribe extension method that allows you to pass combinations of delegates to be invoke for OnNext, OnCompleted and OnError. This effectively means I don’t need to implement IObserver. Cool.
As you can see, Subject could be quite useful for getting started in Rx programming. Subject is a basic implementation however. There are 3 siblings to Subject that offer subtly different implementations which can drastically change the way your program runs.

ReplaySubject

ReplaySubject will remember all publications to it so that any subscriptions that happen after publications have been made, will still get all of the publications. Consider this example where we have moved our first publication to occur before our subscription

static void Main(string[] args)
{
    var subject = new Subject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}

The result of this would be that “b” and “c” would be written to the console, but “a” ignored. If we were to make the minor change to make subject a ReplaySubject we would see all publications again.

static void Main(string[] args)
{
    var subject = new ReplaySubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}

This can be very handy for eliminating race conditions.

BehaviorSubject

BehaviorSubject is similar to ReplaySubject except it only remembers the last publication. BehaviorSubject also requires you to provide it a default value of T. This means that all subscribers will receive a value immediately (unless it is already completed).
In this example the value “a” is written to the console.

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}

In this example the value “b” is written to the console, but not “a”.

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    subject.OnNext("b");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}

In this example the values “b”, “c” & “d” are all written to the console, but again not “a”

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    WriteStreamToConsole(subject);
    subject.OnNext("c");
    subject.OnNext("d");
    Console.ReadKey();
}

Finally in this example, no values will be published as the stream has completed. Nothing is written to the console.

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    WriteStreamToConsole(subject);
    
    Console.ReadKey();
}

AsyncSubject

AsyncSubject is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the stream is completed.
In this example no values will be published so no values will be written to the console.

static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}

In this example we invoke the OnCompleted method and the value “c” is published and therefore written to the console.

static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    Console.ReadKey();
}

So that is the very basics of Rx. With only that under you belt it may be hard to understand why Rx is a topic of interest. To follow on from this post I will discuss further fundamentals to Rx

  1. Extension methods
  2. Scheduling / Multithreading
  3. LINQ syntax

Once we have covered these it should allow you to really get Rx working for you to produce some tasty Reactive applications. Hopefully after we have covered these background topics we can knock up some Samples where Rx can really help you in your day to day coding.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Related links :

IObservable interface – MSDN

IObserver interface – MSDN

Observer Design pattern – MSDN

Rx Home

Exploring the Major Interfaces in Rx – MSDN

ObservableExtensions class – MSDN

Using Rx Subjects – MSDN

System.Reactive.Subjects Namespace – MSDN

Subject – MSDN

AsyncSubject – MSDN

BehaviorSubject – MSDN

ReplaySubject – MSDN

Subject static class – MSDN

ISubject – MSDN

ISubject – MSDN

Back to the contents page for Reactive Extensions for .NET Introduction
Forward to next post; Part 2 – Static and extension methods

Technorati Tags: ,,