Saturday, December 26, 2009 1:10 AM bart

More LINQ with System.Interactive – The Ultimate Imperative

With the recent release of the Reactive Extensions for .NET (Rx) on DevLabs, you’ll hear quite a bit about reactive programming, based on the IObservable<T> and IObserver<T> interfaces. A great number of resources are available on Channel 9. In this series, I’ll focus on the dual of the System.Reactive assembly, which is System.Interactive, providing a bunch of extensions to the LINQ Standard Query Operators for IEnumerable<T>. In today’s installment we’ll talk about the imperative-style operators provided on EnumerableEx, namely Run and Do.

Laziness and side-effecting iterators

LINQ can be quite deceptive on a first encounter due to the lazy island it provides in otherwise eagerly evaluated languages like C# and Visual Basic. Simply writing down a query doesn’t cause it to be executed, assuming no eager operators like ToArray, ToList or ToDictionary are used. In fact, the composition of sequences lies at the heart of this, since sequences can evaluate lazily, on demand, when MoveNext is called on an enumerator. Iterators are a simple means to provide such a sequence, potentially capturing a sea of side-effects interleaved with the act of producing (or “yielding”) values.

Let’s start with a quite subtle kind of side-effect, reading from a random number generator:

static Random s_random = new Random();

static IEnumerable<int> GetRandomNumbers(int maxValue)
{
    while (true)
    {
        yield return s_random.Next(maxValue);
    }
}

Every time you execute this, you’ll get to see different numbers. What’s more important in this context though is the fact that every yield return point in the code is a place where the iterator suspends till the next call to MoveNext occurs, causing it to run till the next yield return is encountered. In other words, the whole loop is immobilized till a consumer comes along. To visualize this a bit more, let’s add some Console.WriteLine calls as an additional side-effect:

static Random s_random = new Random();

static IEnumerable<int> GetRandomNumbers(int maxValue)
{
    while (true)
    {
        Console.WriteLine("Next");
        yield return s_random.Next(maxValue);
    }
}

The following code fragment illustrates the point in time where the sequence executes:

var res = GetRandomNumbers(100).Take(10);
Console.WriteLine("Before iteration");
foreach (var x in res)
    Console.WriteLine(x);

The result is the following:

Before iteration
Next
16
Next
56
Next
46
Next
58
Next
22
Next
91
Next
77
Next
20
Next
91
Next
92
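
To make the suspension points more tangible, here is a minimal sketch that drives the enumerator by hand instead of through foreach. GetNumbers and Log are hypothetical stand-ins introduced for this illustration (deterministic values and an in-memory log instead of random numbers and console output); they are not part of the sample above.

```csharp
using System;
using System.Collections.Generic;

static class LazinessDemo
{
    // Records the iterator's side-effects so we can see exactly when its body runs.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical stand-in for GetRandomNumbers: same shape, deterministic values.
    public static IEnumerable<int> GetNumbers()
    {
        int n = 0;
        while (true)
        {
            Log.Add("Next");
            yield return n++;
        }
    }

    static void Main()
    {
        IEnumerable<int> seq = GetNumbers();   // none of the iterator body has run yet
        Console.WriteLine(Log.Count);          // 0

        using (IEnumerator<int> e = seq.GetEnumerator())
        {
            Console.WriteLine(Log.Count);      // still 0

            e.MoveNext();                      // runs up to the first yield return
            Console.WriteLine("{0}, {1}", Log.Count, e.Current);   // 1, 0

            e.MoveNext();                      // resumes and runs to the next yield return
            Console.WriteLine("{0}, {1}", Log.Count, e.Current);   // 2, 1
        }
    }
}
```

Each MoveNext call produces exactly one "Next" entry, which is what foreach does for us behind the scenes.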

 

Run, run, run

System.Interactive’s Run operator in EnumerableEx allows execution of the sequence on the spot, in a fashion equivalent to having a foreach-loop. Two overloads exist, one discarding the elements consumed from the sequence and another one feeding each of them into an Action<T>:

public static void Run<TSource>(this IEnumerable<TSource> source);
public static void Run<TSource>(this IEnumerable<TSource> source, Action<TSource> action);

Rewriting the code above using the second overload will produce similar results:

var res = GetRandomNumbers(100).Take(10);
Console.WriteLine("Before iteration");
res.Run(x => Console.WriteLine(x)); // equivalent to res.Run(Console.WriteLine);

Since Run returns void, it’s only used for its side-effects, which can be useful from time to time. Previously, a similar effect could be achieved by calling ToArray or ToList, at the cost of burning memory for no good reason. In the above, that wouldn’t even be a viable option if you simply want to print random numbers ad infinitum, as an infinite sequence would cause the system to run out of memory in a ToArray or ToList context.
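
Since Run is described as equivalent to a foreach-loop, both overloads can be sketched by hand in a few lines. This is an illustration of the idea under that assumption, not the actual System.Interactive source; the class name RunSketch is made up, and the methods would clash with the library's own Run if both were in scope.

```csharp
using System;
using System.Collections.Generic;

static class RunSketch
{
    // Drains the sequence purely for its side-effects, discarding every element.
    public static void Run<T>(this IEnumerable<T> source)
    {
        foreach (var item in source)
        {
        }
    }

    // Drains the sequence, handing each element to the given action.
    public static void Run<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
            action(item);
    }

    static void Main()
    {
        int sum = 0;
        new[] { 1, 2, 3 }.Run(x => sum += x);
        Console.WriteLine(sum); // 6
    }
}
```

Nothing is buffered along the way, which is why this shape also copes with an infinite source where ToArray or ToList would not.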

Let’s assume for the continuation of this post that GetRandomNumbers doesn’t exhibit a printing side-effect in and of itself:

static IEnumerable<int> GetRandomNumbers(int maxValue)
{
    while (true)
    {
        yield return s_random.Next(maxValue);
    }
}

In this setting, our Run call above effectively adds the side-effect of printing to the screen “from the outside”, at the (consuming) end of the “query”. Using the Do operator, one can inject a side-effect in a lazily evaluated sequence composed of different combinators.


Adding side-effects using Do

The Do method has the following signature:

public static IEnumerable<TSource> Do<TSource>(this IEnumerable<TSource> source, Action<TSource> action);

Taking in an IEnumerable<T> and producing one, it simply iterates over the source, executing the specified action before yielding the result to the consumer. Other than producing the side-effect during iteration, it doesn’t touch the sequence at all. You can write this operator in a straightforward manner yourself:

static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> action)
{
    foreach (var item in source)
    {
        action(item);
        yield return item;
    }
}

Or you could build it out of other combinator primitives, in particular Select:

static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> action)
{
    return source.Select(item =>
    {
        action(item);
        return item;
    });
}
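
One consequence worth spelling out: Do itself is lazy, so attaching it triggers nothing, and its side-effect fires once per element for every enumeration. The sketch below reuses the hand-written iterator version of Do; the CountSideEffects helper is a hypothetical name introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

static class DoLazinessDemo
{
    // Same shape as the hand-written iterator version of Do.
    public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
        {
            action(item);
            yield return item;
        }
    }

    // Hypothetical helper: counts how often the side-effect fires
    // when the probed sequence is enumerated the given number of times.
    public static int CountSideEffects(int enumerations)
    {
        int calls = 0;
        var probed = new[] { 1, 2, 3 }.Do(x => calls++);

        for (int i = 0; i < enumerations; i++)
        {
            foreach (var item in probed)
            {
            }
        }

        return calls;
    }

    static void Main()
    {
        Console.WriteLine(CountSideEffects(0)); // 0: attaching Do runs nothing
        Console.WriteLine(CountSideEffects(1)); // 3: once per element
        Console.WriteLine(CountSideEffects(2)); // 6: again on re-enumeration
    }
}
```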

This is useful primarily for debugging purposes, where you want to “probe” different points of execution in a query. For example, consider the following query expression:

var res = from x in GetRandomNumbers(100).Take(10)
          where x % 2 == 0
          orderby x
          select x + 1;
res.Run(x => Console.WriteLine(x));

Don’t know why it produces the results you’re seeing? Using Do, you can inject “checkpoints”. First, realize the above query desugars into:

var res = GetRandomNumbers(100).Take(10)
          .Where(x => x % 2 == 0)
          .OrderBy(x => x)
          .Select(x => x + 1);

Now we can put Do calls “on the dots” to see the values flowing through the pipeline during consumption of the query result.

var res = GetRandomNumbers(100).Take(10)
          .Do(x => Console.WriteLine("Source  -> {0}", x))
          .Where(x => x % 2 == 0)
          .Do(x => Console.WriteLine("Where   -> {0}", x))
          .OrderBy(x => x)
          .Do(x => Console.WriteLine("OrderBy -> {0}", x))
          .Select(x => x + 1)
          .Do(x => Console.WriteLine("Select  -> {0}", x));

The output below shows what’s triggered by the call to Run:

Source  -> 96
Where   -> 96
Source  -> 25
Source  -> 8
Where   -> 8
Source  -> 79
Source  -> 25
Source  -> 3
Source  -> 36
Where   -> 36
Source  -> 51
Source  -> 53
Source  -> 81
OrderBy -> 8
Select  -> 9
9
OrderBy -> 36
Select  -> 37
37
OrderBy -> 96
Select  -> 97
97

For example, 25 produced by the source didn’t survive the Where operator filtering. From the output one can also see that all Where and Source consumption calls precede any OrderBy calls, since the ordering operator eagerly drains its source before carrying out the ordering and passing the results to its consumer.
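
To contrast the buffering behavior of OrderBy with purely streaming operators, the following sketch traces a small fixed input with and without the ordering step. The Trace helper, the input values and the log format are assumptions made for this illustration; Do is the hand-written version from earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class StreamingVsBuffering
{
    public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
        {
            action(item);
            yield return item;
        }
    }

    // Hypothetical helper: runs the pipeline and returns the order in which the probes fired.
    public static List<string> Trace(bool sort)
    {
        var log = new List<string>();

        IEnumerable<int> query = new[] { 4, 1, 2 }
            .Do(x => log.Add("Source " + x))
            .Where(x => x % 2 == 0)
            .Do(x => log.Add("Where " + x));

        if (sort)
            query = query.OrderBy(x => x);

        query = query.Do(x => log.Add("Out " + x));

        foreach (var item in query)
        {
        }

        return log;
    }

    static void Main()
    {
        // Streaming only: each element travels through the whole pipeline before the next is read.
        Console.WriteLine(string.Join(", ", Trace(false).ToArray()));
        // Source 4, Where 4, Out 4, Source 1, Source 2, Where 2, Out 2

        // With OrderBy: the source is drained completely before the first result comes out.
        Console.WriteLine(string.Join(", ", Trace(true).ToArray()));
        // Source 4, Where 4, Source 1, Source 2, Where 2, Out 2, Out 4
    }
}
```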

Looking at the output before the first result, 9, is printed, you can observe the effect of the first MoveNext call on the resulting sequence: the whole source is consulted and fed through the Where operator in order for OrderBy to produce the first (smallest) result. A conceptual diagram illustrating the interception of sequences using Do is shown below:

[Figure: conceptual diagram of Do calls intercepting the sequences flowing between query operators]

In fact, one can make Do surface through query syntax as well, by providing an extension method overload for e.g. Where (note: this is purely for illustration purposes, and admittedly over-overloading and misusing existing operators :-)):

public static class DoEnumerable
{
    public static IEnumerable<T> Where<T>(this IEnumerable<T> source, Action<T> action)
    {
        return source.Do(action);
    }
}

The resulting usage pattern is the following:

var res = from x in GetRandomNumbers(100).Take(10)
          /*do*/ where Console.WriteLine("Source  -> {0}", x)
          where x % 2 == 0
          /*do*/ where Console.WriteLine("Where   -> {0}", x)
          orderby x
          /*do*/ where Console.WriteLine("OrderBy -> {0}", x)
          select x + 1 into x
          /*do*/ where Console.WriteLine("Select  -> {0}", x)
          select x;


A lame semi-cooperative scheduler

Let’s first say there’s no good justification (this is the lame part) for this sample other than education: it shows the use of a sequence purely for its side-effects. The idea is to declare a worker whose code runs under varying priorities for different portions. Sure, we could have set thread priorities directly in the code, but the special part is feeding the desired priorities back to the driver loop (“Start”) of the scheduler, which can decide how to implement the prioritization scheme. The cooperative nature lies in the fact that workers yield their run by signaling a new priority, effectively handing control over to the driver loop. I’m calling it “semi” because the sample implementation below relies on preemptive scheduling as provided by the Thread class, though the reader challenge will be to shake off that part.

First of all, work is declared by an iterator that yields priorities followed by the work that will run under that priority. The driver can decide whether or not to call MoveNext, effectively causing the iterator to proceed till the next yield return statement. For example:

static IEnumerable<ThreadPriority> Work1()
{
    int i = 0;
    Action print = () =>
    {
        Console.WriteLine("{0} @ {1} -> {2}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.Priority, i++);
        for (int j = 0; j < 10000000; j++)
            ; // spin to simulate work
    };
    yield return ThreadPriority.Normal;
    {
        print();
    }
    yield return ThreadPriority.Lowest;
    {
        print();
    }
    yield return ThreadPriority.Normal;
    {
        print();
    }
    yield return ThreadPriority.Highest;
    {
        print();
    }
    yield return ThreadPriority.Highest;
    {
        print();
    }
}

The block after each yield return syntactically groups a work item with its priority. Obviously we fake work to illustrate the point. A driver loop, called Start, can be implemented as lamely as by relying on the managed Thread type:

static void Start(IEnumerable<ThreadPriority> work)
{
    new Thread(() =>
    {
        work.Do(p => Thread.CurrentThread.Priority = p).Run();
    }).Start();
}

Here, we’re using Run and Do, respectively, to run the work and to cause the side-effect of adjusting the priority of the thread hosting the work. The reader is invited to cook their own dispatcher with the following signature:

static void Start(params IEnumerable<ThreadPriority>[] workers);

The idea of this one will be to implement a prioritization scheme – just for fun and definitely no profit other than intellectual stimulus – by hand: run all the work on the same thread, with MoveNext calls standing for an uninterruptible quantum. During a MoveNext call, the worker will proceed till the next yield return is encountered, so you may cause an unfair worker to run away and do work forever. This pinpoints the very nature of cooperative scheduling: you need trust in the individual workers. But when you regain control, retrieving the priority for the next work item the worker plans to do, you can make a decision whether you let it go for another quantum (by calling MoveNext) or let another worker from the worker list take a turn (tip: use an ordering operator to select the next worker to get a chance to run). This process continues till all workers have no more work items left, indicated by MoveNext returning false (tip: keep a list of “schedulable” items).
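
For readers who want a starting point, here is one possible sketch of such a dispatcher, following the tips above. It is a naive illustration rather than a reference solution: the Worker helper and the Log list are hypothetical names added for the demo, priorities are only used to pick the next worker (no thread priority is actually set), and nothing prevents starvation of low-priority workers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

static class CooperativeScheduler
{
    public static readonly List<string> Log = new List<string>();

    // Hypothetical worker: announces a priority, then performs one logged work item under it.
    public static IEnumerable<ThreadPriority> Worker(string name, params ThreadPriority[] priorities)
    {
        foreach (var p in priorities)
        {
            yield return p;              // hand control back, asking for priority p
            Log.Add(name + "@" + p);     // the work item that runs under priority p
        }
    }

    public static void Start(params IEnumerable<ThreadPriority>[] workers)
    {
        // Advance every worker to its first yield, so Current holds its requested priority.
        var schedulable = new List<IEnumerator<ThreadPriority>>();
        foreach (var worker in workers)
        {
            var enumerator = worker.GetEnumerator();
            if (enumerator.MoveNext())
                schedulable.Add(enumerator);
            else
                enumerator.Dispose();
        }

        while (schedulable.Count > 0)
        {
            // Highest requested priority wins; OrderByDescending is stable,
            // so ties go to the worker that appears earlier in the list.
            var next = schedulable.OrderByDescending(e => e.Current).First();

            // One uninterruptible quantum: run the pending work item up to the next yield return.
            if (!next.MoveNext())
            {
                schedulable.Remove(next);
                next.Dispose();
            }
        }
    }

    static void Main()
    {
        Start(
            Worker("A", ThreadPriority.Normal, ThreadPriority.Lowest),
            Worker("B", ThreadPriority.Highest, ThreadPriority.Normal));

        Console.WriteLine(string.Join(", ", Log.ToArray()));
        // B@Highest, A@Normal, B@Normal, A@Lowest
    }
}
```

Everything runs on the calling thread, so a worker that never reaches its next yield return would block all the others, which is exactly the trust issue described above.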

In the scope of this post, the sole reason I showed this sample is its use of Do and Run, to drive home the point of those operators. Sure, you can achieve the same result (if desired at all) by tweaking the managed thread priority directly in each worker.

 

Next on More LINQ

Dealing with exceptions caused by sequence iteration.


Comments


# re: More LINQ with System.Interactive – The Ultimate Imperative

Monday, December 28, 2009 11:12 PM by Josh Einstein

The Run/Do methods are great and very useful. I had created my own extension methods that do the same but I called them Consume/Step respectively. I like your names better.

Another very useful one I posted about recently was a Batch method that takes IEnumerable(T) and a batch size parameter and yields a T[] array containing chunks of the input sequence.
