Blog moved to http://www.andrevdm.com/

Saturday 21 November 2009

BlockingCollection & parallel yields

.NET 4 has many new features. My favourites at the moment are the classes in the System.Collections.Concurrent namespace, which contains thread-safe collections including ConcurrentQueue<>, ConcurrentStack<>, ConcurrentBag<>, ConcurrentDictionary<> and BlockingCollection<>.

I’m finding the BlockingCollection’s GetConsumingEnumerable() method incredibly useful. It returns an IEnumerable<> that removes items from the collection as you iterate over it and blocks while the collection is empty. Combined with the fact that BlockingCollection is thread-safe, this makes for very simple code.
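To make the blocking behaviour concrete, here is a minimal sketch rather than production code. The ConsumingDemo class, the Run method and the 50 ms delay are all made up for illustration. A background task adds integers slowly while the calling thread iterates the consuming enumerable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    // Produces 'count' integers on a background task, pausing between items,
    // and returns everything the consumer saw, in the order it saw it.
    public static List<int> Run( int count )
    {
        var queue = new BlockingCollection<int>();

        Task producer = Task.Factory.StartNew( () =>
        {
            for( int i = 0; i < count; i++ )
            {
                Thread.Sleep( 50 ); // simulate slow work; the consumer blocks meanwhile
                queue.Add( i );
            }
            queue.CompleteAdding(); // lets the consuming enumerable finish
        } );

        var seen = new List<int>();
        foreach( int item in queue.GetConsumingEnumerable() )
        {
            seen.Add( item ); // each item is removed from the collection as it is read
        }

        producer.Wait();
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine( string.Join( ", ", Run( 3 ) ) ); // prints "0, 1, 2"
    }
}
```

The foreach loop only ends because the producer calls CompleteAdding(). Without that call the consumer would wait forever for another item.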

As an example, imagine that you have a method, ImportData, that downloads data from several locations (DownloadSourceStreams), parses the data (ParseStream) and yields the processed results:

public override IEnumerable<ParsedData> ImportData()
{
    foreach( Stream inputStream in DownloadSourceStreams() )
    {
        ParsedData data = ParseStream( inputStream );
        yield return data;
    }
}

public IEnumerable<Stream> DownloadSourceStreams()
{
    foreach( string url in m_sourceUrls )
    {
        yield return DownloadStreamFrom( url );
    }
}


It would make sense to download all the data in parallel. However, you can’t simply turn the foreach in ImportData() into a Parallel.ForEach, because C# does not allow yield inside the lambda that you pass to Parallel.ForEach. What you need to do instead is download the data in parallel and have a single point that yields all the results. BlockingCollection makes this easy:


private BlockingCollection<ParsedData> m_imported = new BlockingCollection<ParsedData>();

public override IEnumerable<ParsedData> ImportData()
{
    Task.Factory.StartNew( ParallelImportData ); // run the import on a background task
    return m_imported.GetConsumingEnumerable();  // blocks while empty, ends after CompleteAdding()
}

private void ParallelImportData()
{
    Parallel.ForEach( DownloadSourceStreams(), inputStream =>
    {
        ParsedData data = ParseStream( inputStream );
        m_imported.Add( data ); // safe to call from several threads at once
    } );

    m_imported.CompleteAdding(); // tell the consumer that no more items will arrive
}

public IEnumerable<Stream> DownloadSourceStreams()
{
    foreach( string url in m_sourceUrls )
    {
        yield return DownloadStreamFrom( url );
    }
}
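
Two caveats with the listing above, offered tentatively. First, Parallel.ForEach pulls items from a lazy IEnumerable one at a time (as far as I know, the default partitioner takes a lock while advancing the enumerator). If DownloadStreamFrom() performs the whole transfer before returning, the downloads may still run one after another and only the parsing happens in parallel. Second, if anything throws inside the loop, CompleteAdding() is never reached and the consumer blocks forever. The following is a minimal sketch of one way around both. DownloadAndParse is a hypothetical stand-in for DownloadStreamFrom plus ParseStream, strings stand in for ParsedData, and the example.com URLs are placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelImportSketch
{
    // Hypothetical stand-in for DownloadStreamFrom() followed by ParseStream().
    private static string DownloadAndParse( string url )
    {
        return "parsed:" + url;
    }

    public static IEnumerable<string> ImportData( IEnumerable<string> sourceUrls )
    {
        // A fresh collection per call, so the method can be called more than once.
        var imported = new BlockingCollection<string>();

        Task.Factory.StartNew( () =>
        {
            try
            {
                // Loop over the URLs and download inside the body,
                // so the download and the parse both run in parallel.
                Parallel.ForEach( sourceUrls, url => imported.Add( DownloadAndParse( url ) ) );
            }
            finally
            {
                // Always signal completion; otherwise a failure leaves the consumer blocked.
                imported.CompleteAdding();
            }
        } );

        return imported.GetConsumingEnumerable();
    }

    public static void Main()
    {
        var urls = new[] { "http://example.com/a", "http://example.com/b", "http://example.com/c" };
        foreach( string item in ImportData( urls ) )
        {
            Console.WriteLine( item ); // arrival order is not guaranteed
        }
    }
}
```

The sketch still leaves the background task's exception unobserved. Real code would probably keep a reference to the task and surface its exception to the consumer.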



The code now works as follows:


  • ImportData() starts a new task that runs ParallelImportData() (the Task.Factory.StartNew call) and then immediately returns the consuming enumerable.
  • ParallelImportData() is then called on a separate thread/task and does the import in parallel by using Parallel.ForEach. Each imported item is added to the blocking collection with Add().
  • When the import has completed, the BlockingCollection’s CompleteAdding() method is called.




When the consumer calls ImportData() it gets an IEnumerable<> (the consuming enumerable) that blocks while there is no data in the collection. As soon as there is data, it iterates over it and removes it from the underlying collection. This continues until CompleteAdding() has been called and the collection is empty.
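
The removal and completion behaviour can be checked with a small sketch. DrainDemo and its method names are invented for illustration. It counts the items before and after iterating, and confirms that Add() is rejected once CompleteAdding() has been called.

```csharp
using System;
using System.Collections.Concurrent;

public static class DrainDemo
{
    // Returns { count before consuming, items consumed, count after consuming }.
    public static int[] Run()
    {
        var items = new BlockingCollection<string>();
        items.Add( "a" );
        items.Add( "b" );
        items.CompleteAdding(); // no more items will arrive

        int before = items.Count;
        int consumed = 0;
        foreach( string item in items.GetConsumingEnumerable() )
        {
            consumed++; // taking an item removes it from the collection
        }
        int after = items.Count;

        return new[] { before, consumed, after };
    }

    // Returns true if Add() throws once the collection is marked complete.
    public static bool AddAfterCompleteThrows()
    {
        var items = new BlockingCollection<string>();
        items.CompleteAdding();
        try
        {
            items.Add( "late" );
            return false;
        }
        catch( InvalidOperationException )
        {
            return true;
        }
    }

    public static void Main()
    {
        int[] r = Run();
        Console.WriteLine( "{0} {1} {2}", r[0], r[1], r[2] ); // prints "2 2 0"
        Console.WriteLine( AddAfterCompleteThrows() );        // prints "True"
    }
}
```

One consequence for the ImportData() listing: because m_imported is a field that is completed once, a second call to ImportData() on the same instance would fail when it tries to add to the already completed collection.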


What is striking about this code is that all of this happens without any explicit locking; the blocking collection handles it all for you.