Thursday, 20 June 2013

Delay


Delay method is used to delay the stream for a certain period of time. Please take care that this will be storing the elements lapsed during the delayed time interval in memory, so this needs to be used very carefully as the memory can grow quite quickly.

Let us take an example:-

        static void Main(string[] args)
        {
            var myObservable = Observable.Interval(TimeSpan.FromSeconds(1));
         
            myObservable.Delay(TimeSpan.FromSeconds(5)).Subscribe(num =>
            {
                Console.WriteLine(num);
            });        
         
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
                     
        }


This will create a stream which is delayed by 5 seconds. Note that the elements observed in every five second window will be cached and replayed.




Wednesday, 19 June 2013

Zip & Merge


CombineLatest Method

Ractive UI


Asynchronous Interactions


Hot Vs Cold Observable

Note: I will add a topic on TestSchedulers prior to this topic.

Cold Vs Hot !

Let's first discuss a scenario of Enumeration world. We need a function that bumps out 5 integers from 1 to 5 . We can write them in two ways:-


        public IEnumerable<int> GetMeValue()
        {
            return new List<int> { 1, 2, 3, 4, 5 };
        }

AND

        public IEnumerable<int> GetMeValue()
        {
            for (int i = 1; i <= 5; i++)
                yield return i;
        }

Difference between these two implementation is that in the second implementation, values are created and returned on the fly. Unless we use this, values are not created or we can say this is COLD. But in the first implementation, all values are created as soon as this method is called. We can say this is HOT!


Similarly for observables we can say that they are HOT when they are being produced regardless of whether some subscriber is there to consume them, e.g. MouseMove event. On the other hand, if they are only published if subscribers exist, they are COLD observable.

LeeCampbell explains:-

COLD Observable - Streams that are passive and start publishing on request,
HOT Observable - Streams that are active and publish regardless of subscriptions.


Observable.Defer method can be used to convert a hot observable to cold ans Observable.Publish can be used to convert a cold observable to hot.

Copied from http://stackoverflow.com/questions/2521277/what-are-the-hot-and-cold-observables

Hot observable is an exact match for event. In events, values usually are fed into the handler even if no subscribers are listening. All subscribers are receiving the same set of values. Because of following the "event" pattern, hot observables are easier to understand than the cold ones.
Cold observable is also like an an event, but with a twist - Cold observable's event is not a property on a shared instance, it is a property on an object that is produced from a factory each time when somebody subscribes. In addition, subscription starts the production of the values. Because of the above, multiple subscribers are isolated and each receives its own set of values.
The most common mistake RX beginners make is creating a cold observable (well, thinking they are creating a cold observable) using some state variables within a function (f.e. accumulated total) and not wrapping it into a .Defer() statement. As a result, multiple subscribers share these variables and cause side effects between them.

Things to do: Get hands dirty on Defer and Publish


Multicast, Publish & IConnectableObservable

Coming Soon...

Buffer & Window

Buffer

Buffer has got number of overloads to suit your needs. All it does is chops the stream into windows and returns observable of lists of elements. Lets start with the following example and dig down later:-


        static void Main(string[] args)
        {
           var input = new[] {1,2,3,4,5}.ToObservable();
            int i = 0;
            input.Buffer(2).Subscribe(list => Console.WriteLine(list.Sum()));
            Console.ReadKey();

        }

Window
Window returns <IObservable<IObservable<T>>> which make it a bit complex, but this method is extremely useful. Unlike the Buffer, which is caching the item internally, the Window does not cache the items at all, each item is immediately project through IObservable<T>.OnNext.



Following Marble diagram will show the differences between two operators:-




Following is an example of using a window to find Max prices. The window is non-overlapping one containing five elements each and pumping out maximum for each window. If you are familiar to candlestick charts, this is the same thing as the high of each candlestick.


    class Program
    {
        static void Main(string[] args)
        {
            var input = GetPrice().ToObservable();

            var windowMax = from window in input.Window(2, 1)
                                   from item in window.Max()
                                   select item;

            windowMax.Subscribe(Console.WriteLine);
   
            Console.ReadKey();
        }

        private static IEnumerable<double> GetPrice()
        {
            var r = new Random(DateTime.Now.Millisecond);

            while (true)
            {
                Thread.Sleep(500);
                yield return Math.Round(r.NextDouble(), 2);
            }
        }

    }



The output:-


Rx Operators contd.. (Aggregate, All, Amb, And, When, Then, Any, Average)

Aggregate Operator

public static IObservable<TSource> Aggregate<TSource>(this IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator);

Aggregate function applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The accumulator function is represented by Func<TSource,TSource,TSource>. This can be any func like (acc,i)=>{return acc+i;};

This functions returns ONLY single result representing the aggregate of the stream after OnComplete is called. What?? THEN WHY DOES IT RETURN AN OBSERVABLE - IT CONFUSES ME!

Exactly, the only reason it returns observable is to retain the asynchronous behaviour, the returned stream will have only one element.

Lets understand this with an example:-

        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            var myObservable = list.ToObservable().Aggregate((acc, i) => { return acc + i; });
            myObservable.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

Output:-



There are two more overloads of the Aggregate function having seed value and result selector. Seed in the initial value of accumulator, while result selector  can be used to apply transformations on produced values. To understand them, lets see another example. Let's say we want to sum up values from 1 to 10 in accumulator whose initial value is 5, and after the value is produced, we need to multiply it by a factor of 100.

        static void Main(string[] args)
        {

            var myObservable = Observable.Range(1, 10).Aggregate(5,
                            (acc, i) => { return acc + i; },
                            acc => { return acc * 100; }
                            );
            myObservable.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

The Output is: 6000

Note: There is another operator for aggregation behavior with incremental intermediate results - SCAN. We will discuss this later.


All Operator

This determines whether all elements of an observable sequence satisfy a given condition. Similar to average operator, this also returns an observable sequence containing a single element determining whether all elements in the source sequence pass the test in the specified predicate.

static void Main(string[] args)
        {

            var myObservable = (new List<int> { 2,4,6}).ToObservable().All(i => i % 2 == 0);

            myObservable.Subscribe(Console.WriteLine);

            Console.ReadKey();


        }

This returns true.


Amb  Operator

This is an interesting operator. Amb stands for 'ambiguous'. This returns the sequence that reacts first. Lets take a real world example. You have got two Market data services providing price ticks as observable sequence. You want that whichever service returns the first price, should be used, other one should be ignored. You will then use amb. Lets take an example:-

        static void Main(string[] args)
        {

            var myObservableStream1 = Observable.Range(10,10).Delay(TimeSpan.FromSeconds(.5));
            var myObservableStream2 = Observable.Range(21, 10);

            var myResultStream = Observable.Amb(myObservableStream1, myObservableStream2);

            myResultStream.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

Here we have intentionally delayed the first stream (kind of cheating... we want second stream to win!). The marble diagram can be drawn as follows:-


Output:-



And, When & Then Operator (Join Pattern)

• When(): At the very end says “Tell me when any of the patterns happen”
• And(): Creates a Join pattern by saying “Wait until A and B produce an item. These
can be concatenated, allowing you to wait for A and B and C.
• Then(): Once you finish combining using and, then works like select, it allows you
to combine the results into a value.

Example:-

        static void Main(string[] args)
        {

            var myObservableStream1 = Observable.Range(10,10);
            var myObservableStream2 = Observable.Range(21, 10);

            var myResultStream = Observable
                                    .When(myObservableStream1.And(myObservableStream2)
                                    .Then((l, r) => "Product of elements are:" + l * r));

            myResultStream.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

Output:-




Any  Operator

public static IObservable<bool> Any<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate);

This returns an observable sequence containing a single element determining whether any elements in the source sequence pass the test in the specified predicate.

Example:-
 static void Main(string[] args)
        {
            var myObservableStream1 = Observable.Range(10,10);
            var myResultStream = myObservableStream1.Any(e=>e%2==0);
            myResultStream.Subscribe(Console.WriteLine);
            Console.ReadKey();

        }

Result: True

Average  Operator
This is like aggregate operator, the only difference is that it provides the average as an output.

Tuesday, 18 June 2013

Rx Operators - Basic stream filtering (Take,Skip, First, Last etc..)

Take

There are four overloads in all, the first one takes the specified number of contiguous elements, the second will take elements till the end of specified timespan. Other two overloads are taking IScheduler instance fro synchronization if required.



TakeLast, TakeLastBuffer, TakeUntil, TakeWhile

TakeLast will confuse many. Since observables are future collection, how will we know if we will reach the end of stream exactly after n elements. To explain this, in reality, TakeLast is delayed. This operator acumulates a buffer with a length enough to store elements count elements. Upon completion of the source sequence, this buffer is drained on the result sequence.

TakeLastBuffer behaves exactly the same. The only difference is that it returns a observable of list of source element i.e. the contents of the entire buffer upon reaching the end of stream. On the contrary TakeLast returns Observable of elements.


The following overload comes handy when we require to control the stream using some external functions:-

public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate);



Skip

Similarly skip has got three overloads.


SkipLast, SkipUntil & SkipWhile

We also have following skip functions useful for filtering out elements from observable stream:-



First, FirstOrDefault, FirstAsync, FirstOrDefaultAsync

First returns the first element of an observable sequence, while FirstAsync returns the sequence containing the first element in the observable sequence.

FirstOrDefault returns the first element in the observable sequence, or a default value if no such element exists. FirstOrDefaultAsync returns the sequence containing  first element in the observable sequence, or a default value if no such element exists.


Last, LastOrDefault, LastAsync, LastOrDefaultAsync



Last returns the Last element of an observable sequence, while LastAsync returns the sequence containing the Last element in the observable sequence.

LastOrDefault returns the Last element in the observable sequence, or a default value if no such element exists. LastOrDefaultAsync returns the sequence containing  Last element in the observable sequence, or a default value if no such element exists.



Schedulers - Managing Reactive Concurrency


As we all know that while writing asynchronous programs, we are at times forced to execute things on some specific thread. For example WPF and Winforms follow STA model where any UI element can be updated only on the UI thread. So if we spawn a non-UI thread for some computation, we will have to send the result back to the UI thread and it will be responsible to update the UI. Traditionally this has been achieved using various synchronization mechanisms that sends messages to the UI message queue and the UI thread used that. For example in winforms, we used controls BeginInvoke/Invoke method, in WPF we use Dispatcher's synchronization context (Dispatcher is provided by DispatcherObject which is the base class from which DependencyObject derives).

In Rx, we achieve this using SubscribeOn and ObserveOn methods. They have overloads which accepts an IScheduler. Lets first talk about this interface, it has the following definition:-

IScheduler Interface



The Big Question: What do ObserveOn and SubscribeOn do?

There are two overloads of each of these methods one taking in an instance of IScheduler another taking an instance of SynchronizationContext:-



The Reactive Extensions ObserveOn operator changes the execution context (the thread) of the IObservable OnNext/OnComplete/OnError methods, whereas the SubscribeOn operator changes the execution context of the implementation of the Subscribe method in the chain of observers. Both methods can be useful to improve the performance of an application's UI by putting work on background threads.

So when do we use ObserveOn?
Use ObserveOn where you need to push observing logic on a specific thread, for example in windows UI applications.

An when do we use SubscribeOn?
Use this if you need to execute subscription code on any specific thread.Usually this is done to run what we call 'Side Effects' of subscription on separate thread.

Example of 'Side Effects'

Rx provides an extension method called Do() with below shown overloads. This provides a way to create side effects of subscribing. In production code we use this for This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.



Lets examine this using an example:-

        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            Console.WriteLine("UI thread id is:" + Thread.CurrentThread.ManagedThreadId);

            var myObservable = list.ToObservable().SubscribeOn(Scheduler.NewThread)
                .Do(_ => { Console.WriteLine("I am a side effect of subscription and running on thread id " + Thread.CurrentThread.ManagedThreadId); });

            myObservable.ObserveOn(Scheduler.NewThread).Subscribe(_ => { Thread.Sleep(100); Console.WriteLine("I am OnNext and running on thread id " + Thread.CurrentThread.ManagedThreadId); });

            Console.ReadKey();

        }


The output for this code is:-



So you see, subscriptions are run on a new thread as shown by the message bumped out from the Do() method, while observeOn runs on a different thread. So point to remember here are that Do() method can be used to create subscription side effects and SubscribeOn defines the thread context of the subscription logic. On the other hand ObserveOn is much simpler to understand and it provides the thread context on which the OnNext method of the observer runs.


Next we will explore more Rx operators. Stay tuned!

Throttling



Throttling Usage:
{MyObservable}.Throttle(TimeSpan.FromSeconds(2))

This will throttle the stream with timespan of 2 seconds. This means that it will buffer events and keep releasing in chunks after every 2 seconds:-




Ok, so after understanding what Throttle does, we can easily throttle our observable:-

var textChangedEvent = Observable.FromEventPattern(txtWord, "TextChanged").Throttle(TimeSpan.FromSeconds(2));


But we have got a problem here. as soon as we introduce throttle, concurrency issues occur. Try and run the application and you will encounter this problem. Since the observable is running on a different thread, it cannot modify UI elements, so we need to somehow make use of the UI synchronization context. So here comes Schedulers, which we will cover in detail in later topics. Lets change the code as follows:-

        private void Window_Loaded_1(object sender, RoutedEventArgs e)
        {
            var uiSyncContext = SynchronizationContext.Current;

            var textChangedEvent = Observable.FromEventPattern(txtWord, "TextChanged").Throttle(TimeSpan.FromSeconds(2)).ObserveOn(uiSyncContext);
         
            var disposable = textChangedEvent.Subscribe(_ =>
                {                
                        txtBlock.Text = txtWord.Text;
                }
               );

            textChangedEvent.Subscribe(x =>
                {
                    var count=txtWord.Text.Split(' ').Count();
                    this.Title = "Word Count= " + count + x.GetType().ToString();
                    if(count>=10)
                        disposable.Dispose();
                }
                );
        }

Note: I am intentionally not using Rx Schedulers here to keep things simple.

This will do exactly what we need, update textblock after every two seconds and synchronize with textbox. Another very good place where throttling can be used is where we want to implement functionality like google suggest but we don't want to bug the server with many requests.

Unsubscribing to Observables using IDisposable

In continuation to my last post, now we have requirement that the subscription should be disposed as soon as the word count reaches 10.

So the new behavior should be something like:-



Note that the textblock only copies 10 items and then unsubscribes. This can be done using the instance of IDisposable that is returned when we do a subscription. In previous examples we just ignored them.


Now the new C# code will look like:-

        private void Window_Loaded_1(object sender, RoutedEventArgs e)
        {
            var textChangedEvent = Observable.FromEventPattern(txtWord, "TextChanged");
         
            var disposable = textChangedEvent.Subscribe(_ => txtBlock.Text = txtWord.Text);

            textChangedEvent.Subscribe(_ =>
                {
                    var count=txtWord.Text.Split(' ').Count();
                    this.Title = "Word Count= " + count;
                    if(count>=10)
                        disposable.Dispose();
                }
                );
        }


Now this thing has started pleasing me. Rx is all over me and if I be imaginative, I know it has numerous applications? Are you with me? Hey wait.. this is just the beginning. Rx is much more powerful.

Rx makes things easy by allowing us to do operations on observables. Suppose your manager comes to you and asks you to change the above code so that text in textbox will get copied to textblock only when user has finished writing a word or a sentence (i.e. when a space or full stop is encountered). No problem, the changed code should look like:-

        private void Window_Loaded_1(object sender, RoutedEventArgs e)
        {
            var textChangedEvent = Observable.FromEventPattern(txtWord, "TextChanged");
         
            var disposable = textChangedEvent.Subscribe(_ =>
                {
                    if (txtWord.Text.ToCharArray().Last() == ' ' || txtWord.Text.ToCharArray().Last() == '.')
                        txtBlock.Text = txtWord.Text;
                }
               );

            textChangedEvent.Subscribe(_ =>
                {
                    var count=txtWord.Text.Split(' ').Count();
                    this.Title = "Word Count= " + count;
                    if(count>=10)
                        disposable.Dispose();
                }
                );
        }
    }


Then he comes back and asks you to copy texts only after every 2 seconds. For this, Rx provides another cool extension method called throttle. Stay tuned...


Creating Observables. contd..

Just like creating observables with Observable.Return, we can create an observable stream which actually returns nothing, i.e. it calls OnCompleted() directly:-


Why do we have Observable.Empty() and Observable.return() when we could just return a value or null?

This is to support an existing design where some code expects an observable. For example say, we are mocking something in unit tests and we have to provide an Observable stream. We don't care about the actual data, but for code to compile, an observable is required. There will be many scenarios where these could be used once you start thinking in a reactive manner.


Observable.Range()

This is comparable to Enumerable.Range of the IEnumerable world. This will basically create a range of Observables and will call OnCompleted in the end. So the marble diagram for Observable.Range(10,4) will be:-



Simple isn't it?


Creating Observables from lists and arrays:-

From lists:-

            List<int> mylist = new List<int>{1,2,3,4,5,6};
            var myObseravle = mylist.ToObservable();


From Arrays:-

            int[] myArray = new int[] { 1, 2, 3, 4, 5, 6 };
            var myObseravle = myArray.ToObservable();


We can consume above observables easily by subscribing them. This was easy and was easy to understand and till now you might be wondering if this was of much use. Now lets start creating Observable streams from more complex things - this will be of much use.


Creating observables from Events

There are two methods which can be used for converting events to streams of observables:-
Observable.FromEvent(..)
Observable.FromEventPattern(..)

Both these methods exhibit similar behaviour. The difference as explained by Bart De Smet [MSFT] from microsoft team is:-
"Use FromEvent for events structurally don't look like a .NET event pattern (i.e. not based on sender, event args), and use FromEventPattern for the pattern-based ones. The sample code show for FromEvent above violates this distinction and uses an EventHandler inside (which indicates the use of the pattern)."


Let us write a simple wpf application with a textbox and a textblock. The requirement is that as the user types the text in the textbox, it should get copied into the textblock.




1. Create a new WPF application.
2. Open Mainwindow.xaml and add the following xaml:-

    <StackPanel>
        <TextBox x:Name="txtWord" />
        <TextBlock x:Name="txtBlock" Height="200"/>
    </StackPanel>


3. Add handler for window loaded and copy paste the following code:-

        private void Window_Loaded_1(object sender, RoutedEventArgs e)
        {
            var textChangedEvent = Observable.FromEventPattern(txtWord, "TextChanged");
            textChangedEvent.Subscribe(_ => txtBlock.Text = txtWord.Text);
        }


that's is it. This will be enough to achieve the desired functionality.

Ok, so far so good. Now suddenly there is a new requirement. We need to display the total word count in the title bar while the user is typing.


No problem. Just a matter of another subscription. Add the following line to the cs code and we are all done:-

            textChangedEvent.Subscribe(_ => this.Title = "Word Count= " + txtWord.Text.Split(' ').Count());


This was easy!

But now there is another requirement. Once the word count reaches 10, we don't want more in the textblock. User can still type, but it won't be copied in textblock.

Now we need to know how to unsubscribe! Stay tuned for my next post.

Lets start creating Observables.

The easiest way of creating a stream of observable is using Observable.Return (Make sure you get Rx-Main from Nuget packages before doing this. Also include using System.Reactive.Linq namespace).



What will this statement do?
Observable.Return(10);

This can be represented using the following 'marble diagram'

This will call OnNext(10)  of the observer only once and then will call OnCompleted();

So Lets write this piece of code:-
var myObservable =  Observable.Return(10);

Congratulations!! you have just written your first observable.


Now What do I do with it? Lets subscribe to it and use as I would use a stream of data:-


 Console.WriteLine("Subscribing to my first observable");
            myObseravle.Subscribe(
                x => { Console.WriteLine("Next Value: "+ x); }
                );

You should see something like this:-




Now lets see if OnCompleted was called. Lets add another piece of lambda as follows:-

            myObseravle.Subscribe(
                x => { Console.WriteLine("Next Value: " + x); },
                () => { Console.WriteLine("Sequence Completed"); }
                );


The new output:-



So we know that this behaves exactly as it was shown in the marble diagram.

What is Rx?

Reactive extensions is extension of LINQ, the only difference being that it is based on Observables instead of Enumerables. Parallelism comes for free, and so does problems arising from concurrency. To handle those problems Rx provides schedulers So In all Rx can be assumed to be the combination of "Obseravables+LINQ+Schedulers"


Key interfaces that Rx uses are IObserver & IObservable

Observer provides mechanism through which others can observe using OnNext() method. It also provides OnError() and OnCompleted() Methods. Sample implementation from MSDN:-
using System;

public class LocationReporter : IObserver<Location>
{
   private IDisposable unsubscriber;
   private string instName;

   public LocationReporter(string name)
   {
      this.instName = name;
   }

   public string Name
   {  get{ return this.instName; } }

   public virtual void Subscribe(IObservable<Location> provider)
   {
      if (provider != null) 
         unsubscriber = provider.Subscribe(this);
   }

   public virtual void OnCompleted()
   {
      Console.WriteLine("The Location Tracker has completed transmitting data to {0}.", this.Name);
      this.Unsubscribe();
   }

   public virtual void OnError(Exception e)
   {
      Console.WriteLine("{0}: The location cannot be determined.", this.Name);
   }

   public virtual void OnNext(Location value)
   {
      Console.WriteLine("{2}: The current location is {0}, {1}", value.Latitude, value.Longitude, this.Name);
   }

   public virtual void Unsubscribe()
   {
      unsubscriber.Dispose();
   }
}



IObservable contains only one method Subscribe() using which observers can 
subscribe to it. Sample MSDN example:-

public class LocationTracker : IObservable<Location>
{
   public LocationTracker()
   {
      observers = new List<IObserver<Location>>();
   }

   private List<IObserver<Location>> observers;

   public IDisposable Subscribe(IObserver<Location> observer) 
   {
      if (! observers.Contains(observer)) 
         observers.Add(observer);
      return new Unsubscriber(observers, observer);
   }

   private class Unsubscriber : IDisposable
   {
      private List<IObserver<Location>>_observers;
      private IObserver<Location> _observer;

      public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
      {
         this._observers = observers;
         this._observer = observer;
      }

      public void Dispose()
      {
         if (_observer != null && _observers.Contains(_observer))
            _observers.Remove(_observer);
      }
   }

   public void TrackLocation(Nullable<Location> loc)
   {
      foreach (var observer in observers) {
         if (! loc.HasValue)
            observer.OnError(new LocationUnknownException());
         else
            observer.OnNext(loc.Value);
      }
   }

   public void EndTransmission()
   {
      foreach (var observer in observers.ToArray())
         if (observers.Contains(observer))
            observer.OnCompleted();

      observers.Clear();
   }
}