Scott Weinstein on .Net, Linq, PowerShell, WPF, and WCF


First impressions of Scala

Mon, 28 Nov 2011 05:05:00 GMT

I have an idea that it may be possible to predict build success/failure based on commit data. Why Scala? It’s a JVM language, has lots of powerful type features, and it has a linear algebra library which I’ll need later.

Project definition and build

Neither Maven nor the Scala build tool (sbt) is completely satisfactory.

This Maven **archetype** (what .Net folks would call a VS project template)

mvn archetype:generate `-DarchetypeGroupId=org.scala-tools.archetypes `-DarchetypeArtifactId=scala-archetype-simple  `-DremoteRepositories= `-DgroupId=org.SW -DartifactId=BuildBreakPredictor

gets you started right away with “hello world” code, unit tests demonstrating a number of different testing approaches, and even a ready-made `.gitignore` file - nice! But the Scala version is behind at v2.8, and more seriously, compiling and testing were painfully slow, so much so that a rapid edit – test – edit cycle was not practical. Lab49 colleague Steve Levine tells me that I can either adjust my pom to use fsc – the fast Scala compiler – or use sbt.

Sbt has some nice features

  • It’s fast – it uses fsc by default
  • It has a continuous mode, so  `> ~test` will compile and run your unit tests each time you save a file
  • It can consume (and produce) Maven 2 dependencies
  • the build definition file can be much shorter than the equivalent pom (about 1/5 the size, as repos and dependencies can be declared on a single line)

And some real limitations

  • Limited support for 3rd party integration – for instance out of the box, TeamCity doesn’t speak sbt, nor does IntelliJ IDEA
  • Steeper learning curve for build steps outside the default

Side note: If a language has a fast compiler, why keep the slow compiler around? Even worse, why make it the default?

I chose sbt for the faster development speed it offers.


Scala APIs really like to use punctuation – sometimes this works well, as in the following

 map1 |+| map2 

The `|+|` defines a merge operator which does addition on the `values` of the maps.

It’s less useful here:

http(baseUrl / url >- parseJson[BuildStatus])

Sure, you can probably guess what `>-` does from the context, but how about `>~` or `>+`?

Language features

I’m still learning, so not much to say just yet. However, case classes are quite useful, implicits scare me, and type constructors have lots of power.


A number of projects are split between GitHub and Google Code: GitHub for the source, and Google Code for the docs. I’m not sure I understand the motivation here.

Intro to RX

Fri, 27 Aug 2010 02:08:09 GMT

Lab49 colleague Lee Campbell has a nice 7 part write-up on the Reactive Extensions

He says:

it is big in all sorts of ways:

  1. In the way that it tackles the Observer pattern is bold
  2. In the way it tackles concurrency is quite a shift from how I have done it before.
  3. The number of (extension) methods is huge.
  4. The way in which it integrates with LINQ to leverage LINQ's composability & declarative style
  5. The fact that any .NET dev should care UI, backend algorithm coder or Integrator. It helps all of us.
  6. The future plans are even more grand, but that is a different series all together :-)

The series covers

Bending Time with the Reactive Extensions

Sun, 22 Aug 2010 19:23:27 GMT

The latest releases of the Reactive Extensions for .Net include an abstract VirtualScheduler and a concrete implementation called TestScheduler.

So now it’s possible to test time-dependent code without relying on the passage of time (or tide).

Here’s a sample of code that would take 3 days to complete in real time:

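[Fact(Timeout = 1000)]
public void TestScheduler()
{
    List<long> actual = new List<long>();
    // Take/Subscribe/Run are assumed; those lines are missing from this excerpt
    Observable.Interval(TimeSpan.FromDays(1), _testSched)
        .Take(3).Subscribe(actual.Add);
    _testSched.Run();
    Assert.Equal(new[] { 0L, 1, 2 }, actual.ToArray());
}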

Notice that I didn’t use a blocking call, such as


to obtain the values from the interval. The TestScheduler runs on the current thread, and as a result blocking calls never complete.
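To make the idea of virtual time concrete, here is a minimal sketch of a clock that runs scheduled work in due-time order on the calling thread, jumping forward instead of waiting. `VirtualClock` and `VirtualClockDemo` are hypothetical names invented for this illustration; this is not the Rx TestScheduler API, only the principle behind it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a virtual-time scheduler; this is NOT the Rx TestScheduler API.
public sealed class VirtualClock
{
    private readonly List<(TimeSpan Due, long Seq, Action Work)> _queue =
        new List<(TimeSpan Due, long Seq, Action Work)>();
    private long _seq;

    // Virtual "now"; it advances only when Run() executes scheduled work.
    public TimeSpan Now { get; private set; } = TimeSpan.Zero;

    public void Schedule(TimeSpan delay, Action work)
    {
        _queue.Add((Now + delay, _seq++, work));
    }

    // Executes all pending work on the calling thread in due-time order,
    // jumping the clock forward instead of sleeping.
    public void Run()
    {
        while (_queue.Count > 0)
        {
            _queue.Sort((a, b) => a.Due != b.Due
                ? a.Due.CompareTo(b.Due)
                : a.Seq.CompareTo(b.Seq));
            var next = _queue[0];
            _queue.RemoveAt(0);
            Now = next.Due;
            next.Work();
        }
    }
}

public static class VirtualClockDemo
{
    // Schedules the values 0, 1, 2 one virtual day apart and returns what was observed.
    public static List<long> ThreeDailyTicks(VirtualClock clock)
    {
        var seen = new List<long>();
        for (long i = 0; i < 3; i++)
        {
            long value = i; // copy, so each closure captures its own value
            clock.Schedule(TimeSpan.FromDays(i + 1), () => seen.Add(value));
        }
        clock.Run();
        return seen;
    }

    public static void Main()
    {
        var clock = new VirtualClock();
        Console.WriteLine(string.Join(",", ThreeDailyTicks(clock))); // prints 0,1,2
        Console.WriteLine(clock.Now == TimeSpan.FromDays(3));        // prints True
    }
}
```

Because everything runs synchronously inside `Run()`, a blocking wait on the same thread would never see the work happen, which is the same reason blocking calls never complete against a scheduler of this kind.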


Here’s another example where we run for a specific duration, which is useful when testing Observables that never end:

public void TestOneElementSlidingWindow()
{
    List<SlidingWindow<Timestamped<int>>> actual = new List<SlidingWindow<Timestamped<int>>>();
    var oneBeat = Observable.Return(1, _testSched).Timestamp(_testSched);
    var sWindow = oneBeat.ToSlidingWindow(_oneSecond, _oneSecond, _testSched);
    sWindow.Subscribe(slw => actual.Add(slw));


    Assert.Equal(2, actual.Count);

    Assert.Equal(1, actual[0].Added.Count());
    Assert.Equal(1, actual[0].Current.Count());
    Assert.Equal(0, actual[0].Removed.Count());

    Assert.Equal(0, actual[1].Added.Count());
    Assert.Equal(0, actual[1].Current.Count());
    Assert.Equal(1, actual[1].Removed.Count());
}


Code samples updated at

Also - Jeffrey van Gogh promises more to come on #c9

PowerShell and a bit of the Task Parallel Library as a replacement for SSIS

Sat, 24 Apr 2010 18:19:00 GMT

I gave a presentation at today’s SQL Saturday in NY on replacing SSIS with PowerShell.

You can view the presentation, or see below for the two second summary:

  • SSIS is a terrible development tool
  • Many SSIS features can be built without much effort with PowerShell, C#, and the TPL
  • See the demos

The code is hosted online. Currently it has the following capabilities:

  • Concurrent bulk data transfer
  • Single pass star-schema populator

Contact me if you’d like to contribute or collaborate on this.

Tracking My Internet Provider Speeds

Sat, 10 Apr 2010 18:52:53 GMT

Of late, our broadband internet has been feeling sluggish. A call to the company took way more hold-time than I wanted to spend, and it only fixed the problem for a short while. Thus a perfect opportunity to play with some new tech to solve a problem, in this case, documenting a systemic issue from a service provider.

The goal: a log of internet speeds, taken, say, every 15 minutes, recording ping time, upload speed, download speed, and local LAN usage.


The solution

  • A WCF service to measure speeds
    • Internet speed was measured via
    • LAN usage was measured by querying my router for packets received and sent
  • A SQL express instance to persist the data
  • A PowerShell script to invoke the WCF service – launched by Windows’ Task Scheduler
  • An OData WCF Data Service to allow me to read the data
  • MS PowerPivot to show a nice viz (scratch that, the beta expired)
  • LinqPad to get the data and export it to Excel
  • Tableau Public to show the viz




Samples and Slides from Alt.Net Meet up on the Reactive Extensions

Thu, 28 Jan 2010 03:29:52 GMT

The code samples and PowerPoint deck from my presentation on the RX to the New York ALT.NET group are available (and updated) on MSDN Code samples:

And the slide deck

Converting a polling based API into a streaming API with the Reactive Extensions

Fri, 22 Jan 2010 03:47:56 GMT

Recently my building has been having issues with its boilers, and the heat has been going out for longer than is comfortable. The superintendent makes a habit of periodically checking on the status of each of the boilers. A workable approach certainly, but I figured this would be ideal for a technology assist.


For $9, I purchased a USB thermometer. Word on the web is that the software it comes with is fairly miserable (and it crashed immediately on my machine), but with some Googling and Reflector, I was able to come up with a polling-based API to read the temperature:

public interface IUsbTEMPer
{
    double Temperature { get; }
}

With the RX, converting the API into a stream of data is just one line:

IObservable<double> ts = Observable.Generate(
() => new Notification<double>.OnNext(usbTempReader.Temperature)

And getting some simple alerts is easy too:

ts.Buffer(new TimeSpan(0, 5, 0)) // five minutes (hours, minutes, seconds)
    .Select(fiveminOfTemp => fiveminOfTemp.Average())
    .Where(avgtemp => avgtemp < 65)
    .Subscribe(cold => ToTwiter("buildingstatus account..."));

The Anonymous Implementation pattern (as seen in the Reactive Extensions)

Mon, 11 Jan 2010 04:40:14 GMT

There’s a pattern used in the Reactive Extensions that I’m calling the Anonymous Implementation.

You can see it in use on IObservable<T>’s one method

IDisposable Subscribe(IObserver<T> observer);

Given an Observable which you want to subscribe to, the brute force, naive (or pedantic) approach would be to create a new class implementing IObserver<T> and then subscribe it.

public class BasicConsoleWriteLineObserver<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine(value);
    }
    public void OnError(Exception ex)
    {
        throw ex;
    }
    public void OnCompleted() { }
}

IObservable<int> stream = ...;
stream.Subscribe(new BasicConsoleWriteLineObserver<int>());

But a simpler method, one that dispenses with all the ceremony of creating a new type, is to use one of the Observer factory methods, like so:

IObserver<int> basicCWLobserver = Observer.Create((int value) => Console.WriteLine(value));

Observer.Create (and its 4 overloads) will create an anonymous implementation of the interface needed. Of course you can skip a step or two and use the extension methods on IObservable<T> to get even shorter versions of the same:


This approach is great: for all but the most complex implementations, there’s no need to ever implement IObserver<T>.  And as they say, less code is better code.


There are a number of other interfaces that I wish used this approach. IValueConverter, and IComparer come to mind first.
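As a rough illustration of the pattern without any Rx dependency, the sketch below wraps delegates in a class that implements the BCL `IObserver<T>` interface. `AnonymousObserver<T>` here is a hypothetical helper written for this example (Rx ships its own `Observer.Create`). The second half uses `Comparer<T>.Create`, which was added to the BCL in .NET 4.5 and applies the same idea to `IComparer<T>`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; Rx provides Observer.Create for real use.
public sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext,
                             Action<Exception> onError = null,
                             Action onCompleted = null)
    {
        _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        _onError = onError ?? (ex => { throw ex; }); // default: rethrow
        _onCompleted = onCompleted ?? (() => { });   // default: do nothing
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

public static class AnonymousImplementationDemo
{
    public static void Main()
    {
        var seen = new List<int>();
        IObserver<int> observer = new AnonymousObserver<int>(seen.Add);
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnCompleted();
        Console.WriteLine(string.Join(",", seen)); // prints 1,2

        // Same pattern for IComparer<T>, via the BCL factory Comparer<T>.Create.
        IComparer<string> byLength =
            Comparer<string>.Create((a, b) => a.Length.CompareTo(b.Length));
        var words = new List<string> { "ccc", "a", "bb" };
        words.Sort(byLength);
        Console.WriteLine(string.Join(",", words)); // prints a,bb,ccc
    }
}
```

The design choice is the same in both cases: the interface stays as the contract, and a small factory turns lambdas into an implementation so callers never declare a named type.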

16 Ways To Create IObservables without implementing IObservable

Mon, 11 Jan 2010 02:20:45 GMT

The Reactive Extensions for .Net offers plenty of ways to create IObservables


Some primitives

IObservable<int> obs = Observable.Empty<int>();
IObservable<int> obs = Observable.Return(0);
IObservable<int> obs = Observable.Throw<int>(new Exception());

Simple streams

IObservable<long> obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable<int> obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable<int> obs = Observable.Range(0, 1);


From async data

//From an Action or Func
Observable.Start(() => 1);

//From Task

//From AsyncPattern
// typical use case is IO or Web service calls
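Func<int, int, double> sampleFunc = (a, b) => 1d;
Func<int, int, IObservable<double>> funcObs =
    // assumed right-hand side; this line is missing from the excerpt
    Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke, sampleFunc.EndInvoke);
IObservable<double> obs = funcObs(1, 0);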

From Events

public event EventHandler<EventArgs> AnEvent;
IObservable<IEvent<EventArgs>> fromEventObs =
    Observable.FromEvent<EventArgs>(h => this.AnEvent += h,
                                    h => this.AnEvent -= h);

From Existing Collections

IEnumerable<int> ie = new int[] {};
observable = ie.ToObservable();

By Generate()

There are 20 overloads of Generate(). See some prior examples here

By Create()

This creates a cold stream

IObservable<int> observable = Observable.Create<int>(o =>
{
    return () => { };
});

To make a hot stream via Create()

List<IObserver<int>> _subscribed = new List<IObserver<int>>();
private void CreateHot()
{
    observable = Observable.Create<int>(o =>
    {
        _subscribed.Add(o);
        return () => _subscribed.Remove(o);
    });
}

private void onNext(int val)
{
    foreach (var o in _subscribed)
        o.OnNext(val);
}

But rather than using Create(), a Subject provides a cleaner (thread-safe and tested) way of doing the above:

var subj = new Subject<int>();
observable = subj.Hide();
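To show what "hot" means in practice, here is a small BCL-only sketch of a subject-like class. `MiniSubject<T>` and `RecordingObserver<T>` are hypothetical names made up for this example, and the class is far simpler than Rx's `Subject<T>` (it only fans out values and ignores errors and completion). It does show the two things a hand-rolled list of observers tends to get wrong: locking around the subscriber list, and notifying from a snapshot.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified stand-in for Rx's Subject<T>: it fans out values only.
public sealed class MiniSubject<T> : IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(() =>
        {
            lock (_gate) { _observers.Remove(observer); }
        });
    }

    public void OnNext(T value)
    {
        IObserver<T>[] snapshot;
        lock (_gate) { snapshot = _observers.ToArray(); }
        // Notify outside the lock so an observer may unsubscribe from its own callback.
        foreach (var observer in snapshot) observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose()
        {
            // Exchange makes Dispose idempotent even if called from several threads.
            var dispose = Interlocked.Exchange(ref _dispose, null);
            if (dispose != null) dispose();
        }
    }
}

// Hypothetical observer that records everything it is given.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class MiniSubjectDemo
{
    public static void Main()
    {
        var subject = new MiniSubject<int>();
        var early = new RecordingObserver<int>();
        var late = new RecordingObserver<int>();

        IDisposable earlySub = subject.Subscribe(early);
        subject.OnNext(1);
        subject.Subscribe(late);   // joins late: never sees 1
        subject.OnNext(2);
        earlySub.Dispose();        // leaves: never sees 3
        subject.OnNext(3);

        Console.WriteLine(string.Join(",", early.Values)); // prints 1,2
        Console.WriteLine(string.Join(",", late.Values));  // prints 2,3
    }
}
```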

Streaming OLAP with the Reactive Extensions (RX) for .Net

Sun, 03 Jan 2010 01:28:06 GMT

Streaming OLAP is something that comes up over and over again in the “CEP space”. Using the Reactive Extensions for .Net, this demo shows the basics: filtering, grouping, aggregates, and concurrent queries. To set the context, the idea here is to focus on the “query” side, so the UI is aggressively simple – no nifty visualizations here, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused.

Here’s a screenshot and download link

Let’s walk through the code:

Getting the data

The .Net class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we only need do the following:

  • get all the fixed drives
  • create a new FileSystemWatcher for each
  • convert the Changed, Deleted, and Created events to IObservables using FromEvent()
  • Merge() those into a single IObservable
  • map the EventArgs to a more query friendly class

public static IObservable<FileChangeFact> GetFileSystemStream()
{
    var fsEventTypes = new string[] { "Changed", "Deleted", "Created" };

    IEnumerable<IObservable<IEvent<FileSystemEventArgs>>> fsEventsAsObservables =
        DriveInfo.GetDrives()
            .Where(di => di.DriveType == DriveType.Fixed)
            .Select(drive => new FileSystemWatcher(drive.RootDirectory.FullName) { … })
            .SelectMany(fsw => fsEventTypes.Select(eventType => Observable.FromEvent…;

    return Observable.Merge(fsEventsAsObservables)
        .Select(fsea =>
        {
            var fi = new FileInfo(fsea.EventArgs.FullPath);
            return new FileChangeFact
            {
                ChangeType = fsea.EventArgs.ChangeType,
                Path = fsea.EventArgs.FullPath,
                IsContainer = !fi.Exists,
                Length = fi.Exists ? fi.Length : 0,
                Extension = String.IsNullOrEmpty(fi.Extension) ? "(none)" : fi.Extension
            };
        });
}

Calculating Aggregates, take 1

The Scan() operator is ideal for computing aggregates in a streaming OLAP scenario. Unlike traditional queries where vectors are aggregated into a single value, we want to compute running values.

So to compute a few of the most common query aggregates, Count, Sum, Mean, and StdDev, we can do the following:

public static IObservable<double> Mean(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new { N = n, Mean = meanp, };
    }).Select(it => it.Mean);
}

public static IObservable<double> StdDev(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d, M2 = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new { N = n, Mean = meanp, M2 = cur.M2 + delta * (next - meanp) };
    }).Select(it => Math.Sqrt(it.M2 / (it.N)));
}

var fss = GetFileSystemStream();
fss.Select(fcf => (double)fcf.Length)
    .Scan(0,(c, _) => c + 1) // Count
    .Zip(lenxs.Scan(0d,(c, n) => c + n), (cnt, sum) => new FileChangeAggregate(){Sum=sum,Count...
    .Zip(lenxs.Mean(), (fca, mean) => { fca.Mean = mean; return fca; })
    .Zip(lenxs.StdDev(), (fca, stddev) => { fca.StdDev = stddev; return fca; })
    //... subscribe()[...]
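The running mean and standard deviation above use an incremental update (often attributed to Welford), where each new value adjusts the previous mean and a sum of squared deviations, M2. The sketch below reproduces that arithmetic over a plain `IEnumerable<double>` so it can be checked without Rx. `RunningScan`, `RunningMean` and `RunningStdDev` are hypothetical names chosen for this example, and the pull-based `RunningScan` helper is only an analogue of Rx's `Scan()`.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class RunningStats
{
    // Pull-based analogue of Rx's Scan(): yields every intermediate accumulator.
    public static IEnumerable<TAcc> RunningScan<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> step)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = step(acc, item);
            yield return acc;
        }
    }

    public static IEnumerable<double> RunningMean(this IEnumerable<double> source)
    {
        return source
            .RunningScan((N: 0, Mean: 0d), (cur, next) =>
            {
                var n = cur.N + 1;
                return (N: n, Mean: cur.Mean + (next - cur.Mean) / n);
            })
            .Select(s => s.Mean);
    }

    // Population standard deviation (divides by N), matching the StdDev in the post.
    public static IEnumerable<double> RunningStdDev(this IEnumerable<double> source)
    {
        return source
            .RunningScan((N: 0, Mean: 0d, M2: 0d), (cur, next) =>
            {
                var n = cur.N + 1;
                var delta = next - cur.Mean;
                var mean = cur.Mean + delta / n;
                return (N: n, Mean: mean, M2: cur.M2 + delta * (next - mean));
            })
            .Select(s => Math.Sqrt(s.M2 / s.N));
    }

    public static void Main()
    {
        var data = new double[] { 2, 4, 4, 4, 5, 5, 7, 9 };
        var inv = CultureInfo.InvariantCulture;
        Console.WriteLine(data.RunningMean().Last().ToString("F2", inv));   // prints 5.00
        Console.WriteLine(data.RunningStdDev().Last().ToString("F2", inv)); // prints 2.00
    }
}
```

Each element of the output is the statistic "so far", which is what a streaming query needs. Only the last element matches what a batch computation over the whole sequence would return.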

CEP Style Sliding windows in the RX – Take 2

Mon, 30 Nov 2009 04:01:14 GMT

The bug I mentioned in my first attempt at a sliding window was the minor issue that the aggregates never went down to 0, even if the window had emptied out.

The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.

Here’s the fix:

public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    var existingWindows = new ConcurrentDictionary<string, int>();
    return oticks
        .ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
        .Select(sl => sl.Current)
        .SelectMany(cur =>
        {
            IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
                .Select(grp =>
                {
                    IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                    var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                    var totalVolume = ticks.Sum(tk => tk.Size);
                    return new VWAPItem(grp.Key, totalAmount, totalVolume,
                        totalAmount / totalVolume);
                });
            // remember every symbol seen so far
            foreach (var grpd in grouped)
                existingWindows[grpd.Symbol] = 1;
            // outer join (GroupJoin assumed; that line is missing from the excerpt)
            IEnumerable<IEnumerable<VWAPItem>> outerJoin = existingWindows
                .GroupJoin(grouped,
                    key => key.Key,
                    grped => grped.Symbol,
                    (key, item) =>
                        item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
            return outerJoin.SelectMany(x => x);
        });
}
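The essence of the fix is a left outer join between "every symbol seen so far" and "symbols with ticks in the current window", defaulting the missing side to zero. A minimal sketch of that step with plain LINQ follows. `Tick` and `VwapZeroFill` are hypothetical types invented for the example and are much simpler than the post's StockTick and VWAPItem.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick shape for this sketch; the post's StockTick has more fields.
public sealed class Tick
{
    public Tick(string symbol, double price, long size)
    {
        Symbol = symbol; Price = price; Size = size;
    }
    public string Symbol { get; }
    public double Price { get; }
    public long Size { get; }
}

public static class VwapZeroFill
{
    // One (symbol, VWAP) pair per known symbol; symbols with no ticks in the window get 0.
    public static List<KeyValuePair<string, double>> Compute(
        IEnumerable<string> knownSymbols, IEnumerable<Tick> window)
    {
        var vwaps = window
            .GroupBy(t => t.Symbol)
            .Select(g => new KeyValuePair<string, double>(
                g.Key,
                g.Sum(t => t.Size * t.Price) / g.Sum(t => t.Size)))
            .ToList();

        // GroupJoin + DefaultIfEmpty is the standard LINQ idiom for a left outer join.
        return knownSymbols
            .GroupJoin(vwaps, symbol => symbol, kv => kv.Key,
                (symbol, matches) =>
                    matches.DefaultIfEmpty(new KeyValuePair<string, double>(symbol, 0d)))
            .SelectMany(x => x)
            .ToList();
    }

    public static void Main()
    {
        var known = new[] { "MSFT", "IBM" };
        var window = new[] { new Tick("MSFT", 10d, 1), new Tick("MSFT", 20d, 3) };
        // MSFT: (1*10 + 3*20) / (1 + 3) = 17.5; IBM has no ticks, so it reports 0.
        foreach (var kv in Compute(known, window))
            Console.WriteLine(kv.Key + " " + kv.Value);
    }
}
```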

Sliding Windows via the Reactive Framework

Wed, 25 Nov 2009 06:13:48 GMT

A few months ago, playing with CTP 2 of StreamInsight, I created a small VWAP demo on a sliding window. Now that a proper CTP of the RX is available, I wanted to see how much effort the same demo would be without the CEP infrastructure of StreamInsight. I’ll admit that this was a little bit harder to write than I expected – and there’s still at least one bug remaining (updated), but the code for actually computing the VWAPs feels much cleaner in the RX version than it did in the StreamInsight version. The debuggability (which is really about transparency) of RX is a welcome difference from most CEP systems.

So here’s the code:

The generation of stock ticks remained nearly identical – however instead of timestamping by hand, I used the Timestamp() extension method. And to allow multiple observers to the same IObservable, the ticks are routed to a Subject.

public IObservable<Timestamped<StockTick>> GetTicks()
{
    var subj = new Subject<Timestamped<StockTick>>();
    var gen = Observable.Generate(
        0
        , ii => ii < 1000 // produce 1000 ticks
        , ii => new StockTick() // next value
        ...
        )
        .Timestamp();
    gen.Subscribe(tsst => subj.OnNext(tsst));
    return subj;
}

Compute VWAP on a 10 second sliding window

public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    return oticks
        .ToSlidingWindow(new TimeSpan(0, 0, 0, 10), new TimeSpan(0, 0, 0, 0, 500))
        .Select(sl => sl.Current)
        .SelectMany(cur => cur.GroupBy(tsst => tsst.Value.Symbol)
            .Select(grp =>
            {
                IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                var totalVolume = ticks.Sum(tk => tk.Size);
                return new VWAPItem(grp.Key, totalAmount, totalVolume, totalAmount / totalVolume);
            }));
}

And the code for ToSlidingWindow()

public static IObservable<SlidingWindow<Timestamped<T>>> ToSlidingWindow<T>(
    this IObservable<Timestamped<T>> source, TimeSpan size, TimeSpan resolution)
{
    Func<SlidingWindow<Timestamped<T>>, TimeoutJoinItem<T>, SlidingWindow<Timestamped<T>>> windowing =
        (window, item) =>
        {
            Func<Timestamped<T>, bool> checkTimestamp =
                cwi => cwi.Timestamp.Add(size) <= item.ComparisonTimestamp;
            var newCurrent = window.Current.SkipWhile(checkTimestamp);
            var removed = window.Current.TakeWhile(checkTimestamp);
            var added = Enumerable.Repeat(item.TSItem, (item.IsTimeout) ? 0 : 1);
            return new SlidingWindow<Timestamped<T>>(newCurrent.Concat(added), added, removed);
        };

    DateTime priorleft = DateTime.MinValue;
    return source.CombineLatest(Observable.Timer(resolution, resolution).Timestamp(),
        (left, right) =>
        {
            bool isTimeout = left.Timestamp == priorleft;
            priorleft = left.Timestamp;
            return new TimeoutJoinItem<T>(left, (isTimeout) ? right.Timestamp : left.Timestamp, isTimeout);
        }).Scan(new SlidingWindow<Timestamped<T>>(), windowing)
        .Where(sl => sl.Added.Count() > 0 || sl.Remov[...]

Reactive Framework available from DevLabs

Wed, 18 Nov 2009 04:23:08 GMT

Downloads of the Reactive Framework (RX) can now be found at MS DevLabs. Versions for 3.5 SP1, 4.0 Beta, and Silverlight 3 are available. Interestingly, the API size appears to be substantially larger than the preview which was leaked as part of the Silverlight 3 Toolkit. That DLL was all of 84KB; the current release weighs in at 283KB.


In regards to CEP, the comparisons between StreamInsight and RX are interesting

Criterion           | RX                                       | StreamInsight
Leaking abstraction | Low – it’s managed code all the way down | High; Linq2SI is like Linq2SQL, except the underlying SI implementation isn’t well understood
Performance         | Limited by CLR and GC                    | High b/c of native code
Windowing support   | None out of box                          | Explicit
Adaptor support     | Easy                                     | Not hard, but not trivial

From the ReactiveFramework to StreamInsight and Back

Tue, 25 Aug 2009 05:01:05 GMT

In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end-to-end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.

First we’ll generate the ticks:

System.Collections.Generic.IObservable<StockTick> stockTicks = System.Linq.Observable.Generate(
    new Random() // initial state
    , rnd => true // continue
    , rnd => new StockTick() // next value
    {
        Price = rnd.NextDouble() * 1000,
        Side = Sides[rnd.Next(Sides.Length)], // Next(max) is exclusive, so Length covers every entry
        Size = (long)(rnd.NextDouble() * 1000),
        Symbol = Symbols[rnd.Next(Symbols.Length)],
        Timestamp = DateTime.Now
    }
    , rnd => (int)(rnd.NextDouble() * 2000) // waitInterval
    , rnd => rnd // iterate
    );

And now convert to a CEP stream:

var cepInput = stockTicks.ToCEP()
    .ToCepStream(tick => tick.Timestamp);

Where ToCEP() is just the inverse of ToRX(), defined previously.

public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) =>
            RX.ObservableExtensions.Subscribe(rxSource, nextVal => cepObserver.OnNext(nextVal)));
}

Computing the rolling 5 min VWAP (grouped by symbol) takes some effort:

var alteredDurationStream = cepInput
    .ToPointEventStream()
    .AlterEventDuration(tick => TimeSpan.FromMinutes(5));

var fiveMinVWaps = from fivemin in alteredDurationStream
                   group fivemin by fivemin.Symbol into fGrp
                   from evwindow in fGrp.Snapshot()
                   select new
                   {
                       Symbol = fGrp.Key,
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size),
                   };

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };

Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a Snapshot().

Now that we have the data, we can convert back to an RX stream:

PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();

And update an ObservableCollection

vwaps.Post(_sc).Subscribe(item =>
{
    var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});

Which displays on a UI [...]

Routing StreamInsight output streams to a UI

Sun, 23 Aug 2009 21:01:48 GMT

One compelling feature of StreamInsight is its in-process hosting model. In addition to reducing the complexity of server side installs, it’s now possible to have a CEP engine in the client UI. The simplest way of getting CEP streams onto the UI would be the Reactive Framework methods. Something like

queryOutputStream
    .ToObservable(...)
    .Post(syncContext)
    .Subscribe(item => collection.Add(item));

But in the CTP that won’t work. As I discovered a few days ago, the IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in System.Reactive. Furthermore, the StreamInsight API lacks the base classes and extension methods defined in System.Reactive.

I didn’t want to go the normal route of creating an implementation of IObserver on, say, a ViewModel, and routing the data through the dispatcher and onto a collection. While that would have the benefit of simplicity and it would work, it would mean giving up on all the goodness in System.Reactive.

The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless. Using StreamInsight’s own I/O adapter API, I would create an “Eventing” adapter which would raise a conventional .Net event on an object of my choosing, then using the ReactiveFramework, convert that event to an RX IObservable. But it’s not easy (or possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by factory methods. You can send in a configuration object, but it needs to be XML serializable, so there’s no sending in of Action<> delegates.

But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable.

First you need a CEP AnonymousObserver

using S = System;

internal class AnonymousObserver<T> : S.IObserver<T>
{
    private bool isStopped;
    private S.Action _onCompleted;
    private S.Action<S.Exception> _onError;
    private S.Action<T> _onNext;

    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError)
        : this(onNext, onError, () => { })
    {
    }

    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError, S.Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnCompleted()
    {
        if (!isStopped)
        {
            isStopped = true;
            _onCompleted();
        }
    }

    public void OnError(S.Exception exception)
    {
        if (!isStopped)
        {
            isStopped = true;
            _onError(exception);
        }
    }

    public void OnNext(T value)
    {
        if (!isStopped)
            _onNext(value);
    }
}

Then an extension method takes a CEP IObservable and returns an RX AnonymousObservable, which subscribes to the source via the CEP IObserver and, on each OnNext, calls the returned RX IObservable’s OnNext. Like so:

using RX = System.Collections.Generic;
using S = System;

public static class CEPExtMethods
{
    public static RX.IObservable<T> ToRX<T>(this S.IObservable<T> source)
    {
        return new AnonymousObservable<T>(
            (RX.IObserver<T> rxObserver) =>
                source.Subscribe(new AnonymousObserver<T>(
                    nextVal => rxObserver.OnNext(nextVal),
                    rxObserver.OnError
                )));
    }
}

To use it:

var queryOutputStream = CreateQueryTemplate(input);
var queryO[...]

A first look at MS StreamInsight

Thu, 20 Aug 2009 12:23:05 GMT

This morning I was hoping to take a few minutes to modify one of the examples in the StreamInsight CTP and send an output stream to a UI, rather than the text files used in the examples. I thought this would be easy, as the readme states that there’s

“An alpha version of the StreamInsight libraries for development using the IObservable/IObserver programming paradigm.”

But it wasn’t. The IObservable used in StreamInsight is defined in a different namespace than the IObservable in the System.Reactive and the StreamInsight api lacks the base classes and extension methods defined in System.Reactive. At this time, the two APIs do not play well with each other.


Some thoughts on how to get around this temporary inconsistency:

  • Recompile System.Reactive to use StreamInsight’s IObservable/IObserver
  • Create a type converter between the two IObservable/IObservers
  • Create a StreamInsight output adapter which just raises a .Net event, then use the RX method of converting events to IObservables

Perhaps tonight.

Exploring the Reactive Framework part II

Wed, 12 Aug 2009 04:15:16 GMT

Talk around the water cooler is that it might be possible to use the Reactive Framework for some lightweight CEP. I’ll correct some of the (big) mistakes from my last post and build up a “jumping” window extension method for IObservables.

In my last post I built a simple grouping method, but in it I immediately turned the push style of processing into a pull style by using the GetEnumerator() method. This is a bad idea for two key reasons: a) it takes the inherent elegance of the RX and reduces it to a for loop, and b) it commits a cardinal sin of multi-threading and reserves a thread for a primarily blocking operation.

Here’s an improved version

public static IObservable<IEnumerable<T>> ToWindow<T>(
    this IObservable<T> source,
    Func<T, IEnumerable<T>, bool> grouper)
{
    return RXGrouping.ToWindow(source, val => val, grouper);
}

public static IObservable<IEnumerable<TResult>> ToWindow<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector,
    Func<TSource, IEnumerable<TResult>, bool> grouper)
{
    List<TResult> res = new List<TResult>();
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer => source.Subscribe(
            nextVal =>
            {
                try
                {
                    if (!grouper(nextVal, res))
                    {
                        observer.OnNext(res);
                        res = new List<TResult>();
                    }
                    res.Add(selector(nextVal));
                }
                catch (Exception exception)
                {
                    observer.OnError(exception);
                    return;
                }
            }
            , observer.OnError
            , observer.OnCompleted));
}

The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course the RX will supply the next value when it’s available.

To use it:

TimeSpan windowDuration = new TimeSpan(0,0,10);
generatedNums
    // add a Timestamp to our raw data
    .Select(val => new { Timestamp = DateTime.Now, Value = val })
    // create a 10 second "jumping" window
    .ToWindow((lastVal, seq) => (seq.Count() == 0)
        || (lastVal.Timestamp - seq.First().Timestamp < windowDuration))
    // create item for display
    .Select(seq => new
    {
        Timestamp = seq.First().Timestamp
        , Values = seq.Select(a => a.Value).ToArray()
        , Average = seq.Average(a => a.Value)
    })
    // marshal and add to list
    .Post(sc).Subscribe(wv => WindowVals.Add(wv));

And the results

Next we’ll look into creating a sliding window.[...]
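The push-style idea behind a jumping window can be sketched with only the BCL `IObserver<T>` interface: buffer incoming values, and when a value does not belong to the current window, emit the buffer and start a new one. `JumpingWindow<T>` is a hypothetical class written for this illustration. Unlike the ToWindow() in the post, it also flushes the partial window on completion, which is an assumption about desired behaviour rather than something the post specifies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-style windowing observer; names are illustrative, not from Rx.
public sealed class JumpingWindow<T> : IObserver<T>
{
    private readonly Func<T, IReadOnlyList<T>, bool> _belongs;
    private readonly Action<IReadOnlyList<T>> _emit;
    private List<T> _current = new List<T>();

    public JumpingWindow(Func<T, IReadOnlyList<T>, bool> belongs,
                         Action<IReadOnlyList<T>> emit)
    {
        _belongs = belongs;
        _emit = emit;
    }

    public void OnNext(T value)
    {
        // No pulling and no blocking: work happens only when a value is pushed in.
        if (!_belongs(value, _current))
        {
            _emit(_current);
            _current = new List<T>(); // the emitted list is never mutated again
        }
        _current.Add(value);
    }

    public void OnError(Exception error) { throw error; }

    public void OnCompleted()
    {
        if (_current.Count > 0) _emit(_current); // flush the partial window
    }
}

public static class JumpingWindowDemo
{
    // Pushes 1..7 through a count-based window of three values.
    public static List<string> Run()
    {
        var windows = new List<string>();
        var observer = new JumpingWindow<int>(
            (value, window) => window.Count < 3,
            window => windows.Add(string.Join(",", window)));

        for (int i = 1; i <= 7; i++) observer.OnNext(i);
        observer.OnCompleted();
        return windows;
    }

    public static void Main()
    {
        foreach (var w in Run())
            Console.WriteLine(w); // prints 1,2,3 then 4,5,6 then 7
    }
}
```

A time-based window works the same way: the predicate compares the incoming value's timestamp with the first element of the current window instead of counting.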

Exploring the Reactive Framework (RX)

Wed, 29 Jul 2009 05:07:18 GMT

A few days ago, intentionally or not, a version of the Reactive Framework was released into the wild. Let’s see how we can use the RX for computations on a stream of data. As an example we’ll take a stream of ints and produce the averages in groups of five.

Here’s the primary stream of numbers, using the static Generate() method

Random rnd = new Random();
var generatedNums = Observable.Generate(
    0 // seed
    , d => true // condition to continue
    , d => d % 12 //generated value
    , d => (int)(rnd.NextDouble() * 300) //delay
    , d => d + 1 // modify value for next iter
    );

And to consume the stream by adding the values into an ObservableCollection

generatedNums
    .Post(sc) // move onto UI thread
    .Subscribe(num => Numbers.Add(num) // add numbers to observable collection
    );

Computing the average, in groups of 5, turns out to be harder, as the Reactive FX doesn’t seem to have a GroupBy() method at this time. Here’s what I came up with:

generatedNums
    .Grouper(a => a, (a,list) => list.Count() < 5) // group into lists of 5, returning an IObservable<IEnumerable<int>>
    .Select(list => list.Average()) // take the average of the list, so project IObservable<IEnumerable<int>> to IObservable<double>
    .Post(sc).Subscribe(mean => Averages.Add(mean) // move onto UI and add to observable collection
    );

And the implementation for “Grouper()”

public static IObservable<IEnumerable<TResult>> Grouper<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector,
    Func<TSource, IEnumerable<TResult>, bool> grouper)
{
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer => source.Subscribe(x =>
        {
            try
            {
                using (var er = source.GetEnumerator())
                    while (er.MoveNext())
                    {
                        bool needsMove = false;
                        var res = new List<TResult>();
                        while (grouper(er.Current, res) && ((needsMove) ? er.MoveNext() : true))
                        {
                            needsMove = true;
                            res.Add(selector(er.Current));
                        }
                        observer.OnNext(res);
                    }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
        },
        new Action<Exception>(observer.OnError),
        new Action(observer.OnCompleted)));
}[...]

Preview of the Reactive Framework available via Silverlight Toolkit

Thu, 23 Jul 2009 21:49:03 GMT

Via Jafar Husain - it appears that there’s an early release of the Live Labs Reactive Framework (& with Brian Beckman and Erik Meijer) in the latest Silverlight Toolkit


In addition to some of the standard LINQ operators (Select, Where, Aggregate), some new operators look quite promising  -

  • ForkJoin
  • Merge
  • Delay
  • HoldUntilChanged
  • Latest
  • Merge
  • Throttle

When Add(ing)-Type, choose your method signatures wisely

Fri, 08 May 2009 18:44:18 GMT

PowerShell V2 has some great new features. In particular, Add-Type and the Remoting features are likely to be quite popular, and they work together without much issue. That said, there are edge cases which illustrate how the types returned from remoting calls differ from local ones. The following script demonstrates the issue:

$csCode = @"
using System;
using System.Collections.Generic;
using System.Linq;
namespace Demo
{
    public static class D
    {
        public static int Add(int a, int b) { return a + b; }
        public static int AddArray(int[] ints) { return ints.Sum(); }
        public static int AddEnumerable(IEnumerable<object> ints) { return ints.Cast<int>().Sum(); }
        public static int AddEnumerable(IEnumerable<int> ints) { return ints.Sum(); }
    }
}
"@
Add-Type -TypeDefinition $csCode -Language CSharpVersion3

$oneRemote = Invoke-Command -ComputerName localhost -ScriptBlock { return 1 }
$listRemote = Invoke-Command -ComputerName localhost -ScriptBlock { return (1,2,3) }
$one = &{return 1}

if ($one -eq $oneRemote) { Write-Host "1 == 1" }

$v = [Demo.D]::Add($one,$oneRemote)
Write-output "One + OneRemote = $v" ; $v=$null
$v = [Demo.D]::AddArray(($one,$oneRemote))
Write-output "One + OneRemote via array = $v" ; $v=$null
$v = [Demo.D]::AddArray($listRemote)
Write-output "One + OneRemote via remote array = $v" ; $v=$null
$v = [Demo.D]::AddEnumerable((1,2,3,4))
Write-output "One + OneRemote via local IEnumerable = $v" ; $v=$null
$v = [Demo.D]::AddEnumerable($listRemote)
Write-output "One + OneRemote via remote IEnumerable = $v"; $v=$null

$oneRemote | Get-Member

The output from the above is

1 == 1
One + OneRemote = 2
One + OneRemote via array =  2
One + OneRemote via remote array =  6
One + OneRemote via local IEnumerable =  10
# so far as expected

Exception calling "AddEnumerable" with "1" argument(s): "Specified cast is not valid."
At typeDemo.ps1:49 char:29
+ $v = [Demo.D]::AddEnumerable <<<< ($listRemote)

   TypeName: System.Int32

Name               MemberType   Definition
----               ----------   ----------[...]