As we all know that while writing asynchronous programs, we are at times forced to execute things on some specific thread. For example WPF and Winforms follow STA model where any UI element can be updated only on the UI thread. So if we spawn a non-UI thread for some computation, we will have to send the result back to the UI thread and it will be responsible to update the UI. Traditionally this has been achieved using various synchronization mechanisms that sends messages to the UI message queue and the UI thread used that. For example in winforms, we used controls BeginInvoke/Invoke method, in WPF we use Dispatcher's synchronization context (Dispatcher is provided by DispatcherObject which is the base class from which DependencyObject derives).
In Rx, we achieve this using SubscribeOn and ObserveOn methods. They have overloads which accepts an IScheduler. Lets first talk about this interface, it has the following definition:-
IScheduler Interface
The Big Question: What do ObserveOn and SubscribeOn do?
There are two overloads of each of these methods one taking in an instance of IScheduler another taking an instance of SynchronizationContext:-
The Reactive Extensions ObserveOn operator changes the execution context (the thread) of the IObservable OnNext/OnComplete/OnError methods, whereas the SubscribeOn operator changes the execution context of the implementation of the Subscribe method in the chain of observers. Both methods can be useful to improve the performance of an application's UI by putting work on background threads.
So when do we use ObserveOn?
Use ObserveOn where you need to push observing logic on a specific thread, for example in windows UI applications.
An when do we use SubscribeOn?
Use this if you need to execute subscription code on any specific thread.Usually this is done to run what we call 'Side Effects' of subscription on separate thread.
Example of 'Side Effects'
Rx provides an extension method called Do() with below shown overloads. This provides a way to create side effects of subscribing. In production code we use this for This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
Lets examine this using an example:-
static void Main(string[] args)
{
var list = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
Console.WriteLine("UI thread id is:" + Thread.CurrentThread.ManagedThreadId);
var myObservable = list.ToObservable().SubscribeOn(Scheduler.NewThread)
.Do(_ => { Console.WriteLine("I am a side effect of subscription and running on thread id " + Thread.CurrentThread.ManagedThreadId); });
myObservable.ObserveOn(Scheduler.NewThread).Subscribe(_ => { Thread.Sleep(100); Console.WriteLine("I am OnNext and running on thread id " + Thread.CurrentThread.ManagedThreadId); });
Console.ReadKey();
}
The output for this code is:-
So you see, subscriptions are run on a new thread as shown by the message bumped out from the Do() method, while observeOn runs on a different thread. So point to remember here are that Do() method can be used to create subscription side effects and SubscribeOn defines the thread context of the subscription logic. On the other hand ObserveOn is much simpler to understand and it provides the thread context on which the OnNext method of the observer runs.
Next we will explore more Rx operators. Stay tuned!
No comments:
Post a Comment