Vlad's blog

In programming veritas

Introduction to TPL Dataflow

The TPL Dataflow Library is one of those underestimated libraries that never gained as much popularity as other TPL components. I believe the main reason is that Microsoft did not include the Dataflow Library in the .NET Framework 4.5; you have to install it separately (it ships as the System.Threading.Tasks.Dataflow NuGet package). Anyway, in this post I am going to quickly describe the main features of TPL Dataflow and answer the question of why and when you would use this library.

The TPL Dataflow Library is designed to help programmers write concurrency-enabled applications. You may say that this is what TPL is all about. True, but the Dataflow Library is meant for rather specific scenarios. Imagine you are developing an application that has to process many requests, which is a typical situation for server-side applications. Processing a request involves many concurrent tasks: Web API calls, database transactions, file IO operations, image processing and so on. You also want to be able to control resource utilization in order to keep your server responsive. Certainly, you can do everything described above using TPL tasks, develop your own queues for message prioritization, and use synchronization primitives to protect shared data. TPL Dataflow instead provides a programming model that hides the low-level details of managing concurrent workflows and allows you to focus on your business logic.

Programming Model

The main concept in Dataflow is a block. You can think of a block as a data structure that can buffer and process data. Normally your Dataflow application is a collection of linked blocks forming a workflow specific to your needs. The actual communication between blocks takes the form of messages passed from one block (the source) to another (the target). There are different types of blocks: transform blocks, buffer blocks, action blocks. You can build a simple pipeline, which is a linear sequence of blocks, or a network, which is a graph of blocks.

Common blocks

Transform block

A transform block receives an input value and returns a transformed value. In the example below the TransformBlock receives user names and returns user photos.

var photoService = new PhotoService();
var transformBlock = new TransformBlock<string, Photo>(async userName =>
{
 Photo userPhoto = await photoService.GetUserPhoto(userName);

 return userPhoto;
});

Action block

An action block receives data but does not pass it on to another block. Usually this is the last block in a chain. You can use it, for instance, to update the user interface.

var usersGallery = new UsersGallery();
var actionBlock = new ActionBlock<Photo>(photo => usersGallery.AddPhoto(photo));

Linking blocks

Blocks themselves are useless unless you link them into a pipeline or network. The code below links transformBlock to actionBlock, so after a photo is received from the photo service it is passed to actionBlock to update the user interface. Note the PropagateCompletion option: without it, completing transformBlock does not complete actionBlock, and waiting on actionBlock.Completion later would block forever.

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

Starting point

So we have blocks for processing individual messages, and we have linked them into a pipeline. How are messages passed to the first block in the network? Each target block has a Post method for posting messages.

for (int i = 0; i < 10; i++)
{
  transformBlock.Post("User" + i);
}

Finishing processing

In many scenarios we want to know when our pipeline has completely processed some portion of data. To do that we call the Complete() method of the first block in the pipeline.

transformBlock.Complete();

This method tells the Dataflow engine not to expect any more messages. If we want to know when processing has finished, we can wait on the Completion task of the terminal block (provided completion is propagated through the links).

actionBlock.Completion.Wait();

In this example we are waiting on the completion task of the last block.
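Completion is an ordinary Task, so in async code you can await it instead of blocking a thread with Wait():

```csharp
// Non-blocking alternative to actionBlock.Completion.Wait()
await actionBlock.Completion;
```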

Complete example

var usersGallery = new UsersGallery();

var photoService = new PhotoService();

var transformBlock = new TransformBlock<string, Photo>(async userName =>
{
 Photo userPhoto = await photoService.GetUserPhoto(userName);

 return userPhoto;
});

var actionBlock = new ActionBlock<Photo>(photo => usersGallery.AddPhoto(photo));

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
 transformBlock.Post("User" + i);
}

transformBlock.Complete();

actionBlock.Completion.Wait();

Other useful blocks

Buffer block

A buffer block represents a queue where you can store data while other blocks are busy. In our example with user photos we call a Web service to get each photo. If we don't want to flood the service with dozens of requests, we might decide to wait until the current Web request completes; and if we don't want to lose messages in the meantime, we need a simple FIFO queue to store them.

var bufferBlock = new BufferBlock<string>();

var transformBlock = new TransformBlock<string, Photo>(async userName =>
{
 Photo userPhoto = await photoService.GetUserPhoto(userName);

 return userPhoto;
});

bufferBlock.LinkTo(transformBlock);
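To actually throttle producers, a block can also be given a bounded capacity. A buffer with BoundedCapacity set holds at most that many queued items; SendAsync then waits asynchronously for free space, whereas Post would simply return false. A minimal sketch (the user names are arbitrary sample data):

```csharp
// A buffer that holds at most 5 queued user names.
var boundedBuffer = new BufferBlock<string>(
    new DataflowBlockOptions { BoundedCapacity = 5 });

// Post returns false when the buffer is full; SendAsync awaits until there is room.
for (int i = 0; i < 10; i++)
{
    await boundedBuffer.SendAsync("User" + i);
}
```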

Batch block

A batch block is useful when you want to group messages before passing them to the next block. In our example we might decide to send a batch of user names and get a list of photos in one Web service call.

var batchBlock = new BatchBlock<string>(10);

var transformBlock = new TransformBlock<string[], Photo[]>(async userNames =>
{
  // GetUserPhotos is assumed to be a batch version of GetUserPhoto.
  Photo[] userPhotos = await photoService.GetUserPhotos(userNames);

  return userPhotos;
});

batchBlock.LinkTo(transformBlock);

Notice that the TransformBlock now receives an array of strings rather than a single string.

Advanced techniques

Maximum Degree of Parallelism

Often you want to control how many messages are processed concurrently. For example, we might want to limit the maximum number of pending Web requests. You can do that when constructing the block instance.

var transformBlock = new TransformBlock<string, Photo>(
  async userName => await photoService.GetUserPhoto(userName),
  new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

Dataflow Block Completion

We use Dataflow to implement a particular workflow. Since we have no control over messages passing through the chain of blocks, we need a way to know when the workflow has completed. To do that we need two things. First, we inform Dataflow that we are not going to send any further messages by invoking the Complete() method. Then we use the Completion property of a terminal block to wait until all messages have been processed (remember that completion reaches the terminal block only through links created with PropagateCompletion = true).

for (int i = 0; i < 10; i++)
{
 transformBlock.Post("User" + i);
}

transformBlock.Complete();

actionBlock.Completion.Wait();

Exception handling

Dataflow can propagate an exception raised in one block through the whole set of blocks and surface it as an AggregateException in the Completion.Wait() method. The main problem with that approach is that the pipeline stops processing the messages that follow the one that raised the exception. So if that is not the behavior you want, you need to handle exceptions explicitly.

var transformBlock = new TransformBlock<string, Photo>(async userName =>
{
  try
  {
    Photo userPhoto = await photoService.GetUserPhoto(userName);

    return userPhoto;
  }
  catch (Exception)
  {
    // Don't allow the exception to break the workflow unless you really need that!
  }

  return null;
});

Linking blocks: alternative

LinkTo() is the common way to link blocks, but a plain link has a limitation: it cannot express if/else logic that depends on some condition. For example, if we already have the user photo on disk, there is no need to call the Web service, so we have to bypass the transform block that downloads the image. One option is to route messages manually from an action block:

var ifElseBlock = new ActionBlock<string>(userName =>
{
  Photo photo = GetPhotoFromCache(userName);

  if (photo != null)
  {
    actionBlock.Post(photo);
  }
  else
  {
    transformBlock.Post(userName);
  }
});

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions() {PropagateCompletion = true});

for (int i = 0; i < 10; i++)
{
  ifElseBlock.Post("User" + i);
}

ifElseBlock.Complete();

actionBlock.Completion.Wait();
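For completeness, when the branches accept the same message type, LinkTo itself offers a simpler alternative: an overload that takes a filter predicate, so each message goes to the first link whose predicate accepts it. A sketch under the assumption that both targets take user names (IsPhotoCached is a hypothetical helper):

```csharp
var sourceBlock = new BufferBlock<string>();
var cacheBlock = new ActionBlock<string>(userName => { /* read the photo from disk */ });
var downloadBlock = new ActionBlock<string>(userName => { /* call the Web service */ });

// Messages matching the predicate go to cacheBlock, the rest fall through.
sourceBlock.LinkTo(cacheBlock, userName => IsPhotoCached(userName));
sourceBlock.LinkTo(downloadBlock); // catch-all; otherwise unmatched messages stay queued
```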

Conclusion

TPL Dataflow is undoubtedly a powerful tool for managing concurrency in complex workflows, but there are some caveats. The main problem is the high barrier to entry. To code anything more complex than a trivial producer-consumer pattern, you have to invest a significant amount of time in learning the basics. Since the actual flow of data is hidden from the developer, it is sometimes hard to find the cause of an issue: rather than tracing it in a debugger, you have to guess why your workflow was processed one way or another. Also, the documentation could be more comprehensive. I am not saying it is bad, and there are a lot of questions answered on Stack Overflow, but as I said at the beginning, this is not a very popular library. So my recommendation is to think twice before making a decision; you may be overcomplicating your problem. Use TPL Dataflow for really complex workflows where you want control over the degree of parallelism or want to use batching strategies. The message-passing model is also a good alternative to using shared data structures.

Written by vsukhachev

March 5, 2017 at 12:32 pm

Posted in Development
