When I teach people how to use Rx, for example on the PracticalRx course, I avoid using the Subject types. I also strongly discourage others from using them, as in my opinion they are an anti-pattern. I find that they are generally a code smell that indicates a deeper design issue.
I find myself telling people not to use them (in a book, on forums, on chat channels), but I am often challenged on this stance by people who don’t know how to solve their problem without one. The problem here is that each case is different, so giving advice really requires me to look at the code using the Subject<T> and the consumers of that code.
The GOTO of Rx?
I find Subject<T> somewhat like the goto statement in C#. It is there, you could use it, but should you? Just like when someone challenges your guidance to not use Subjects, when guiding people away from goto it helps to know the context:
- are they trying to conditionally run some code
- are they trying to loop
- are they trying to exit the subroutine early
- is this some sort of error handling
Each of these scenarios has a better language construct to solve the problem (if/else, for, return, try/catch), but to give guidance, you need to know what they were trying to achieve. As is the case when providing guidance for avoiding Subjects.
The IProgress challenge
On the Rx Slack channel, Stephen Cleary (author of Concurrency in C# Cookbook) asked the specific question (https://aspnetcore.slack.com/archives/rx/p1474656404000393)
stephencleary
@leecampbell @onovotny I have a `Subject`-based implementation of `IProgress<T>` here: https://gist.github.com/StephenCleary/4248e50b4cb52b933c0d
The idea is to convert `IProgress<T>.Report` calls into data items in the observable stream.
Is there a better way to do this without using `Subject`?
Posted in #rx, Sept 24th at 2:46 AM
It is far easier to give guidance on a specific problem than abstractly declare that Subjects are Evil, which is nonsense.
In Stephen’s case we just need to remember that all we need is one piece of code to call IProgress<T>.Report(value) and to have that translate to calling our subscriber’s IObserver<T>.OnNext(value) handler.
So our goal is to create something that allows this code flow
LongRunningTask -> progress.Report(value) -> observer.OnNext(value)
So initially it would look like we have several options, either:
- Something that implemented IProgress<T> and IObservable<T>
- or, something that implemented IProgress<T> and accepted an IObserver<T> as a parameter
- or, something that composed two delegate implementations of IProgress<T> and IObservable<T>
Personally, I immediately discard option 1, following my own guidance:
Avoid creating your own implementations of the IObservable<T> interface.
Next I look at option 2 and see that it will end up being something very close to actually implementing the IObservable<T> interface, i.e. accepting an IObserver<T> as a parameter.
public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
So is there any way that I can use number three?
First I see that Progress<T> is an existing implementation of IProgress<T> that just takes a delegate. So this is heartening. Next I know that to invoke the long running task I will always need to call a method that takes an IProgress<T> as a parameter. This guides me to thinking I will need some sort of method that takes an Action<IProgress<T>> parameter. The method should also return an IObservable<T>. So my first stab at this is
IObservable<T> ProgressToObservable<T>(Action<IProgress<T>> task)
{
    throw new NotImplementedException();
}
Next I want to see how this would feel to use. So with a dummy implementation of a long running task, I see how it plays out.
private void Solve(IProgress<int> progress)
{
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(10);
        progress.Report(i);
    }
}

void Main()
{
    // The type argument is stated explicitly because T cannot be inferred from a method group.
    ProgressToObservable<int>(Solve).Subscribe(
        i => Console.WriteLine(i),
        () => Console.WriteLine("Done"));
}
That seems ok. So what would the implementation look like? Would I need a Subject<T>? Going with my default tool, I use Observable.Create. I should be able to dovetail that together with the Progress<T> implementation.
IObservable<T> ProgressToObservable<T>(Action<IProgress<T>> task)
{
    return Observable.Create<T>(obs =>
    {
        task(new Progress<T>(obs.OnNext));
        return Disposable.Empty;
    });
}
Ok, cool. I think that will work. However I note that the naming isn’t great and I also don’t complete the sequence when the task is completed. Actually, naming that parameter task makes me think that it is a Task<T>, so let’s rename that too.
public static class ObservableProgress
{
    public static IObservable<T> Create<T>(this Action<IProgress<T>> action)
    {
        return Observable.Create<T>(obs =>
        {
            action(new Progress<T>(obs.OnNext));
            obs.OnCompleted();
            //No apparent cancellation support.
            return Disposable.Empty;
        });
    }
}
I think that solves our problem, and in 13 lines of code (6 of which are curlies and 1 of which is a comment). This gives us a great platform to add cancellation support, TPL (async/await) support, Synchronization/Scheduling support and any other feature that Rx is strong on delivering.
Show me more
To see a fully fleshed out code sample, check out the page in RxCookBook on IProgress. Here we explore why Progress<T> might not actually be appropriate and how to add support for Tasks (async/await) as well.
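As a rough idea of where that can lead, here is a minimal sketch. It is not the RxCookBook code, and the names DelegateProgress<T> and ObservableProgressSketch are hypothetical ones made up for this illustration. One reason to be wary of Progress<T> is that it posts each Report call to the SynchronizationContext captured at construction, or to the thread pool when there is none, so values can be delivered later than, and out of order with, the code that reported them. The sketch assumes a tiny IProgress<T> that invokes its delegate synchronously, and leans on the Observable.Create overload that accepts a Func<IObserver<T>, CancellationToken, Task> to get cancellation and completion.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rx or the BCL): forwards each Report call
// straight to the delegate on the calling thread, with no context capture.
public sealed class DelegateProgress<T> : IProgress<T>
{
    private readonly Action<T> _onReport;

    public DelegateProgress(Action<T> onReport)
    {
        _onReport = onReport ?? throw new ArgumentNullException(nameof(onReport));
    }

    public void Report(T value) => _onReport(value);
}

// Hypothetical variant of ObservableProgress for cancellable, Task-returning work.
public static class ObservableProgressSketch
{
    public static IObservable<T> Create<T>(Func<IProgress<T>, CancellationToken, Task> action)
    {
        // This Observable.Create overload cancels the token when the subscription
        // is disposed, and forwards the Task's completion or failure to the
        // observer as OnCompleted or OnError.
        return Observable.Create<T>((obs, token) =>
            action(new DelegateProgress<T>(obs.OnNext), token));
    }
}

public static class Demo
{
    // Dummy long running task, mirroring Solve but async and cancellable.
    private static async Task SolveAsync(IProgress<int> progress, CancellationToken token)
    {
        for (int i = 0; i < 100; i++)
        {
            await Task.Delay(10, token);
            progress.Report(i);
        }
    }

    public static void Main()
    {
        using (var done = new ManualResetEventSlim())
        using (ObservableProgressSketch.Create<int>(SolveAsync).Subscribe(
            i => Console.WriteLine(i),
            () => { Console.WriteLine("Done"); done.Set(); }))
        {
            // Disposing the subscription before completion would cancel the token.
            done.Wait();
        }
    }
}
```

The sketch still contains no Subject<T>; the observer handed to Observable.Create remains the only thing the progress reporter talks to.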
More on the Subject debate
Here are some old discussions on the pros and cons on Subjects.
- http://stackoverflow.com/questions/14396449/why-are-subjects-not-recommended-in-net-reactive-extensions/14460634#14460634
- http://davesexton.com/blog/post/To-Use-Subject-Or-Not-To-Use-Subject.aspx
- http://stackoverflow.com/questions/9299813/rx-subjects-are-they-to-be-avoided
- https://social.msdn.microsoft.com/Forums/en-US/2d6b73f1-0340-4480-8ea2-2c71a4721116/when-to-use-rx-subjects?forum=rx
- http://akarnokd.blogspot.com.au/2015/06/subjects-part-1.html
- http://tomstechnicalblog.blogspot.com.au/2016/03/rxjava-problem-with-subjects.html
- https://upday.github.io/blog/subjects_neither_good_nor_evil/
- http://futurice.com/blog/reactive-c-number-in-practice
Interesting approach… but I’m wondering if it is always possible to replace Subject? What I mean is, e.g. if you need to generate not progress messages, but something like StatusChanged? Because progress is usually associated with one method we could use your approach, but the status of the Component or Service could be changed in different places of the class (Start, Stop, etc. methods). Here is my little snippet of code https://gist.github.com/sleemer/94b53370ab8ff552d31689704601367a.
This is a more commonly accepted usage of Subjects, or very specifically `BehaviorSubject`.
However it is equally valid to just use a property that exposes notification with perhaps `INotifyPropertyChanged`.
This reduces it to a style choice, but definitely not a case of “couldn’t be done without a subject”.
I like your sample. I am sure that there are more improvements that we could squeeze out of it still.
Is it possible to implement a RetryWhen without the Subject things?
https://github.com/Reactive-Extensions/Rx.NET/issues/449
Probably, I haven’t looked. I would suggest looking to the implementations in other languages (if they exist). It also appears that the linked Issue suffers from mixing Rx and Tasks (IMO) inelegantly. Maybe also named poorly. I would suggest what they have there is ‘RetryWhile’ not ‘RetryWhen’.
Thank you for the reply. I mean the implementation that akarnokd commented following the question. The BehaviorSubject<IObservable> can be replaced by a Create<IObservable> and recursive scheduling, maybe? But I have no idea how to get rid of the Subject. Also, I tried to look into RxJava’s implementation of retryWhen, but no luck: it uses a bunch of internal classes rather than a combination of basic operators. And I agree with the naming of RetryWhile. Anyway, thank you very much.