Tuesday, 3 July 2012

Reactive Extensions (Unplugged) : Using Rx to write non-blocking code

Writing non-blocking code helps us ensure we have responsive UI's, save resources by not having blocked threads, and take advantage of multiple cores.

Rx provides us with a few extension methods on the Observable class that allow us to expose an IObservable instead of having to use the classic Asynchronous Programming model which can be a little combersome.

Here are a few very basic examples:

namespace RxDemoApp
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
 
    /// <summary>
    /// The various ways of making the GetPrimes function non-blocking or asynchronous
    /// </summary>
    class NonBlockingDemo
    {
        internal static void Run(IScheduler subscriptionScheduler, IScheduler observationScheduler, long maxPrimeNumber)
        {
            Func<longIEnumerable<long>> func = Util.GetPrimes;
 
            // Using classic APM
            AsyncCallback callback = ar =>
            {
                IEnumerable<long> result = func.EndInvoke(ar);
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            };
            func.BeginInvoke(maxPrimeNumber, callback, null);
 
            Util.EndExample("Completed APM invocation.");
 
            // Using FromAsyncPattern
            Func<longIObservable<IEnumerable<long>>> ofunc1 = Observable.FromAsyncPattern<longIEnumerable<long>>
                (func.BeginInvoke, func.EndInvoke);
            ofunc1(maxPrimeNumber).SubscribeOn(subscriptionScheduler).ObserveOn(observationScheduler)
                .Subscribe(result =>
            {
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            });
 
            Util.EndExample("Completed FromAsyncPattern invocation.");
 
            // Using ToAsync
            Func<longIObservable<IEnumerable<long>>> ofunc2 = func.ToAsync();
            ofunc2(maxPrimeNumber).SubscribeOn(subscriptionScheduler).ObserveOn(observationScheduler)
                .Subscribe(result =>
            {
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            });
 
            Util.EndExample("Completed ToAsync invocation.");
 
            //Observable.Start
            Observable.Start(() => Util.GetPrimes(maxPrimeNumber)).SubscribeOn(subscriptionScheduler)
                .ObserveOn(observationScheduler)
                .Subscribe(result =>
            {
                Util.WriteToConsole<string>(String.Format("Sum of numbers is: {0}", result.Sum()));
            });
 
            Util.EndExample("Completed Observable.Start invocation.");
        }
    }
}

Monday, 2 July 2012

Reactive Extensions (Unplugged) - Rx vs Events

In this blog post I am just going to compare what is looks like to raise an event the classic way versus using Reactive Extensions (Rx).

I don't get much time to blog, but when I do get the occasional free hour or two I want to do my best just to keep the post simple and maybe show of some re-usable snippets of code that other developers can use. If you do need a deeper understanding of Rx, it's definitely worth reading the various blog posts, and books you can find out there on the subject. I have been fortunate enough to have had the opportunity to use the Reactive Extensions Framework on various Silverlight projects and it caught my interest. I am not going to go through the absolute basics on Rx in my blog posts because that has been done a thousand times already, but will rather show you a few simple insights that you may be able to use. All you need is the Rx-Main package from nuget.

It showed me a simpler (and cleaner) way to work with events and event streams:
This includes the consumption, filtering, composition of event streams, and the scheduling of work on the required thread (which is always useful in our Single Threaded Apartment model).

In the code below you can see that I am using Rx at the bottom to raise an event by calling OnNext which will in turn call the printToConsole function. Additionally I have used filtering and specified that I would like to inform the consumer that the event is the only event in the stream by calling OnCompleted. Since the "handle" returned from the Subscribe method implements IDisposable I can use the "Using" keyword to ensure the unsubscribe occurs at the end of the block.

namespace RxDemoApp
{
    using System;
 
    using System.Reactive.Subjects;
    using System.Threading;
 
    using System.Reactive.Linq;
 
    /// <summary>
    /// Demonstrates the different ways of exposing and subscribing to events
    /// </summary>
   class RxVsEventsDemo
    {
 
        delegate string onChangedHandler(long num);
        event onChangedHandler onChanged;
 
        delegate string MyEventHandler<T>(T value);
        event MyEventHandler<long> onChanged1;
 
        event Func<longstring> onChanged2;
 
        Subject<long> onChanged3 = new Subject<long>();
 
        internal static void Run()
        {
            RxVsEventsDemo demo = new RxVsEventsDemo();
 
            Func<longstring> printToConsole = num => 
            { 
                string message = String.Format("Message on thread {1} : Output value : {0}", 
                    num, 
                    Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine(message);
                return message;
            };
 
            // Using classic event handler
            onChangedHandler handler = new onChangedHandler(printToConsole);
            demo.onChanged += handler;
            demo.onChanged(10);
            demo.onChanged -= handler;
 
            // Wiring an event handler to an anonymous delegate
            Func<longstring> func = arg => printToConsole(11);
            
            //onChangedHandler handler1 = func; //Does not work
            onChangedHandler handler1 = arg => printToConsole(11);
            demo.onChanged += handler1;
            demo.onChanged(11);
            demo.onChanged -= handler1;
 
            MyEventHandler<long> handler2 = new MyEventHandler<long>(printToConsole);
            demo.onChanged1 += handler2;
            demo.onChanged1(11);
            demo.onChanged1 -= handler2;
 
            // Using a func
            demo.onChanged2 += printToConsole;
            demo.onChanged2(30);
            demo.onChanged2 -= printToConsole;
 
            //using Rx
            using (demo.onChanged3
                .Where(arg => arg % 2 ==0) // Reactive.Linq extension methods also allow the filtering of events
                .Subscribe((arg) => printToConsole(arg), 
                () => Util.WriteToConsole<string>("Done!!"))) // An additional OnCompleted(and OnError) action used
            {
                demo.onChanged3.OnNext(41); // Never raised because 41 is not an even number i.e. it was filtered out
                demo.onChanged3.OnCompleted();
            }
        }
    }
}

Sunday, 8 April 2012

Dynamically compiling code in memory using the CodeDomProvider

Recently I had to dynamically load some code from a text file and execute it at run time in memory. The CodeDomProvider seemed to do the trick nicely as you can see below. You can generate an assembly in memory using the CompileAssemblyFromSource method like I have done or use it's CompileAssemblyFromFile or CompileAssemblyFromDom counterparts.
using System;
using System.Collections.Generic;
using System.Linq;
 
using Microsoft.CSharp;
using System.CodeDom.Compiler;
 
namespace DynamicCompilerConsoleApp
{
    
    public interface IConsoleWriter
    {
        void WriteToConsole(string message);
    }
    
    public class DynamicAssembyExecution
    {
        public static void DynamicCodeExecution()
        {
            const string code = @"using System;
                                  using DynamicCompilerConsoleApp;
                            namespace LoadXmlTestConsole
                            {
                                public class ConsoleWriter : IConsoleWriter
                                {
                                    public void WriteToConsole(string message)
                                    {
                                        Console.WriteLine(message);
                                    }
                                }
                            }";
 
            CodeDomProvider codeCompiler =
                new CSharpCodeProvider(new Dictionary<stringstring> {{"CompilerVersion""v4.0"}});
            // Create the optional compiler parameters
            CompilerParameters compilerParameters = new CompilerParameters
                                                        {
                                                            GenerateExecutable = false,
                                                            GenerateInMemory = true,
                                                            WarningLevel = 3,
                                                            TreatWarningsAsErrors = false,
                                                            CompilerOptions = "/optimize"
                                                        };
 
            foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
            {
                compilerParameters.ReferencedAssemblies.Add(assembly.Location);    
            }
 
            // Compile the string containing the code, using the provided set of parameters
            CompilerResults compilerResults = codeCompiler.CompileAssemblyFromSource(compilerParameters,
                                                                                        code);
            //Display any compiler errors in the console window
            if (compilerResults.Errors.HasErrors)
                foreach (string line in compilerResults.Output)
                    Console.WriteLine(line);
 
            // Get the required type out of the assembly
            Type consoleWriterType =
                compilerResults.CompiledAssembly.GetTypes().SingleOrDefault(
                    lt => lt.GetInterface(typeof (IConsoleWriter).Name) != null);
              
            // create an instance of the type
            IConsoleWriter consoleWriter = (IConsoleWriter)Activator.CreateInstance(consoleWriterType);
                
            // execute the compiled code
            consoleWriter.WriteToConsole("hello world");
            
        }
    }
}