STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

So far we have discovered the basics of  the Reactive Framework which allows us to create, subscribe and perform some basic aggregations, buffering and time shifting over implementations of IObservable. We have yet to look at how we could unsubscribe from a subscription. If you were to look for an Unsubscribe method in the Rx public API you would not find any. Instead of supplying an Unsubscribe method, Rx will return an IDisposable when ever a subscription is made. This disposable can be thought of as the subscription itself and therefore disposing it will dispose the subscription and effectively unsubscribe. Note that calling Dispose on the result of a Subscribe call will not affect the underlying IObservable, just the instance of the subscription to the IObservable. This then allows us to call Subscribe many times on a single IObservable, allowing subscriptions to come an go with out affecting each other. In this example we initially have two subscriptions, then we dispose of one subscription early which still allows the other to continue to receive publications from the underlying IObservable.

var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription recieved {0}", value));
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription recieved {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");

Console.ReadKey();
/*Outputs:
1st subscription recieved 0
2nd subscription recieved 0
1st subscription recieved 1
2nd subscription recieved 1
1st subscription recieved 2
2nd subscription recieved 2
1st subscription recieved 3
2nd subscription recieved 3
2nd subscription recieved 4
Disposed of 1st subscription
2nd subscription recieved 5
2nd subscription recieved 6
2nd subscription recieved 7

etc....
*/

In the above example, it looks like the values are being produced by the interval Observable by a single OnNext call, however these are independent and work similarly to how a Observable.Create method would work. In this sample we just pause a bit before making our second subscription. Note that the output is different to the above example.

var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));

var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription recieved {0}", value));
Thread.Sleep(500);
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription recieved {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
/*
Ouput:
1st subscription recieved 0
1st subscription recieved 1
1st subscription recieved 2
1st subscription recieved 3
1st subscription recieved 4
2nd subscription recieved 0 
1st subscription recieved 5
2nd subscription recieved 1
1st subscription recieved 6
1st subscription recieved 7
2nd subscription recieved 2
1st subscription recieved 8
2nd subscription recieved 3
Disposed of 1st subscription
2nd subscription recieved 4
2nd subscription recieved 5
2nd subscription recieved 6
etc...

*/

The benefits of using the IDisposable Type instead of creating a new ISubscription/IUnsubscription interface or amending the IObservable interface to have an Unsubscribe method is that you get all of these things for free:

  • The type already exists
  • people understand the type
  • IDisposable has standard usages and patterns
  • Language support via the Using keyword
  • Static analysis tools like FxCop can help you with its usage.

OnError and OnCompleted()

Both the OnError and OnCompleted signify the completion of a stream. If your stream publishes a OnError or OnCompleted it will be the last publication and no further calls to OnNext can be performed. In this example we try to publish an OnNext call after an OnCompleted and the OnNext is ignored.

var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
subject.OnCompleted();
subject.OnNext(2);

Of course you could implement your own IObservable that did allow publishing after an OnComplete or an OnError, however it would not follow the precedence of the current Subject types and would be a non standard implementation. I think it would be safe to say that the inconsistent behaviour would cause unpredictable behaviour in the applications that consumed your code.

An interesting thing to consider is that when a stream completes or errors, you should still dispose of you subscription. This can make for messy code, but we will discuss best practices in a later post.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 2 – Static and extension methods
Forward to next post; Part 4 – Flow control

Technorati Tags: ,,