Welcome to my series introducing Reactive Extensions to .NET (Rx). This series is aimed at any .NET developer curious about the IObservable and IObserver interfaces that have popped up in .NET 4. However Rx is a library so, Silverlight 3 & Silverlight 4 developers as well as .NET 3.5, Windows Phone and JavaScript coders can download the library. Rx is big. It is big in allsorts of ways:
- In the way that it tackles the Observer pattern is bold
- In the way it tackles concurrency is quite a shift from how I have done it before.
- The number of (extension) methods is huge.
- The way in which it integrates with LINQ to leverage LINQ’s composability & declarative style
- The fact that any .NET developer should care. UI developer, backend algorithm coder or integrator; It helps all of us.
- The future plans are even more grand, but that is a different series all together 🙂
In this series I will introduce you to:
- the new types the Rx will provide
- the extension methods and how to use them
- how to manage subscriptions to “streams” of data
- how to deal with concurrency to your advantage and avoid the common old pitfalls
- how to compose, aggregate and transform streams
- how to build workflows with Rx
- some tips and tricks I have picked while using Rx over the past months.
So download the assemblies to reference, fire up Visual Studio and let’s get started:
- Part 1 – Introduction to Rx
- Part 2 – Static and extension methods
- Part 3 – Lifetime management – Completing and Unsubscribing
- Part 4 – Flow control
- Part 5 – Combining multiple IObservable streams
- Part 6 – Scheduling and threading
- Part 7 – Hot and Cold observables
- Part 8 – Testing Rx
- Part 9 – Join, Window, Buffer and Group Join
The full source code is now available either via SVN at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Edit – This series of posts was first created in mid 2010. The Rx framework has gone through numerous changes during and after the writing of these posts. I am constantly making an effort to ensure that the blog stays as current as I can make it –Lee Campbell
Don't forget that RX is also packaged in the Windows Phone 7 Series libraries and a variant is available in SQL Server 2008 r2 Stream Insight.
LikeLike
These tutorials are awesome. It would be great it you bundled them into a PDF for distribution 🙂
LikeLike
@C.L.Phillips I will see what I can do 🙂
I have a few more posts to do and then I will try to make sure I can release a 3.5 and a 4.0 version that matches the latest Rx build (the API has changes twice since I started this series).
LikeLike
This comment has been removed by the author.
LikeLike
@Matt, on the home page http://introtorx.com/ there is a link to the Rx forums (MSDN) where you can leave suggestions. Feel free to alternatively leave comments/suggestions here or tweet them to me @LeeRyanCampbell
Looking forward to your feedback!
LikeLike
Hi, is there a plan to update introtorx.com to RX 2.0?
LikeLike
There are no immediate plans to upgrade the site to cater for Rx 2.0 or 2.1. The first edition was a very large commitment, and I dont feel I actually have used Rx 2.x enough to consider myself any sort of authority on it.
In the meantime I am working on an Rx 'cookbook' at RxCookbook on github
LikeLike
Nice post very helpful
DBAKings
LikeLike
Hi Lee – intro to Rx is an excellent resource! There's a small issue in the Timeout section; the example using a TimeSpan would actually output 0-9, rather than 0-4. Great stuff though. Thank you, Julius
LikeLike
Hi Lee,
Thanks for your book, it is great help.
I'm going through it now, doing some tests. I found that adding ObserveOn to subscriptions effectively slows them down about 8 times (compared to having lock on resource inside subscription, in my test no real work is done, just pumping events). I understand all benefits of rx, but such performance drop could not be ignored. Is that the case in real world as well, or only my test? Thank you, regards, Evgeny.
LikeLike
Thanks for taking a look Evgeny. I don't have enough information to make an educated comment about your tests as you don't provide enough information or a test suite to validate (e.g. which version of Rx, .NET and Scheduler you are using).
What I can offer is the following :
* Locking inside your Subscribe function ties the performance of your producer to the sum-performance of all of your consumers.
* Using ObserveOn, decouples your production from your consumption. Events are effectively en-queued and then processed by the provided scheduler as fast as it can.
* The performance of your ObserveOn will depend on the performance characteristics of the Scheduler you provide. e.g. I imagine a dedicated EventLoopShceduler may outperform a TaskPoolScheduler or ThreadPoolScheduer.
* Generally the ObserveOn method is used to run things on two separate threads. The classic example is in a client GUI application where data is fetch and processed on a background thread and then the UI is updated on the Dispatcher. Here the thread transition is required, so your example would be a non-starter.
LikeLike
Hello,
First of all thanks for your articles. They have been very educational and useful. Second is, can you please update the “Intro to Rx kindle edition” link?. I'm getting a 404. Here is the link: http://www.introtorx.com/content/v1.0.10621.0/Content/v1.0.10621.0/IntroToRx.mobi
LikeLike
Sorry, the link seems to be relative so only works correctly on the home page. This link should work fine
http://introtorx.com/Content/v1.0.10621.0/IntroToRx.mobi
A rewrite of the site is about to get under way to celebrate Rx3.0
LikeLike
Hi Lee,
do you have also a PDF version of your book?
Thanks
LikeLike
Hi Lee,
Many thanks for your really excellent 'Intro' book. Am intrigued by the reference to Rx3.0 in yr post on July 15 … is there an update to Rx in the works ? Can find no mention of it on GitHub.
LikeLike
Check out Bart's video on the future of Rx titled “Cloud-Scale Event Processing” here – https://vimeo.com/132192255
There were also some talk about this on various forums/back channels (https://gitter.im/Reactive-Extensions/Rx.NET)
LikeLike
Hey, I'm not sure whether it's a good place for my post, but there mistype on section http://www.introtorx.com/Content/v1.0.10621.0/05_Filtering.html#SkipUntilTakeUntil. Author talking about TakeUntil but uses TakeWhile name instead. Just FYI and to make book even better.
LikeLike
Could you please post the code for the MemberSearchViewModel.PropertyChanges method and the IMemberSearchModel.SearchMembers method in this code snippet of yours?
http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html#WhyRx
I am curious as to the return type and the implementation of the PropertyChanges method. I first thought it was a typographical error and that you were invoking the INotifyPropertyChanged.PropertyChanged delegate's invocation list, but then the subsequent call to the Subscribe method on the return value of PropertyChanges makes me curious.
LikeLike
You can see some content I have written about converting INotifyPropertyChanged events to Observable sequences over at RxCookbook – https://github.com/LeeCampbell/RxCookbook/blob/master/Model/PropertyChange.md
Note there is a difference in signature; the RxCookbook code uses 'OnPropertyChanges', but IntroToRx references 'PropertyChanges'
LikeLike
Thank you.
In your example demonstrating Observable.Create that uses a timer and returns the timer as the IDisposable, or the next example where you return an action that detaches the OnTimerElapsed event handler and then disposes the timer, would the first unsubscription by the first observer also just stop the observable from producing any further values for any other subscribers / observers?
http://www.introtorx.com/Content/v1.0.10621.0/04_CreatingObservableSequences.html
I know that that is a contrived example used simply to explain Observable.Create and I am not judging the usefulness of the example in real world code. Instead, I am just trying to validate my thinking and get a confirmation of my assumption.
LikeLike
Both examples will dispose of the timer, and therefore no longer produce any results once disposed. However, removing the eventHandler is a better solution as it is deterministicly and explicitly dealing with your resources.
LikeLike
Sure, I get that removing the event handler is nice. My question is — won't any other subscriber / observer attached to the observable stop receiving messages if one of the subscribers / observers calls Dispose() on its subscription?
//Example code only
var ob = Observable.Create(
observer =>
{
var timer = new System.Timers.Timer();
timer.Enabled = true;
timer.Interval = 100;
timer.Elapsed += OnTimerElapsed;
timer.Start();
return ()=>{
timer.Elapsed -= OnTimerElapsed;
timer.Dispose();
};
});
// first observer
using(ob.Subscribe(Console.WriteLine)) { }
// second one
using(ob.Subscribe(Console.WriteLine)
{
// Hey, why did you dispose off the timer?
// I am still interested.
}
LikeLike
No the second subscription wont affect the first subscription, as they will different internal instances of the Timer. This is covered in the book in sections Create/Resources/HotCold
LikeLike
Oh, yes. Because the delegate passed to create will be called upon every subscription with the observer that requested it, and so each time, for each subscription, a new timer will be created?
LikeLike
Thank you for amazing introtorx.com! One of the best programming books I’ve read – easy to follow, insightful – absolutely brilliant!
I wanted to made a small edit to the page but didn’t find a way to send feedback directly on the website.
On this page
http://www.introtorx.com/content/v1.0.10621.0/05_Filtering.html
there is a small code snippet right under the sentence “Unlike SkipLast, TakeLast does have to wait for the source sequence to complete to be able to push its results.”
Even though it illustrates the usage of TakeLast function, it contains SkipLast
LikeLike