Share via


Rx ensures Unsubscribe

I recently had an ObservableCollection which did not currently contain the item I was interested in, but it would sometime in the near future. The questions is how best to get notified that the item is available so I could access its full property set?  I only had the item’s ID and the collection’s CollectionChanged event to work with.

My first instinct was to just go with simple lambda expression subscribed to the CollectionChanged event in order to filter for the item’s appearance in the collection

  1. public void WaitForMovie_UsingLambda(int ID, ObservableCollection<MovieData> collection)
  2. {
  3.     collection.CollectionChanged += new NotifyCollectionChangedEventHandler(
  4.         (sender, e) =>
  5.         {
  6.             if (e.Action == NotifyCollectionChangedAction.Add)
  7.             {
  8.                 var movie = ((MovieData)e.NewItems[0]);
  9.                 if (movie.ID == movieSearchID)
  10.                 {
  11.                     DoSomething(movie);
  12.                 }
  13.             }
  14.         });
  15. }

The problem with this pattern is that you can’t unsubscribe from the event notification. Every time something is added (even after we have found our item) this logic will execute wasting CPU processing power. In addition, this also has the potential to create memory issues/leaks because objects may remain alive longer since the subscription will exist as long as the collection does. If the subscription has side effects (there are none in this simple case) then they are essentially permanent.

We can go with a basic delegate structure which allows us to unsubscribe but the logic gets a little more complicated and we have to spread it out into multiple sections.

  1. private int movieSearchID;
  2. private ObservableCollection<MovieData> movieSearchCollection;
  3.  
  4. public void WaitForMovie_UsingDelegate(int ID, ObservableCollection<MovieData> collection)
  5. {
  6.     movieSearchID = ID;
  7.     collection.CollectionChanged += new NotifyCollectionChangedEventHandler(moveCollectionChanged);
  8. }
  9.  
  10. void moveCollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
  11. {
  12.     if (e.Action == NotifyCollectionChangedAction.Add)
  13.     {
  14.         var movie = ((MovieData)e.NewItems[0]);
  15.         if (movie.ID == movieSearchID)
  16.         {
  17.             DoSomething(movie);
  18.             movieSearchCollection.CollectionChanged -= new NotifyCollectionChangedEventHandler(moveCollectionChanged);
  19.         }
  20.     }
  21. }

But now we’ve lost the encapsulation the lambda provided us so its harder to read. We also have to be careful because the code isn’t reentrant due to the state variables.

Now lets look at the Reactive Extensions solution

  1. public void WaitForMovie_UsingRx(int ID, ObservableCollection<MovieData> collection)
  2. {
  3.     var changes = Observable.FromEventPattern<NotifyCollectionChangedEventArgs>(collection, "CollectionChanged");
  4.     var itemWeWant = from change in changes
  5.                      where change.EventArgs.Action == NotifyCollectionChangedAction.Add
  6.                      let movie = ((MovieData)change.EventArgs.NewItems[0])
  7.                      where movie.ID == ID
  8.                      select movie;
  9.     itemWeWant.Take(1).Subscribe( movie => DoSomething(movie));
  10. }

The Take(1) ensures we only have a single result. This causes the observable to complete after that one result. When observables complete, the subscription to the event is unsubscribed. Plus we don’t have any state to manage ourselves and the code is short and concise. It is readable and directly expresses our intended purpose to the reader.

This is an excellent pattern for any type of .NET event where you expect to receive a specific number of notifications instead of a permanent stream of notifications and you don’t want to worry about managing state or cleaning up your delegates.

Implementation detail notes

  1. In this case I know that only one item is being added to the collection at a time. This allows me to just look at NewItems[0] instead of the whole NewItems collection. If that wasn’t the case we could add logic to filter the entire NewItems collection as well, but it wouldn’t change the overall point.
  2. If you have read some of my other blog posts on Rx you might notice that sometimes I use FromEventPattern and sometimes I use FromEvent. They are essentially the same. The reason it varies depends on what type of project I am working on. FromEvent is used in the phone’s Rx library while the desktop Rx library has evolved since the phone was released and has changed the name of the method to FromEventPattern.

Comments

  • Anonymous
    November 08, 2011
    Rx is a great library that brings composability to events, I like it.That said for simple use cases like your's I'd rather stick with a variation on your 1st try:public void WaitForMovie_UsingLambda(int ID, ObservableCollection<MovieData> collection){    var handler = new NotifyCollectionChangedEventHAndler((sender, e) =>        {            if (e.Action != NotifyCollectionChangedAction.Add) return;            var movie = ((MovieData)e.NewItems[0]);            if (movie.ID != movieSearchID) return;            collection.CollectionChanged -= handler;            DoSomething(movie);        };    collection.CollectionChanged += handler;}
  • Anonymous
    November 08, 2011
    The comment has been removed