Dataflow block basics

  • 1. Introduction
  • 2. Linking dataflow blocks
  • 3. Propagating errors
  • 4. Unlinking blocks
  • 5. Throttling blocks
  • 6. Parallel processing with dataflow blocks
  • 7. Creating custom dataflow blocks

1. Introduction

The TPL Dataflow library is powerful: it can be used to build meshes and pipelines and to send data through them asynchronously.

Primary namespace: System.Threading.Tasks.Dataflow

2. Linking dataflow blocks

To build a mesh, the dataflow blocks must be linked to one another.

public static void LinkBlockRun()
{
    System.Console.WriteLine("Building Block link.");
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("first block.");
        Thread.Sleep(500);
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("last block.");
        Thread.Sleep(500);
        return item - 2;
    });
    var options = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };
    multiplyBlock.LinkTo(subtractBlock, options);

    System.Console.WriteLine("Builded Block link.");

    var task = Task.Run(async () =>
    {
        System.Console.WriteLine("Posting");

        for (int i = 0; i < 3; i++)
        {
            multiplyBlock.Post(i);
        }

        System.Console.WriteLine("Posted");

        // Completion of the first block propagates automatically to the second block.
        // Once Complete() has been called, further Post calls are rejected.
        multiplyBlock.Complete();

        await multiplyBlock.Completion;
        // The first block has finished and handed all of its output downstream.
        System.Console.WriteLine("Block link Ended.");
    });

    task.Wait();
}

The output is:

Building Block link.
Builded Block link.
Posting
Posted
first block.
first block.
last block.
first block.
last block.
last block.
Block link Ended.
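
In the example above, nothing ever reads the results produced by subtractBlock. The following is a minimal sketch, not part of the original code, of one way to collect them: a third block is linked to the end of the pipeline and its Completion is awaited. The names PipelineResults, RunAsync and collectBlock are made up for illustration, and the sketch assumes the System.Threading.Tasks.Dataflow library is available to the project.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineResults
{
    // Hypothetical helper: runs 0, 1, 2 through (x * 2) and then (x - 2).
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();
        var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
        var subtractBlock = new TransformBlock<int, int>(item => item - 2);
        // An ActionBlock handles one item at a time by default,
        // so the list is never touched by two threads at once.
        var collectBlock = new ActionBlock<int>(item => results.Add(item));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        multiplyBlock.LinkTo(subtractBlock, options);
        subtractBlock.LinkTo(collectBlock, options);

        for (int i = 0; i < 3; i++)
        {
            multiplyBlock.Post(i);
        }
        multiplyBlock.Complete();

        // Completion propagates down both links, so waiting for the last
        // block means every item has gone through the whole pipeline.
        await collectBlock.Completion;
        return results;
    }
}
```

Awaiting PipelineResults.RunAsync() should yield the list -2, 0, 2, in posting order, because each block processes items sequentially by default.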

3. Propagating errors

public static void BlockErrorRun()
{
    Task.Run(async () =>
    {
        try
        {
            // A single block: awaiting Completion rethrows the original exception.
            var block = new TransformBlock<int, int>(item =>
              {
                  if (item == 1)
                      throw new InvalidOperationException("Blech.");
                  return item * 2;
              });
            block.Post(1);
            await block.Completion;

        }
        catch (InvalidOperationException ex)
        {
            System.Console.WriteLine(ex.GetType().Name);
        }

        try
        {
            // Linked blocks: the exception reaches the downstream block wrapped in an AggregateException.
            var multiplyBlock = new TransformBlock<int, int>(item =>
            {
                if (item == 1)
                    throw new InvalidOperationException("Blech.");
                return item * 2;
            });
            var subtractBlock = new TransformBlock<int, int>(item => item - 2);
            multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
            multiplyBlock.Post(1);
            await subtractBlock.Completion;
        }
        catch (AggregateException ex)
        {
            System.Console.WriteLine(ex.GetType().Name);
        }

    }).Wait();
}

The output is:

InvalidOperationException
AggregateException

  • In the simplest cases, the best approach is to let errors propagate through the mesh and handle them once, at the end.
  • In more complex meshes, each dataflow block needs to be checked after the dataflow has completed.
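
The first approach can be sketched as follows. This is an illustrative example rather than the author's code: the class and method names (ErrorAtTheEnd, RunAsync) are invented, and AggregateException.Flatten is used to strip the wrapper layers that the links add, so the original exception can be inspected in a single place.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ErrorAtTheEnd
{
    // Hypothetical helper: returns the type name of the root error, or "no error".
    public static async Task<string> RunAsync()
    {
        var multiplyBlock = new TransformBlock<int, int>(item =>
        {
            if (item == 1)
                throw new InvalidOperationException("Blech.");
            return item * 2;
        });
        var subtractBlock = new TransformBlock<int, int>(item => item - 2);
        multiplyBlock.LinkTo(subtractBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        multiplyBlock.Post(1);
        try
        {
            // Observe the failure once, at the last block of the pipeline.
            await subtractBlock.Completion;
            return "no error";
        }
        catch (AggregateException ex)
        {
            // Flatten() collapses nested AggregateException layers into one,
            // leaving the exception thrown inside the first block.
            Exception root = ex.Flatten().InnerException;
            return root.GetType().Name;
        }
    }
}
```

Awaiting ErrorAtTheEnd.RunAsync() should return "InvalidOperationException", the exception originally thrown inside multiplyBlock.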

4. Unlinking blocks

public static void BlockDisposeRun()
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("first block.");
        Thread.Sleep(500);
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("last block.");
        Thread.Sleep(500);
        return item - 2;
    });

    IDisposable link = multiplyBlock.LinkTo(subtractBlock);
    multiplyBlock.Post(1);
    multiplyBlock.Post(2);
    // Unlink the dataflow blocks.
    // The items posted above may or may not have already passed through the link.
    // In practice, consider a using block instead of calling Dispose directly.
    link.Dispose();
    Thread.Sleep(1200);
}

The output is:

first block.
first block.

5. Throttling blocks

Use a dataflow block's BoundedCapacity option to throttle a target block. The default value of BoundedCapacity is DataflowBlockOptions.Unbounded.

Problem this solves:

  • When data arrives too fast and in too large a volume, an unthrottled first target block buffers all of it before it can process any. Bounding its capacity prevents that.

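
As a small illustration of what a bounded block does when it is full, here is a hedged sketch (the names BoundedCapacityDemo and RunAsync are invented for this example). With BoundedCapacity set to 1, Post is declined once the buffer is full, while SendAsync waits until space becomes available.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Hypothetical helper showing Post versus SendAsync on a full block.
    public static async Task<(bool firstPost, bool secondPost, bool sent)> RunAsync()
    {
        var block = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);   // accepted: the buffer has room
        bool secondPost = block.Post(2);  // declined: the buffer is already full

        // SendAsync does not give up; its task completes once the item is accepted.
        Task<bool> pending = block.SendAsync(3);

        block.Receive();                  // takes item 1 out and frees the single slot
        bool sent = await pending;        // item 3 is accepted now

        return (firstPost, secondPost, sent);
    }
}
```

Awaiting BoundedCapacityDemo.RunAsync() should give (true, false, true). Preferring SendAsync over Post in producers is one common way to let a bounded block apply back-pressure instead of silently dropping items.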
public static void BlockBoundedCapacityRun()
{
    var sourceBlock = new BufferBlock<int>();
    var options = new DataflowBlockOptions
    {
        BoundedCapacity = 10
        //BoundedCapacity = DataflowBlockOptions.Unbounded
    };
    var targetBlockA = new BufferBlock<int>(options);
    var targetBlockB = new BufferBlock<int>(options);
    sourceBlock.LinkTo(targetBlockA);
    sourceBlock.LinkTo(targetBlockB);

    for (int i = 0; i < 31; i++)
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Post:{i % 10}");
        sourceBlock.Post(i % 10);
    }
    // Pour 31 water drops into the pipe.
    // Because each branch is bounded, targetBlockA and targetBlockB buffer 10 drops each.
    var task = Task.Run(() =>
    {
        int i = 0;

        System.Console.WriteLine("Process the drops in targetBlockA first; receiving in a loop drains the pipe, but the drops in targetBlockB cannot be received here.");
        do
        {
            IList<int> res;
            if (targetBlockA.TryReceiveAll(out res))
            {
                i += res.Count;
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RecvA:{string.Join(",", res)} {i}");
            }
            else
            {
                break;
            }
            Thread.Sleep(100);
        } while (true);

        i = 0;

        System.Console.WriteLine("Now process the drops in targetBlockB; only its buffered drops are left.");
        do
        {
            IList<int> res;
            if (targetBlockB.TryReceiveAll(out res))
            {
                i += res.Count;
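                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RecvB:{string.Join(",", res)} {i}");
            }
            else
            {
                break;
            }
            Thread.Sleep(100);
        } while (true);
    });

    task.Wait();
}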
