Wednesday, 19 June 2013

Rx Operators contd. (Aggregate, All, Amb, And, When, Then, Any, Average)

Aggregate Operator

public static IObservable<TSource> Aggregate<TSource>(this IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator);

The Aggregate function applies an accumulator function over an observable sequence and returns the result of the aggregation as a single element in the result sequence. The accumulator is a Func<TSource, TSource, TSource>: it takes the value accumulated so far and the next element, and returns the new accumulated value. It can be any such function, for example (acc, i) => { return acc + i; }.

This function returns ONLY a single result, the aggregate of the whole stream, and only after the source calls OnCompleted. What?? Then why does it return an observable? It confuses me!

Fair question. The only reason it returns an observable is to retain the asynchronous behaviour; the returned stream will have only one element.

Let's understand this with an example:

        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            var myObservable = list.ToObservable().Aggregate((acc, i) => { return acc + i; });
            myObservable.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

Output: 55



There are two more overloads of the Aggregate function, one taking a seed value and one taking both a seed value and a result selector. The seed is the initial value of the accumulator, while the result selector transforms the final accumulated value before it is emitted. To understand them, let's look at another example. Say we want to sum the values from 1 to 10 in an accumulator whose initial value is 5 and, once the sum is produced, multiply it by a factor of 100. That gives (5 + 55) * 100.

        static void Main(string[] args)
        {

            var myObservable = Observable.Range(1, 10).Aggregate(5,
                            (acc, i) => { return acc + i; },
                            acc => { return acc * 100; }
                            );
            myObservable.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

The Output is: 6000

Note: There is another operator for aggregation that also emits the incremental intermediate results: Scan. We will discuss it later.
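Until then, here is a rough sketch of the difference, assuming the System.Reactive package is referenced. The class and method names are made up for illustration, and the blocking Wait() call is used only so the results can be returned directly instead of subscribing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, not part of Rx.
public static class ScanVersusAggregate
{
    // Scan emits one accumulated value per source element.
    public static IList<int> RunningTotals()
    {
        return Observable.Range(1, 5)
                         .Scan((acc, i) => acc + i)
                         .ToList()   // collect every emitted value
                         .Wait();    // block until the sequence completes
    }

    // Aggregate emits only the final accumulated value.
    public static int FinalTotal()
    {
        return Observable.Range(1, 5)
                         .Aggregate((acc, i) => acc + i)
                         .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunningTotals())); // 1, 3, 6, 10, 15
        Console.WriteLine(FinalTotal());                       // 15
    }
}
```

The last value produced by Scan is the same value Aggregate produces.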


All Operator

This determines whether all elements of an observable sequence satisfy a given condition. Like the Aggregate operator, it returns an observable sequence containing a single element: true if every element in the source sequence passes the test in the specified predicate, false otherwise.

        static void Main(string[] args)
        {

            var myObservable = (new List<int> { 2,4,6}).ToObservable().All(i => i % 2 == 0);

            myObservable.Subscribe(Console.WriteLine);

            Console.ReadKey();


        }

This prints True, because every element in the list is even.
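For contrast, here is a small sketch of the failing case, again assuming System.Reactive is referenced. AllDemo and AllEven are hypothetical names used only for this illustration, and Wait() blocks for the single result.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, not part of Rx.
public static class AllDemo
{
    // Returns the single bool that All emits for the given values.
    public static bool AllEven(IEnumerable<int> values)
    {
        return values.ToObservable()
                     .All(i => i % 2 == 0)
                     .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(AllEven(new[] { 2, 4, 6 })); // True
        Console.WriteLine(AllEven(new[] { 2, 3, 6 })); // False: 3 fails the test
        Console.WriteLine(AllEven(new int[0]));        // True: no element fails
    }
}
```

Note the empty case: with no elements there is nothing to fail the predicate, so the result is true.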


Amb Operator

This is an interesting operator. Amb stands for 'ambiguous'. It takes several sequences and propagates only the one that reacts first; the others are ignored. Let's take a real-world example. You have two market data services, each providing price ticks as an observable sequence. You want to use whichever service returns the first price and ignore the other one. That is a job for Amb. Here is an example:

        static void Main(string[] args)
        {

            var myObservableStream1 = Observable.Range(10,10).Delay(TimeSpan.FromSeconds(.5));
            var myObservableStream2 = Observable.Range(21, 10);

            var myResultStream = Observable.Amb(myObservableStream1, myObservableStream2);

            myResultStream.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

Here we have intentionally delayed the first stream (kind of cheating... we want the second stream to win!).

[Marble diagram: the first stream is delayed by half a second, so the second stream reacts first and Amb propagates it.]


Output: the values 21 through 30, one per line, all from the second stream.
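To tie this back to the market data scenario, here is a minimal sketch with two made-up feeds simulated by timers, assuming System.Reactive is referenced. The names AmbDemo, slowFeed and fastFeed and the delays are assumptions for illustration only. It also uses the extension-method form, first.Amb(second), instead of Observable.Amb(first, second).

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical demo class, not part of Rx.
public static class AmbDemo
{
    public static string FirstToReact()
    {
        // Each simulated feed produces one value after its delay.
        var slowFeed = Observable.Timer(TimeSpan.FromMilliseconds(500))
                                 .Select(_ => "slow");
        var fastFeed = Observable.Timer(TimeSpan.FromMilliseconds(50))
                                 .Select(_ => "fast");

        // Amb keeps whichever sequence reacts first and drops the other.
        return slowFeed.Amb(fastFeed).Wait();
    }

    public static void Main()
    {
        Console.WriteLine(FirstToReact()); // fast
    }
}
```

The order of the arguments does not decide the winner; only the timing does.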



And, When & Then Operators (Join Pattern)

• When(): at the very end, says “Tell me when any of the patterns happen.”
• And(): creates a join pattern by saying “Wait until A and B have each produced an item.” These can be chained, allowing you to wait for A and B and C.
• Then(): once you have finished combining with And, Then works like Select; it lets you combine the matched items into a single value.

Example:-

        static void Main(string[] args)
        {

            var myObservableStream1 = Observable.Range(10,10);
            var myObservableStream2 = Observable.Range(21, 10);

            var myResultStream = Observable
                                    .When(myObservableStream1.And(myObservableStream2)
                                    .Then((l, r) => "Product of elements are:" + l * r));

            myResultStream.Subscribe(Console.WriteLine);

            Console.ReadKey();

        }

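Output: ten lines, one per matched pair, from "Product of elements are:210" (10 * 21) to "Product of elements are:570" (19 * 30).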
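The bullet about chaining And can be sketched the same way. This is only an illustration, assuming System.Reactive is referenced; JoinDemo and the three ranges are made up. Each result combines the n-th item of all three streams.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, not part of Rx.
public static class JoinDemo
{
    public static IList<int> SumsOfThree()
    {
        var a = Observable.Range(1, 3);    // 1, 2, 3
        var b = Observable.Range(10, 3);   // 10, 11, 12
        var c = Observable.Range(100, 3);  // 100, 101, 102

        // Wait for one item from each of a, b and c, then add them up.
        return Observable.When(
                   a.And(b).And(c).Then((x, y, z) => x + y + z))
               .ToList()
               .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", SumsOfThree())); // 111, 114, 117
    }
}
```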




Any Operator

public static IObservable<bool> Any<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate);

This returns an observable sequence containing a single element determining whether any elements in the source sequence pass the test in the specified predicate.

Example:-
        static void Main(string[] args)
        {
            var myObservableStream1 = Observable.Range(10,10);
            var myResultStream = myObservableStream1.Any(e=>e%2==0);
            myResultStream.Subscribe(Console.WriteLine);
            Console.ReadKey();

        }

Result: True
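A couple of other cases are worth trying, sketched below under the assumption that System.Reactive is referenced. AnyDemo and its method names are hypothetical. The second method uses the overload of Any that takes no predicate and simply reports whether the sequence has any element at all.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical demo class, not part of Rx.
public static class AnyDemo
{
    // No element of 10..19 is greater than 100, so this yields false.
    public static bool AnyAboveHundred()
    {
        return Observable.Range(10, 10).Any(e => e > 100).Wait();
    }

    // Any() without a predicate: does the sequence contain anything?
    public static bool EmptyHasAny()
    {
        return Observable.Empty<int>().Any().Wait();
    }

    public static void Main()
    {
        Console.WriteLine(AnyAboveHundred()); // False
        Console.WriteLine(EmptyHasAny());     // False
    }
}
```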

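Average Operator

This is like the Aggregate operator: it also returns an observable sequence containing a single element, produced once the source completes. The only difference is that the element is the average (arithmetic mean) of the values in a numeric source sequence.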
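A minimal sketch, assuming System.Reactive is referenced; AverageDemo is a made-up name. For a sequence of int values the result is a double.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical demo class, not part of Rx.
public static class AverageDemo
{
    // Average of 1..10: the sum 55 divided by 10 elements.
    public static double AverageOneToTen()
    {
        return Observable.Range(1, 10).Average().Wait();
    }

    public static void Main()
    {
        var myObservable = Observable.Range(1, 10).Average();
        myObservable.Subscribe(Console.WriteLine); // a single value, 5.5

        Console.ReadKey();
    }
}
```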
