Tuesday, 3 July 2012

Reactive Extensions (Unplugged) : Using Rx to write non-blocking code

Writing non-blocking code helps us ensure we have responsive UIs, save resources by not having blocked threads, and take advantage of multiple cores.

Rx provides us with a few extension methods on the Observable class that allow us to expose an IObservable instead of having to use the classic Asynchronous Programming Model (APM), which can be a little cumbersome.

Here are a few very basic examples:

namespace RxDemoApp
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
 
    /// <summary>
    /// The various ways of making the GetPrimes function non-blocking or asynchronous
    /// </summary>
    class NonBlockingDemo
    {
        internal static void Run(IScheduler subscriptionScheduler, IScheduler observationScheduler, long maxPrimeNumber)
        {
            Func<longIEnumerable<long>> func = Util.GetPrimes;
 
            // Using classic APM
            AsyncCallback callback = ar =>
            {
                IEnumerable<long> result = func.EndInvoke(ar);
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            };
            func.BeginInvoke(maxPrimeNumber, callback, null);
 
            Util.EndExample("Completed APM invocation.");
 
            // Using FromAsyncPattern
            Func<longIObservable<IEnumerable<long>>> ofunc1 = Observable.FromAsyncPattern<longIEnumerable<long>>
                (func.BeginInvoke, func.EndInvoke);
            ofunc1(maxPrimeNumber).SubscribeOn(subscriptionScheduler).ObserveOn(observationScheduler)
                .Subscribe(result =>
            {
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            });
 
            Util.EndExample("Completed FromAsyncPattern invocation.");
 
            // Using ToAsync
            Func<longIObservable<IEnumerable<long>>> ofunc2 = func.ToAsync();
            ofunc2(maxPrimeNumber).SubscribeOn(subscriptionScheduler).ObserveOn(observationScheduler)
                .Subscribe(result =>
            {
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            });
 
            Util.EndExample("Completed ToAsync invocation.");
 
            //Observable.Start
            Observable.Start(() => Util.GetPrimes(maxPrimeNumber)).SubscribeOn(subscriptionScheduler)
                .ObserveOn(observationScheduler)
                .Subscribe(result =>
            {
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            });
 
            Util.EndExample("Completed Observable.Start invocation.");
        }
    }
}