2011-05-23



One possible implementation is a ring buffer with separate read and write pointers. On each read or write operation, copy the opposite pointer (this copy must be thread-safe) into your local context, then perform the reads or writes as a batch.

After each read or write, update your own pointer and pulse an event.

If the reader or writer thread reaches the point where it has no more work to do, it waits on the other thread's event before re-reading the appropriate pointer.
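
The scheme above could be sketched roughly as follows. This is only an illustration under stated assumptions: exactly one writer thread and one reader thread, and the names `RingBuffer<T>`, `Write`, `ReadBatch`, `_written` and `_consumed` are hypothetical, not taken from the answer. The pointers are `volatile` fields so each thread can safely take a local copy of the other's pointer, and `AutoResetEvent` plays the role of the pulsed events.

```csharp
using System;
using System.Threading;

// Illustrative single-writer/single-reader ring buffer; all names are hypothetical.
public sealed class RingBuffer<T>
{
    private readonly T[] _items;
    private volatile int _read;   // advanced only by the reader thread
    private volatile int _write;  // advanced only by the writer thread
    private readonly AutoResetEvent _written = new AutoResetEvent(false);
    private readonly AutoResetEvent _consumed = new AutoResetEvent(false);

    public RingBuffer(int capacity)
    {
        // One slot always stays empty so that "full" and "empty" differ.
        _items = new T[capacity + 1];
    }

    public void Write(T item)
    {
        int write = _write;
        int next = (write + 1) % _items.Length;
        while (next == _read)       // full: wait for the reader's event
            _consumed.WaitOne();
        _items[write] = item;
        _write = next;              // publish the slot, then pulse
        _written.Set();
    }

    // Copies as many pending items as fit into destination; blocks while empty.
    public int ReadBatch(T[] destination)
    {
        int read = _read;
        int write = _write;         // local snapshot of the opposite pointer
        while (read == write)       // empty: wait for the writer's event
        {
            _written.WaitOne();
            write = _write;
        }
        int count = 0;
        while (read != write && count < destination.Length)
        {
            destination[count++] = _items[read];
            read = (read + 1) % _items.Length;
        }
        _read = read;               // publish progress, then pulse
        _consumed.Set();
        return count;
    }
}

public static class RingBufferDemo
{
    public static void Main()
    {
        var buffer = new RingBuffer<int>(4);
        var writer = new Thread(() => { for (int i = 0; i < 10; i++) buffer.Write(i); });
        writer.Start();

        var batch = new int[4];
        int received = 0;
        while (received < 10)
            received += buffer.ReadBatch(batch);
        writer.Join();
        Console.WriteLine("Received " + received + " items");
    }
}
```

Because each pointer is written by only one thread, no lock is needed here; with more than one writer or reader this sketch would no longer be safe.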


If you use System.Collections.Queue, you can get thread safety for individual operations this way:

// Queue.Synchronized returns a wrapper that locks around each operation; create it once and reuse it.
var queue = Queue.Synchronized(new Queue());
queue.Enqueue(new WorkItem());
queue.Enqueue(new WorkItem());

If you want to use System.Collections.Generic.Queue<T>, create your own wrapper class. I have already done this with System.Collections.Generic.Stack<T>:

using System;
using System.Collections.Generic;

public class SomeStack
{
    // Guards every access to the underlying stack.
    private readonly object stackLock = new object();

    private readonly Stack<WorkItem> stack;

    public SomeStack()
    {
        this.stack = new Stack<WorkItem>();
    }

    public WorkItem Push(WorkItem context)
    {
        lock (this.stackLock)
        {
            this.stack.Push(context);
        }

        return context;
    }

    public WorkItem Pop()
    {
        lock (this.stackLock)
        {
            return this.stack.Pop();
        }
    }
}
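
The same locking pattern carries over to Queue<T>. The following is a minimal sketch of such a wrapper, not the author's code: the class name `SynchronizedQueue<T>` and the `TryDequeue` method are assumptions made for illustration. Checking `Count` and dequeuing happen under a single lock so that another thread cannot empty the queue in between.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: every operation takes the same private lock.
public class SynchronizedQueue<T>
{
    private readonly object _lock = new object();
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            _queue.Enqueue(item);
        }
    }

    // Returns false instead of throwing when the queue is empty.
    public bool TryDequeue(out T item)
    {
        lock (_lock)
        {
            if (_queue.Count > 0)
            {
                item = _queue.Dequeue();
                return true;
            }

            item = default(T);
            return false;
        }
    }

    public int Count
    {
        get { lock (_lock) { return _queue.Count; } }
    }
}

public static class SynchronizedQueueDemo
{
    public static void Main()
    {
        var queue = new SynchronizedQueue<string>();
        queue.Enqueue("first");
        queue.Enqueue("second");

        string item;
        while (queue.TryDequeue(out item))
            Console.WriteLine(item);
    }
}
```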

You can implement a thread-safe queue using atomic operations. I once wrote the following class for a multiplayer game. It allows multiple threads to write to the queue safely, and a single other thread to read from it safely:

using System.Collections.Generic;

/// <summary>
/// The WaitFreeQueue class implements the Queue abstract data type through a linked list. The WaitFreeQueue
/// allows thread-safe addition and removal of elements using atomic operations. Multiple threads can add
/// elements simultaneously, and another thread can remove elements from the queue at the same time. Only one
/// thread can remove elements from the queue at any given time.
/// </summary>
/// <typeparam name="T">The type parameter</typeparam>
public class WaitFreeQueue<T>
{
    // Private fields
    // ==============
    #region Private fields
    private Node<T> _tail; // The tail of the queue.
    private Node<T> _head; // The head of the queue.
    #endregion

    // Public methods
    // ==============
    #region Public methods
    /// <summary>
    /// Removes the first item from the queue. This method returns a value to indicate if an item was
    /// available, and passes the item back through an argument.
    /// This method is not thread-safe in itself (only one thread can safely access this method at any
    /// given time) but it is safe to call this method while other threads are enqueueing items.
    /// If no item was available at the time of calling this method, the returned value is initialised
    /// to the default value that matches this instance's type parameter. For reference types, this is
    /// a Null reference.
    /// </summary>
    /// <param name="value">The value.</param>
    /// <returns>A boolean value indicating if an element was available (true) or not.</returns>
    public bool Dequeue(ref T value)
    {
        bool succeeded = false;
        value = default(T);

        // If there is an element on the queue then we get it.
        if (null != _head)
        {
            // Set the head to the next element in the list, and retrieve the old head.
            // Caveat: _head.Next is read before the exchange, so a node that an Enqueue
            // links in between those two steps can be missed.
            Node<T> head = System.Threading.Interlocked.Exchange<Node<T>>(ref _head, _head.Next);

            // Sever the element we just pulled off the queue.
            head.Next = null;

            // We have succeeded.
            value = head.Value;
            succeeded = true;
        }

        return succeeded;
    } // Dequeue()

    /// <summary>
    /// Adds another item to the end of the queue. This operation is thread-safe, and multiple threads
    /// can enqueue items while a single other thread dequeues items.
    /// </summary>
    /// <param name="value">The value to add.</param>
    public void Enqueue(T value)
    {
        // We create a new node for the specified value; its Next reference starts out as null.
        Node<T> newNode = new Node<T>(value);

        // In one atomic operation, set the tail of the list to the new node, and remember the old tail.
        Node<T> previousTail = System.Threading.Interlocked.Exchange<Node<T>>(ref _tail, newNode);

        // Link the previous tail to the new tail.
        if (null != previousTail)
            previousTail.Next = newNode;

        // If this is the first node in the list, we save it as the head of the queue.
        System.Threading.Interlocked.CompareExchange<Node<T>>(ref _head, newNode, null);
    } // Enqueue()
    #endregion

    // Public constructor
    // ==================
    #region Public constructor
    /// <summary>
    /// Constructs a new WaitFreeQueue instance.
    /// </summary>
    public WaitFreeQueue() { }

    /// <summary>
    /// Constructs a new WaitFreeQueue instance based on the specified list of items.
    /// The items will be enqueued. The list can be a Null reference.
    /// </summary>
    /// <param name="items">The items</param>
    public WaitFreeQueue(IEnumerable<T> items)
    {
        if (null != items)
            foreach (T item in items)
                this.Enqueue(item);
    }
    #endregion

    // Private types
    // =============
    #region Private types
    /// <summary>
    /// The Node class represents a single node in the linked list of a WaitFreeQueue.
    /// It contains the queued-up value and a reference to the next node in the list.
    /// </summary>
    /// <typeparam name="U">The type parameter.</typeparam>
    private class Node<U>
    {
        // Public fields
        // =============
        #region Public fields
        public Node<U> Next;
        public U Value;
        #endregion

        // Public constructors
        // ===================
        #region Public constructors
        /// <summary>
        /// Constructs a new node with the specified value.
        /// </summary>
        /// <param name="value">The value</param>
        public Node(U value)
        {
            this.Value = value;
        }
        #endregion

    } // Node generic class
    #endregion

} // WaitFreeQueue class

If the restriction of having only a single thread dequeuing while multiple threads can enqueue is fine with you, then you could use this. It worked well for the game because it meant that no thread synchronization with locks was required.


A simple usage example would be:

using System;
using System.Collections.Generic;
using System.Threading;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            ExampleQueue eq = new ExampleQueue();
            eq.Run();

            // Wait... (assumption: block on input so the pool threads can finish)
            Console.ReadLine();
        }
    }

    class ExampleQueue
    {
        private Queue<int> _myQueue = new Queue<int>();

        public void Run()
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(PushToQueue), null);
            ThreadPool.QueueUserWorkItem(new WaitCallback(PopFromQueue), null);
        }

        private void PushToQueue(object Dummy)
        {
            for (int i = 0; i <= 1000; i++)
            {
                lock (_myQueue)
                {
                    _myQueue.Enqueue(i);
                }
            }
            System.Console.WriteLine("END PushToQueue");
        }

        private void PopFromQueue(object Dummy)
        {
            int dataElementFromQueue = -1;
            while (dataElementFromQueue < 1000)
            {
                lock (_myQueue)
                {
                    if (_myQueue.Count > 0)
                    {
                        dataElementFromQueue = _myQueue.Dequeue();

                        // Do something with dataElementFromQueue...
                        System.Console.WriteLine("Dequeued " + dataElementFromQueue);
                    }
                }
            }
            System.Console.WriteLine("END PopFromQueue");
        }
    }
}


If you need thread safety, use ConcurrentQueue<T>.
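
As a rough illustration of that suggestion, the sketch below enqueues from several threads and then drains the queue with `TryDequeue`. The class and method names (`ConcurrentQueueDemo`, `SumThroughQueue`) are made up for the example; `ConcurrentQueue<T>` and `Parallel.For` require .NET Framework 4 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConcurrentQueueDemo
{
    // Enqueues 0..count-1 from several threads, then drains the queue and sums the items.
    public static long SumThroughQueue(int count)
    {
        var queue = new ConcurrentQueue<int>();

        // Enqueue is safe to call concurrently; no explicit lock is needed.
        Parallel.For(0, count, i => queue.Enqueue(i));

        long sum = 0;
        int item;
        // TryDequeue returns false once the queue is empty instead of throwing.
        while (queue.TryDequeue(out item))
            sum += item;

        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumThroughQueue(1000));
    }
}
```

The order in which the items arrive depends on thread scheduling, which is why the example only checks the sum.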


I just want to point out that ConcurrentQueue is for .NET Framework 4 and later; if you are still developing against 3.5 or earlier, it is not available to you. – Will


You may want to use a blocking queue, in which the dequeuing thread waits until some data is available.

See: Creating a blocking Queue<T> in .NET?
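
One way to get this behaviour without writing the queue yourself is `BlockingCollection<T>` (available from .NET Framework 4), which is a swapped-in alternative here rather than the approach from the linked question. In this sketch the consumer blocks inside `GetConsumingEnumerable` until an item arrives, and the loop ends after the producer calls `CompleteAdding`. The names `BlockingQueueDemo` and `ProduceAndConsume` are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingQueueDemo
{
    // Produces count items on one thread and consumes them on the calling thread.
    public static int ProduceAndConsume(int count)
    {
        // By default BlockingCollection<T> wraps a ConcurrentQueue<T> (FIFO order).
        // The bounded capacity of 16 makes Add block while the queue is full.
        using (var queue = new BlockingCollection<int>(16))
        {
            var producer = new Thread(() =>
            {
                for (int i = 0; i < count; i++)
                    queue.Add(i);
                queue.CompleteAdding(); // lets the consuming loop finish
            });
            producer.Start();

            int consumed = 0;
            // Blocks while the queue is empty; ends once adding is complete and the queue is drained.
            foreach (int item in queue.GetConsumingEnumerable())
                consumed++;

            producer.Join();
            return consumed;
        }
    }

    public static void Main()
    {
        Console.WriteLine(ProduceAndConsume(100));
    }
}
```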
