2010-06-29 21 views
5

hola quiero hilos para colaborar con un productor y un consumidor. el consumidor es bastante lento, y el productor es muy rápido y funciona en ráfagas.C# comunicación entre hilos

por ejemplo, el consumidor puede procesar un mensaje cada 20 segundos, y el productor puede producir 10 mensajes en un segundo, pero lo hace aproximadamente una vez para que el consumidor pueda ponerse al día.

Quiero algo como:

Stream commonStream; 
AutoResetEvent commonLock; 

void Producer() 
{ 
    while (true) 
    { 
    magic.BlockUntilMagicAvalible(); 
    byte[] buffer = magic.Produce(); 
    commonStream.Write(buffer); 
    commonLock.Set(); 
    } 
} 

void Consumer() 
{ 
    while(true) 
    { 
    commonLock.WaitOne(); 
    MagicalObject o = binarySerializer.Deserialize(commonStream); 
    DoSomething(o); 
    } 
} 
+0

Qué versión de .Net estás usando, hay algunas cosas nuevas en v4 para exactamente esto –

+0

.Net 3.5; Los comentarios deben tener al menos 15 caracteres de longitud. –

Respuesta

11

Si tiene .Net 4.0 o más alto que puede hacerlo de esta manera mediante el uso de un BlockingCollection

int maxBufferCap = 500; 
BlockingCollection<MagicalObject> Collection 
          = new BlockingCollection<MagicalObject>(maxBufferCap); 
void Producer() 
{ 
    while (magic.HasMoreMagic) 
    { 
     this.Collection.Add(magic.ProduceMagic()); 
    } 
    this.Collection.CompleteAdding(); 
} 

void Consumer() 
{ 
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable()) 
    { 
     DoSomthing(magicalObject); 
    } 
} 

La línea foreach va a dormir si no hay datos en la memoria intermedia, también se activará automáticamente cuando se agregue algo a la colección.

El motivo por el que establezco el búfer máximo es que si su productor es mucho más rápido que el consumidor, puede terminar consumiendo mucha memoria a medida que más objetos entran en la colección. Al configurar un tamaño de búfer máximo a medida que crea la colección de bloqueo cuando se alcanza el tamaño de búfer, la llamada Add en el productor se bloqueará hasta que el consumidor elimine un elemento de la colección.

Otra ventaja de la clase BlockingCollection es que puede tener tantos productores y consumidores como desee, no necesita ser una relación 1: 1. Si DoSomthing admite que usted podría tener un bucle foreach por núcleo del ordenador (o incluso el uso Parallel.ForEach y utilizar el enumerables consume como fuente de datos)

void ConsumersInParalell() 
{ 
    //This assumes the method signature of DoSomthing is one of the following: 
    // Action<MagicalObject> 
    // Action<MagicalObject, ParallelLoopState> 
    // Action<MagicalObject, ParallelLoopState, long> 
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing); 
} 
+2

Tenga en cuenta que el TPL ha sido retransmitido a .NET 3.5: http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in- net-35-usage.html –

0

Usted puede conseguir lo que desea usando una cola y temporizador. El productor agrega valores a la cola e inicia el temporizador del consumidor. El evento transcurrido del temporizador del consumidor (que está en un hilo de Threadpool) detiene el temporizador, y pasa por la cola hasta que se vacía, luego desaparece (no hay sondeo innecesario). El productor puede agregar a la cola mientras el consumidor todavía está funcionando.

System.Timers.Timer consumerTimer; 
Queue<byte[]> queue = new Queue<byte[]>(); 

void Producer() 
{ 
    consumerTimer = new System.Timers.Timer(1000); 
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed); 
    while (true) 
    { 
     magic.BlockUntilMagicAvailable(); 
     lock (queue) 
     { 
      queue.Enqueue(magic.Produce()); 
      if (!consumerTimer.Enabled) 
      { 
       consumerTimer.Start(); 
      } 
     } 
    } 
} 

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
{ 
    while (true) 
    { 
     consumerTimer.Stop(); 
     lock (queue) 
     { 
      if (queue.Count > 0) 
      { 
       DoSomething(queue.Dequeue()); 
      } 
      else 
      { 
       break; 
      } 
     } 
    } 
} 
+0

su fragmento no es seguro para hilos ... y el mío no implica sondeo –

+0

¿Qué no es seguro para los hilos? Y no sondea: el temporizador es de una sola vez y solo se activa cuando el productor agrega a la cola. –

-1

Uso Mutex's. La idea es que ambos se ejecuten en diferentes hilos. El subproceso Consumidor se inicia bloqueado por un mutex, donde permanecerá indefinidamente hasta que lo libere el Productor. Luego procesará los datos en paralelo, dejando que el Productor continúe. El consumidor se volverá a bloquear cuando se complete.

(Código de inicio de la rosca, y otros bits de calidad se han omitido por razones de brevedad.)

// Pre-create mutex owned by Producer thread, then start Consumer thread. 
Mutex mutex = new Mutex(true); 
Queue<T> queue = new Queue<T>(); 

void Producer_AddData(T data) 
{ 
    lock (queue) { 
    queue.Enqueue(GetData()); 
    } 

    // Release mutex to start thread: 
    mutex.ReleaseMutex(); 
    mutex.WaitOne(); 
} 

void Consumer() 
{ 
    while(true) 
    { 
    // Wait indefinitely on mutex 
    mutex.WaitOne(); 
    mutex.ReleaseMutex(); 

    T data; 
    lock (queue) { 
     data = queue.Dequeue(); 
    } 
    DoSomething(data); 
    } 

}

Esto ralentiza el Productor por unos pocos milisegundos mientras se espera a que el consumidor para despertar y libera el mutex. Si puedes vivir con eso.

+0

Usar un 'BlockingCollection' es mucho mejor. En primer lugar, es mucho más obvio cuando es correcto que usar mutexes, y a diferencia de su modelo, el productor y el consumidor pueden trabajar en paralelo; usted se está asegurando de que su código * * produzca * o * consuma, pero nunca ambos. Tampoco se adapta bien a más de un productor o más de un consumidor, a diferencia de una colección de bloqueo donde hacerlo es trivial. Podría utilizar un enfoque mutex más complejo que tenga los beneficios de una colección de bloqueo, pero sería mucho * trabajo y sería mucho menos legible/mantenible. – Servy

+0

BlockingColletion no está disponible para mí, ya que no puedo ejecutar 4.5. Si pudiera, esta sería probablemente la solución correcta. Sin embargo, este código se ejecuta en paralelo. Puede que no haya sido claro, pero los dos están en diferentes hilos. Utilizo esto para ejecutar pesadas consultas SQL en un hilo, mientras recopilo datos en otro hilo y funciona bien para mí. – Ben

+0

BlockingCollection se agregó en 4.0, no en 4.5. – Servy