No estoy seguro de si esto es posible, pero si lo es, probablemente no lo esté haciendo bien. Supongamos que tengo un búfer compartido que está vinculado a muchos consumidores (ActionBlocks). Cada consumidor debe consumir datos que satisfagan un predicado utilizado para vincularlo al búfer. Por ejemplo, ActionBlock1 debe consumir los números que satisfacen x => x % 5 == 0
, ActionBlock2 debe consumir solamente x => x % 5 == 1
etc.Vinculación de ActionBlocks creados dinámicamente a un BufferBlock
Esto es lo que tengo:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
for (int i = 0; i < NumProductionLines; i++)
{
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
}
return productionQueue;
}
Y entonces me llaman:
Random rnd = new Random();
ITargetBlock<int> temp = BuildPipeline(5);
while (true)
{
temp.Post(rnd.Next(255));
}
Sin embargo, esta No funciona. No se muestra ninguna salida en la consola. Si modifico BuildPipeline
método:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));
productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
productionQueue.LinkTo(productionLine5, x => x % 5 == 4);
return productionQueue;
}
el código hace lo que se espera que haga.
¿Alguien puede arrojar luz sobre por qué dinámicamente crear y vincular bloques de acción no funciona?
P.S. Si introduzco el código inmediatamente después de ITargetBlock<int> temp = BuildPipeline(5);
, la temperatura muestra que hay 5 objetivos vinculados al búfer. Y Id de cada objetivo es diferente.
Gracias de antemano
EDIT: Añadido cambios sugeridos por svick pero todavía no es bueno:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
for (int i = 0; i < NumProductionLines; i++)
{
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
int j = i;
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
}
ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num));
productionQueue.LinkTo(discardedLine);
return productionQueue;
}
Ahora sólo procesa los datos segunda línea de producción (el que está satisfaciendo x 5% == 1 predicado). Y los datos no satisface el predicado, es decir consigo los números que terminan en 9 y 7.
EDIT: el código de trabajo sería algo como lo siguiente:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
for (int i = 0; i < NumProductionLines; i++)
{
int j = i;
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num));
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
}
productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
return productionQueue;
}
gracias, copiando i a la variable local lo resolvió. – Dimitri
@Dimitri Como probablemente habrás notado, debes usar la copia en el bloque lambda también. Arreglé el código en mi respuesta. – svick
Sí, reemplacé todas las apariciones de i, gracias. Además, mi código contenía errores en el bucle for: en lugar del predicado codificado, depende de la variable. – Dimitri