2010-07-19 22 views
7

System.Collections.Concurrent tiene algunas colecciones nuevas que funcionan muy bien en entornos multiproceso. Sin embargo, son un poco limitados. O bloquean hasta que un artículo esté disponible o devuelven default(T) (métodos TryXXX).¿Colección concurrente sin bloqueo?

Necesito una colección que sea segura para subprocesos, pero en lugar de bloquear la cadena de llamada usa una devolución de llamada para informarme que al menos hay un elemento disponible.

Mi solución actual es usar un BlockingCollection, pero usar el APM con un delegado para obtener el siguiente elemento. En otras palabras, creo un delegado en un método que Take s de la colección, y ejecuto ese delegado usando BeginInvoke.

Desafortunadamente, tengo que mantener un montón de estado dentro de mi clase para lograr esto. Peor aún, la clase no es segura para subprocesos; solo puede ser usado por un solo hilo. Estoy bordeando el mantenimiento, que preferiría no hacer.

Sé que hay algunas bibliotecas que hacen que lo que estoy haciendo aquí sea bastante simple (creo que el Marco reactivo es uno de estos), pero me gustaría lograr mis objetivos sin agregar referencias fuera de la versión 4 del marco.

¿Hay algún patrón mejor que pueda usar que no requiera referencias externas que logren mi objetivo?


tl; dr:

¿Hay patrones que satisfacen el requisito:

"Tengo que señalar una colección que estoy listo para el siguiente elemento, y tener la colección ejecutar una devolución de llamada cuando llega el siguiente elemento, sin ningún hilo bloqueado ".

+0

¿Esto será seguro para subprocesos? ¿Qué impide que el elemento disponible deje de estar disponible antes de invocar al delegado? ¿Y cuál es su objetivo general (es decir, un sistema de colas)? –

+0

@Adam Buen punto para consumir el artículo. El delegado toma el elemento eliminado de la colección. Por lo tanto, la ejecución del delegado se bloquea hasta que un elemento sea 'Take'-en de la colección, y ese elemento es el' objeto 'que se pasa a EndInvoke. El objetivo general es un poco intrincado; esencialmente tengo que ralentizar un flujo de trabajo hasta que el artículo esté disponible. No puede bloquear la ejecución del flujo de trabajo, por lo tanto, simplemente 'Tomar' un elemento no funcionará cuando la llamada se bloquee. Tengo que crear un marcador, luego pasarlo a una extensión. La extensión invoca al delegado, reanudando el marcador dentro de la devolución de llamada. – Will

+0

Desafortunadamente, tengo poca experiencia con los flujos de trabajo: intente agregar ese detalle a su pregunta y podría despertar el interés de alguien :-) –

Respuesta

4

Creo que tengo dos soluciones posibles. No estoy particularmente satisfecho con ninguno de los dos, pero al menos ofrecen una alternativa razonable al enfoque de APM.

El primero no cumple con su requisito de no bloquear el hilo, pero creo que es bastante elegante porque puede registrar devoluciones de llamada y se llamarán en modo de contramarcha, pero todavía tiene la capacidad de llamar al Take o TryTake como lo haría normalmente para un BlockingCollection. Este código fuerza que las devoluciones de llamadas se registren cada vez que se solicita un artículo. Ese es el mecanismo de señalización para la colección. Lo bueno de este enfoque es que las llamadas al Take no se mueren de hambre como lo hacen en mi segunda solución.

public class NotifyingBlockingCollection<T> : BlockingCollection<T> 
{ 
    private Thread m_Notifier; 
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
     m_Notifier = new Thread(Notify); 
     m_Notifier.IsBackground = true; 
     m_Notifier.Start(); 
    } 

    private void Notify() 
    { 
     while (true) 
     { 
      Action<T> callback = m_Callbacks.Take(); 
      T item = Take(); 
      callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
     } 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     m_Callbacks.Add(callback); 
    } 
} 

El segundo cumple con su requisito de no bloquear el hilo. Observe cómo transfiere la invocación de la devolución de llamada al grupo de subprocesos. Hice esto porque estoy pensando que si se ejecuta de forma sincronizada, los bloqueos se mantendrían más tiempo, lo que provocaría el cuello de botella de Add y RegisterForTake. Lo he examinado detenidamente y no creo que se pueda bloquear en vivo (tanto un elemento como una devolución de llamada están disponibles, pero la devolución de llamada nunca se ejecuta), pero es posible que desee revisarlo usted mismo para verificarlo. El único problema aquí es que una llamada al Take se moriría de hambre ya que las devoluciones de llamadas siempre tienen prioridad.

public class NotifyingBlockingCollection<T> 
{ 
    private BlockingCollection<T> m_Items = new BlockingCollection<T>(); 
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
    } 

    public void Add(T item) 
    { 
     lock (m_Callbacks) 
     { 
      if (m_Callbacks.Count > 0) 
      { 
       Action<T> callback = m_Callbacks.Dequeue(); 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Items.Add(item); 
      } 
     } 
    } 

    public T Take() 
    { 
     return m_Items.Take(); 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     lock (m_Callbacks) 
     { 
      T item; 
      if (m_Items.TryTake(out item)) 
      { 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Callbacks.Enqueue(callback); 
      } 
     } 
    } 
} 
+0

Gracias por la respuesta, pero no es lo que estoy buscando. Esto es lo que estoy haciendo actualmente, pero con el APM insertado en la colección (el código que ha proporcionado). Creo que el quid de mi problema es que APM no se ajusta a mis requisitos, es solo la implementación que he usado. Mis requisitos exigen un patrón que proporcione una solución a la pregunta: "¿Cómo puedo señalizar a una colección que estoy listo para el siguiente elemento, y hacer que la colección ejecute una devolución de llamada cuando llegue ese siguiente elemento, sin ningún hilo bloqueado?" – Will

+0

Me imaginé que no era lo que buscabas. Es un problema interesante sin embargo. Lástima 'Agregar' no es' virtual', de lo contrario, podría haber sido capaz de insertar la notificación allí de alguna manera. Tal vez podría utilizar una de las implementaciones de cola de bloqueo como punto de partida. El problema es que debe tener cuidado con la forma en que entrega esa notificación, de lo contrario, otro consumidor habrá capturado primero el artículo. Podría jugar con esto hoy si tengo tiempo. Publique una respuesta usted mismo si lo resuelve. No sé ... puede que le resulte más fácil despejar y solo hacer referencia a otra biblioteca. –

+0

La notificación debe contener el siguiente elemento y debe ser controlada por el notificador. Quizás la idea de que esto sea una colección es errónea; solo a través de este mecanismo se puede suministrar el siguiente artículo, evitando así la cuestión de que dos observadores compitan por un solo artículo. En otras palabras, un observador no puede usar el mecanismo A para obtener el siguiente elemento (es decir, 'T Pop()') mientras que el otro se ha registrado para una devolución de llamada. – Will

3

¿Qué tal algo así? (La nomenclatura probablemente podría usar algo de trabajo. Y tenga en cuenta que esto no se ha probado.)

public class CallbackCollection<T> 
{ 
    // Sychronization object to prevent race conditions. 
    private object _SyncObject = new object(); 

    // A queue for callbacks that are waiting for items. 
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>(); 

    // A queue for items that are waiting for callbacks. 
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>(); 

    public void Add(T item) 
    { 
     Action<T> callback; 
     lock (_SyncObject) 
     { 
      // Try to get a callback. If no callback is available, 
      // then enqueue the item to wait for the next callback 
      // and return. 
      if (!_Callbacks.TryDequeue(out callback)) 
      { 
       _Items.Enqueue(item); 
       return; 
      } 
     } 

     ExecuteCallback(callback, item); 
    } 

    public void TakeAndCallback(Action<T> callback) 
    { 
     T item; 
     lock(_SyncObject) 
     { 
      // Try to get an item. If no item is available, then 
      // enqueue the callback to wait for the next item 
      // and return. 
      if (!_Items.TryDequeue(out item)) 
      { 
       _Callbacks.Enqueue(callback); 
       return; 
      } 
     } 
     ExecuteCallback(callback, item); 
    } 

    private void ExecuteCallback(Action<T> callback, T item) 
    { 
     // Use a new Task to execute the callback so that we don't 
     // execute it on the current thread. 
     Task.Factory.StartNew(() => callback.Invoke(item)); 
    } 
} 
+0

Acaba de actualizarse y vio @ Brian's NotifyingBlockingCollection. Parece que él y yo encontramos aproximadamente la misma solución al mismo tiempo. –

+0

Sí, definitivamente pensábamos lo mismo aquí, especialmente la parte sobre cómo obtener la invocación de la devolución de llamada fuera del hilo actual. –