2012-08-14 36 views
16

Estoy usando un BlockingCollection para implementar un patrón de productor/consumidor. Tengo un bucle asíncrono que llena la colección con los datos que se procesarán, que luego pueden ser accedidos por el cliente mucho más tarde. Los paquetes llegan escasamente y me gustaría que la votación se realice sin usar una llamada de bloqueo para tomar.Toma asincrónica de la colección de bloqueo

En esencia, estoy buscando algo así como un BeginTake y EndTake que no existe en la colección de bloqueo para que pueda hacer uso del grupo de subprocesos interno en una devolución de llamada. No tiene que ser un BlockingCollection de ninguna manera. Cualquier cosa que haga lo que necesito sería genial.

Esto es lo que tengo ahora. _bufferedPackets es una BlockingCollection<byte[]>:

public byte[] Read(int timeout) 
{ 
    byte[] result; 
    if (_bufferedPackets.IsCompleted) 
    { 
     throw new Exception("Out of packets"); 
    } 
    _bufferedPackets.TryTake(out result, timeout);  
    return result; 
} 

me gustaría que esto es algo como esto, en pseudocódigo:

public void Read(int timeout) 
{ 
    _bufferedPackets.BeginTake(result => 
     { 
      var bytes = _bufferedPackets.EndTake(result); 
      // Process the bytes, or the resuting timeout 
     }, timeout, _bufferedPackets); 
} 

¿Cuáles son mis opciones para este? No quiero colocar ningún hilo en estado de espera, ya que hay muchas otras cosas IO que procesar, y me quedaría sin hilos bastante rápido.

Actualización: He reescrito el código en cuestión de utilizar el proceso asíncrono de manera diferente, cambiando esencialmente devoluciones de llamada en base a si hay una solicitud de espera dentro del límite de tiempo de espera. Esto funciona bien, pero igual sería increíble si hubiera una forma de hacerlo sin tener que recurrir a temporizadores e intercambiar lambdas, lo que potencialmente causa condiciones de carrera y es difícil de escribir (y comprender). También he resuelto esto con una implementación propia de una cola asíncrona, pero aún sería genial si hubiera una opción más estándar y mejor probada.

+0

Por el momento, creo que ninguna colección de TPL proporciona métodos asíncronos, excepto ObservableCollection a la IU. Qué piensas ? –

+0

Puede envolver esto en una 'Tarea task = Task.Factory.StartNew (() => {// Su código devuelve byte []});' sin embargo, esto no es sencillo y debe haber una mejor manera .. – MoonKnight

+0

El empaquetado de una tarea consumirá una tarea que se bloqueará en un identificador de espera. Dado que hay muchas tareas en marcha que ocuparán una tarea para siempre, lo que hará que me queden sin tareas en la piscina desafortunadamente. – Dervall

Respuesta

0

Así que no parece haber una opción incorporada para esto, salí y traté de hacer mi mejor esfuerzo para hacer lo que quería como experimento. Resulta que hay mucho por hacer para hacer que esto funcione más o menos como otros usuarios del viejo patrón asincrónico.

public class AsyncQueue<T> 
{ 
    private readonly ConcurrentQueue<T> queue; 
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult 
    { 
     public bool IsCompleted { get; set; } 
     public WaitHandle AsyncWaitHandle { get; set; } 
     public object AsyncState { get; set; } 
     public bool CompletedSynchronously { get; set; } 
     public T Result { get; set; } 

     public AsyncCallback Callback { get; set; } 
    } 

    public AsyncQueue() 
    { 
     dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>(); 
     queue = new ConcurrentQueue<T>(); 
    } 

    public void Enqueue(T item) 
    { 
     DequeueAsyncResult asyncResult; 
     while (dequeueQueue.TryDequeue(out asyncResult)) 
     { 
      if (!asyncResult.IsCompleted) 
      { 
       asyncResult.IsCompleted = true; 
       asyncResult.Result = item; 

       ThreadPool.QueueUserWorkItem(state => 
       { 
        if (asyncResult.Callback != null) 
        { 
         asyncResult.Callback(asyncResult); 
        } 
        else 
        { 
         ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set(); 
        } 
       }); 
       return; 
      } 
     } 
     queue.Enqueue(item); 
    } 

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state) 
    { 
     T result; 
     if (queue.TryDequeue(out result)) 
     { 
      var dequeueAsyncResult = new DequeueAsyncResult 
      { 
       IsCompleted = true, 
       AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
       AsyncState = state, 
       CompletedSynchronously = true, 
       Result = result 
      }; 
      if (null != callback) 
      { 
       callback(dequeueAsyncResult); 
      } 
      return dequeueAsyncResult; 
     } 

     var pendingResult = new DequeueAsyncResult 
     { 
      AsyncState = state, 
      IsCompleted = false, 
      AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
      CompletedSynchronously = false, 
      Callback = callback 
     }; 
     dequeueQueue.Enqueue(pendingResult); 
     Timer t = null; 
     t = new Timer(_ => 
     { 
      if (!pendingResult.IsCompleted) 
      { 
       pendingResult.IsCompleted = true; 
       if (null != callback) 
       { 
        callback(pendingResult); 
       } 
       else 
       { 
        ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set(); 
       } 
      } 
      t.Dispose(); 
     }, new object(), timeout, Timeout.Infinite); 

     return pendingResult; 
    } 

    public T EndDequeue(IAsyncResult result) 
    { 
     var dequeueResult = (DequeueAsyncResult) result; 
     return dequeueResult.Result; 
    } 
} 

no estoy muy seguro acerca de la sincronización de la propiedad IsComplete, y no estoy demasiado caliente en la forma en que el dequeueQueue solamente se limpie en posteriores llamadas Enqueue. No estoy seguro de cuándo es el momento adecuado para señalar las manijas de espera, pero esta es la mejor solución que tengo hasta ahora.

Por favor, no considere este código de calidad de producción de ninguna manera.Solo quería mostrar el sentido general de cómo llegué para mantener todos los hilos girando sin esperar bloqueos. Estoy seguro de que está lleno de todo tipo de casos extremos y errores, pero cumple con los requisitos y quería devolver algo a las personas que se topan con la pregunta.

+0

No entiendo completamente su modelo de subprocesamiento. Usted accede en EndDeque el Resultado, independientemente de si la devolución de llamada se ha completado o se ha agotado el tiempo de espera.Si recorre la propiedad IsCompleted hasta que obtenga una respuesta, todavía está bloqueando un hilo. –

+0

No se bloqueará porque se está ejecutando en un temporizador, que no iniciará un subproceso, sino que solo pondrá en cola las tareas del grupo. También se requiere Enddequeue en los tiempos de espera, el resultado sería el predeterminado en esos casos. – Dervall

+0

Me refiero a quién llama EndDequeue? El patrón Begin/End asegura que puede generar trabajo, pero en algún momento debe llamar al método End correspondiente. Lo que hace actualmente es registrar las devoluciones de llamadas y llamarlas durante el método Enqueue y esperar un poco a través del temporizador si, mientras tanto, llegaron algunos datos. Puedes hacerlo sin temporizadores simplemente con una cola de observadores donde en Enqueue almacenas su tiempo de secuencia. Cuando llegan los datos, deque a un observador demasiado antiguo y los omite y llama a la devolución de llamada que aún no ha caducado. –

0

Es posible que esté malinterpretando su situación, pero ¿no puede usar una colección no bloqueante?

creé este ejemplo para ilustrar:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace AsyncTakeFromBlockingCollection 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var queue = new ConcurrentQueue<string>(); 

      var producer1 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("======="); 
        Thread.Sleep(10); 
       } 
      }); 

      var producer2 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("*******"); 
        Thread.Sleep(3); 
       } 
      }); 

      CreateConsumerTask("One ", 3, queue); 
      CreateConsumerTask("Two ", 4, queue); 
      CreateConsumerTask("Three", 7, queue); 

      producer1.Wait(); 
      producer2.Wait(); 
      Console.WriteLine(" Producers Finished"); 
      Console.ReadLine(); 
     } 

     static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue) 
     { 
      Task.Factory.StartNew(() => 
      { 
       while (true) 
       { 
        string result; 
        if (queue.TryDequeue(out result)) 
        { 
         Console.WriteLine(" {0} consumed {1}", taskName, result); 
        } 
        Thread.Sleep(sleepTime); 
       } 
      }); 
     } 
    } 
} 

Aquí está la salida del programa

enter image description here

Creo que el BlockingCollection está destinado a envolver una colección concurrente y proporcionar un mecanismo para permitir que múltiples consumidores a bloquear; esperando productores Este uso parece ser contrario a sus requisitos.

Encontré este article about the BlockingCollection class para ser útil.

+0

No puedo hacer eso desafortunadamente. Hay muchas colas separadas que se llenan muy escasamente después de las terminaciones IO. Puede haber miles de estos. Estos solo se consumen cuando ocurre otro evento IO, que se ejecuta en una devolución de llamada de finalización IO y, por lo tanto, no puede bloquear. En esencia, hay productores y consumidores infrecuentes, todos ejecutando la finalización de IO. El consumidor necesita saber si la colección ha tenido elementos agregados dentro de un tiempo de espera determinado sin colocarse en una llamada de bloqueo. – Dervall

+0

¿Cómo se asignan los consumidores a las colas? Un consumidor por cada cola? ¿Los consumidores iteran a través de las colas en busca de productos? – rtev

+0

Los consumidores llegan de forma asíncrona a las devoluciones de llamada de IO y se les exige que saquen cosas de la colección o esperen un período de tiempo determinado antes de abandonar si la colección no se llena con datos durante ese tiempo. Sin iteración, básicamente es una cola – Dervall

0

Estoy bastante seguro de que BlockingCollection<T> no puede hacer esto, tendrá que hacer las suyas. Se me ocurrió esto:

class NotifyingCollection<T> 
{ 
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>(); 
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>(); 

    private object _lock = new object(); 

    public void Add(T item) 
    { 
     _overflow.Enqueue(item); 
     Dispatch(); 
    } 

    private void Dispatch() 
    { 
     // this lock is needed since we need to atomically dequeue from both queues... 
     lock (_lock) 
     { 
      while (_overflow.Count > 0 && _subscribers.Count > 0) 
      { 
       Action<T> callback; 
       T item; 

       var r1 = _overflow.TryDequeue(out item); 
       var r2 = _subscribers.TryDequeue(out callback); 

       Debug.Assert(r1 && r2); 
       callback(item); 
       // or, optionally so that the caller thread's doesn't take too long ... 
       Task.Factory.StartNew(() => callback(item)); 
       // but you'll have to consider how exceptions will be handled. 
      } 
     } 
    } 

    public void TakeAsync(Action<T> callback) 
    { 
     _subscribers.Enqueue(callback); 
     Dispatch(); 
    } 
} 

He utilizado el hilo que llama TakeAsync() o Add() para servir como el hilo de devolución de llamada. Cuando llame al Add() o TakeAsync(), intentará enviar todos los elementos en cola a las devoluciones de llamada en cola. De esta forma, no se crea ningún hilo que permanezca allí dormido, esperando que se lo señale.

Ese bloqueo es feo, pero podrá enrutar y suscribirse en varios hilos sin bloqueo. No pude encontrar la forma de hacer el equivalente a , solo dequeue uno si hay algo disponible en la otra cola sin usar ese bloqueo.

Nota: Solo probé esto mínimamente, con algunos hilos.

+0

Bueno, sí. El único problema es que crearás un hilo bloqueado usando tu enfoque, que sería peligroso considerando una carga muy alta. Los bloques de llamada 'TryTake'. En una situación en la que tenga miles de estas llamadas, se le acabarán los hilos en el grupo de subprocesos de tareas y se bloqueará su aplicación. – Dervall

+0

¡Ah! no querías ningún hilo de bloqueo por completo; Pensé que solo querías asegurarte de que la cadena de llamadas no se bloqueara. Ahora lo entiendo. – atanamir

+0

OK, modifiqué mi respuesta con una que creo que hará lo que está buscando ... – atanamir

Cuestiones relacionadas