When I teach people how to use Rx, for example on the PracticalRx course, I avoid using the Subject types. I also strongly discourage others from using them, as in my opinion they are an anti-pattern. I find that they are generally a code smell that indicates a deeper design issue.

I find myself telling people not to use them (in a book, on forums, on chat channels), but I am often challenged on this stance by people who don’t know how to solve their problem without one. The problem is that each case is different, so I really need to look at the code using the Subject<T> and the consumers of that code.

The GOTO of Rx?

I find that Subject<T> is somewhat like the goto statement in C#. It is there, you could use it, but should you? Just as when someone challenges your guidance to not use Subjects, when guiding people away from goto it helps to know the context:

  • are they trying to conditionally run some code?
  • are they trying to loop?
  • are they trying to exit the subroutine early?
  • is this some sort of error handling?

Each of these scenarios has a better language construct to solve the problem (if/else, for, return, try/catch), but to give guidance, you need to know what they were trying to achieve. The same is true when providing guidance on avoiding Subjects.
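To make the analogy concrete, here is a minimal sketch for the looping case. The GotoVersusLoop class and its method names are made up purely for illustration. Both methods sum the integers from 0 to n-1, one with labels and jumps and one with the construct the language provides for the job.

```csharp
using System;

public static class GotoVersusLoop
{
    // Sums 0..n-1 by jumping between labels. Legal C#, but the reader has to
    // trace the jumps to discover that this is just a loop.
    public static int SumWithGoto(int n)
    {
        int total = 0;
        int i = 0;
    next:
        if (i >= n) goto done;
        total += i;
        i++;
        goto next;
    done:
        return total;
    }

    // The same logic with a for loop: the intent is visible in the first line.
    public static int SumWithFor(int n)
    {
        int total = 0;
        for (int i = 0; i < n; i++)
        {
            total += i;
        }
        return total;
    }
}
```

Knowing that the goto version was "trying to loop" is what lets you suggest the for loop as the replacement.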

The IProgress challenge

On the Rx Slack channel, Stephen Cleary (author of Concurrency in C# Cookbook) asked this specific question (https://aspnetcore.slack.com/archives/rx/p1474656404000393):

stephencleary
@leecampbell @onovotny I have a `Subject`-based implementation of `IProgress<T>` here: https://gist.github.com/StephenCleary/4248e50b4cb52b933c0d
The idea is to convert `IProgress<T>.Report` calls into data items in the observable stream.
Is there a better way to do this without using `Subject`?
Posted in #rx, Sept 24th at 2:46 AM

It is far easier to give guidance on a specific problem than to declare in the abstract that Subjects are evil, which would be nonsense.

In Stephen’s case, all we need is for one piece of code to call IProgress<T>.Report(value) and for that call to translate into a call to our subscriber’s IObserver<T>.OnNext(value) handler.

So our goal is to create something that allows this code flow:

LongRunningTask -> progress.Report(value) -> observer.OnNext(value)

Initially it looks like we have several options:

  1. Something that implements IProgress<T> and IObservable<T>
  2. or, something that implements IProgress<T> and accepts an IObserver<T> as a parameter
  3. or, something that composes two delegate-based implementations of IProgress<T> and IObservable<T>

Personally, I immediately discard option 1, following my own guidance:

Avoid creating your own implementations of the IObservable<T> interface.

Next I look at option 2 and see that it will end up being something very close to actually implementing the IObservable<T> interface i.e. accepting an IObserver<T> as a parameter.

public interface IObservable<T>
{
   IDisposable Subscribe(IObserver<T> observer);
}

So is there any way that I can use number three?

First I see that Progress<T> is an existing implementation of IProgress<T> that just takes a delegate. So this is heartening. Next I know that to invoke the long running task I will always need to call a method that takes an IProgress<T> parameter. This guides me to thinking I will need some sort of method that takes an Action<IProgress<T>> parameter. The method should also return an IObservable<T>. So my first stab at this is

IObservable<T> ProgressToObservable<T>(Action<IProgress<T>> task)
{
   throw new NotImplementedException();
}

Next I want to see how this would feel to use. So with a dummy implementation of a long running task, I see how it plays out.

private void Solve(IProgress<int> progress)
{
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(10);
        progress.Report(i);
    }
}

void Main()
{
    ProgressToObservable<int>(Solve).Subscribe(
      i => Console.WriteLine(i),
      () => Console.WriteLine("Done"));
}

That seems ok. So what would the implementation look like? Would I need a Subject<T>? Going with my default tool, I use Observable.Create. I should be able to dovetail that together with the Progress<T> implementation.

IObservable<T> ProgressToObservable<T>(Action<IProgress<T>> task)
{
    return Observable.Create<T>(obs =>
    {
        task(new Progress<T>(obs.OnNext));
        return Disposable.Empty;
    });
}

Ok, cool. I think that will work. However, I note that the naming isn’t great and that I don’t complete the sequence when the task has completed. Actually, naming that parameter task makes me think that it is a Task<T>, so let’s rename that too.

public static class ObservableProgress
{
    public static IObservable<T> Create<T>(this Action<IProgress<T>> action)
    {
        return Observable.Create<T>(obs =>
        {
            action(new Progress<T>(obs.OnNext));
            obs.OnCompleted();
            //No apparent cancellation support.
            return Disposable.Empty;
        });
    }
}

I think that solves our problem, and in 13 lines of code (6 of which are curlies and 1 of which is a comment). This gives us a great platform to add cancellation support, TPL (async/await) support, Synchronization/Scheduling support and any other feature that Rx is strong on delivering.
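As one illustration of where this could go, here is a rough sketch of cancellation support. It is my own assumption of a possible shape, not necessarily how the RxCookBook sample does it: the CancellableObservableProgress class, its Action<IProgress<T>, CancellationToken> parameter and the SynchronousProgress<T> helper are all hypothetical names. It relies on the Observable.Create overload in the System.Reactive package that takes a Func<IObserver<T>, CancellationToken, Task>, and it runs the action on the thread pool via Task.Run so that Subscribe can return before the work has finished.

```csharp
using System;
using System.Reactive.Linq;   // NuGet package: System.Reactive
using System.Threading;
using System.Threading.Tasks;

public static class CancellableObservableProgress
{
    // Hypothetical variant: the action also receives a CancellationToken.
    public static IObservable<T> Create<T>(Action<IProgress<T>, CancellationToken> action)
    {
        // This Observable.Create overload cancels the token when the subscription
        // is disposed, and signals OnCompleted (or OnError) when the returned
        // Task finishes (or faults).
        return Observable.Create<T>((obs, token) =>
            Task.Run(() => action(new SynchronousProgress<T>(obs.OnNext), token), token));
    }

    // Progress<T> invokes its handler through the SynchronizationContext captured
    // at construction (or the thread pool when there is none), so a report could
    // be delivered after the action has returned. This hypothetical helper calls
    // the handler inline instead, which keeps OnNext ahead of OnCompleted.
    private sealed class SynchronousProgress<T> : IProgress<T>
    {
        private readonly Action<T> _onReport;

        public SynchronousProgress(Action<T> onReport)
        {
            _onReport = onReport;
        }

        public void Report(T value)
        {
            _onReport(value);
        }
    }
}
```

With this shape, disposing the subscription cancels the token, so an action that calls token.ThrowIfCancellationRequested() inside its loop gets the chance to stop early. An action that never checks the token will still run to the end, although the subscriber stops receiving values once it has unsubscribed.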

Show me more

To see a fully fleshed out code sample, check out the page in RxCookBook on IProgress. There we explore why Progress<T> might not actually be appropriate and how to add support for Tasks (async/await) as well.

More on the Subject debate

Here are some old discussions on the pros and cons of Subjects.