Monday, 2 July 2012

Reactive Extensions (Unplugged) - Rx vs Events

In this blog post I am just going to compare what is looks like to raise an event the classic way versus using Reactive Extensions (Rx).

I don't get much time to blog, but when I do get the occasional free hour or two I want to do my best just to keep the post simple and maybe show of some re-usable snippets of code that other developers can use. If you do need a deeper understanding of Rx, it's definitely worth reading the various blog posts, and books you can find out there on the subject. I have been fortunate enough to have had the opportunity to use the Reactive Extensions Framework on various Silverlight projects and it caught my interest. I am not going to go through the absolute basics on Rx in my blog posts because that has been done a thousand times already, but will rather show you a few simple insights that you may be able to use. All you need is the Rx-Main package from nuget.

It showed me a simpler (and cleaner) way to work with events and event streams:
This includes the consumption, filtering, composition of event streams, and the scheduling of work on the required thread (which is always useful in our Single Threaded Apartment model).

In the code below you can see that I am using Rx at the bottom to raise an event by calling OnNext which will in turn call the printToConsole function. Additionally I have used filtering and specified that I would like to inform the consumer that the event is the only event in the stream by calling OnCompleted. Since the "handle" returned from the Subscribe method implements IDisposable I can use the "Using" keyword to ensure the unsubscribe occurs at the end of the block.

namespace RxDemoApp
{
    using System;
 
    using System.Reactive.Subjects;
    using System.Threading;
 
    using System.Reactive.Linq;
 
    /// <summary>
    /// Demonstrates the different ways of exposing and subscribing to events
    /// </summary>
   class RxVsEventsDemo
    {
 
        delegate string onChangedHandler(long num);
        event onChangedHandler onChanged;
 
        delegate string MyEventHandler<T>(T value);
        event MyEventHandler<long> onChanged1;
 
        event Func<longstring> onChanged2;
 
        Subject<long> onChanged3 = new Subject<long>();
 
        internal static void Run()
        {
            RxVsEventsDemo demo = new RxVsEventsDemo();
 
            Func<longstring> printToConsole = num => 
            { 
                string message = String.Format("Message on thread {1} : Output value : {0}", 
                    num, 
                    Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine(message);
                return message;
            };
 
            // Using classic event handler
            onChangedHandler handler = new onChangedHandler(printToConsole);
            demo.onChanged += handler;
            demo.onChanged(10);
            demo.onChanged -= handler;
 
            // Wiring an event handler to an anonymous delegate
            Func<longstring> func = arg => printToConsole(11);
            
            //onChangedHandler handler1 = func; //Does not work
            onChangedHandler handler1 = arg => printToConsole(11);
            demo.onChanged += handler1;
            demo.onChanged(11);
            demo.onChanged -= handler1;
 
            MyEventHandler<long> handler2 = new MyEventHandler<long>(printToConsole);
            demo.onChanged1 += handler2;
            demo.onChanged1(11);
            demo.onChanged1 -= handler2;
 
            // Using a func
            demo.onChanged2 += printToConsole;
            demo.onChanged2(30);
            demo.onChanged2 -= printToConsole;
 
            //using Rx
            using (demo.onChanged3
                .Where(arg => arg % 2 ==0) // Reactive.Linq extension methods also allow the filtering of events
                .Subscribe((arg) => printToConsole(arg), 
                () => Util.WriteToConsole<string>("Done!!"))) // An additional OnCompleted(and OnError) action used
            {
                demo.onChanged3.OnNext(41); // Never raised because 41 is not an even number i.e. it was filtered out
                demo.onChanged3.OnCompleted();
            }
        }
    }
}