2011-05-23

Answers

0

One possible implementation is a ring buffer with separate read and write pointers. On each read/write operation, copy the opposite pointer (the copy must be thread-safe) into your local context, then perform batched reads or writes.

On each read or write, update your own pointer and signal an event.

If the reading or writing thread reaches a point where it has no more work to do, it should wait on the other thread's event before re-reading the appropriate pointer.
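A minimal sketch of that idea for a single reader and single writer (the class name, `int` payload, and capacity handling are my own assumptions, not part of the answer above):

```csharp
using System;
using System.Threading;

// Single-reader/single-writer ring buffer sketch. Each side only ever
// writes its own pointer and reads a snapshot of the other side's.
public class SpscRingBuffer
{
    private readonly int[] _buffer;
    private int _readPos;   // advanced only by the reader thread
    private int _writePos;  // advanced only by the writer thread
    private readonly AutoResetEvent _itemWritten = new AutoResetEvent(false);
    private readonly AutoResetEvent _itemRead = new AutoResetEvent(false);

    public SpscRingBuffer(int capacity)
    {
        // One slot is kept empty so a full buffer is distinguishable from an empty one.
        _buffer = new int[capacity + 1];
    }

    public void Write(int item)
    {
        int next = (_writePos + 1) % _buffer.Length;
        // Copy the reader's pointer; if the buffer is full, wait on the
        // reader's event before re-reading it.
        while (next == Thread.VolatileRead(ref _readPos))
            _itemRead.WaitOne();
        _buffer[_writePos] = item;
        Thread.VolatileWrite(ref _writePos, next);
        _itemWritten.Set(); // signal a possibly waiting reader
    }

    public int Read()
    {
        // Copy the writer's pointer; if the buffer is empty, wait on the
        // writer's event before re-reading it.
        while (_readPos == Thread.VolatileRead(ref _writePos))
            _itemWritten.WaitOne();
        int item = _buffer[_readPos];
        Thread.VolatileWrite(ref _readPos, (_readPos + 1) % _buffer.Length);
        _itemRead.Set(); // signal a possibly waiting writer
        return item;
    }
}
```

The `while` loops re-check the pointer after every wake-up, so a stale event signal cannot corrupt the buffer; batching reads/writes, as suggested above, would simply amortise the pointer copies.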

2

If you use System.Collections.Queue, thread safety is guaranteed this way:

var queue = Queue.Synchronized(new Queue()); 
queue.Enqueue(new WorkItem()); 
queue.Enqueue(new WorkItem()); 
queue.Clear(); 

If you want to use System.Collections.Generic.Queue<T> instead, create your own wrapper class. I already did this with System.Collections.Generic.Stack<T>:

using System; 
using System.Collections.Generic; 

[Serializable] 
public class SomeStack 
{ 
    private readonly object stackLock = new object(); 

    private readonly Stack<WorkItem> stack; 

    public SomeStack() 
    { 
     this.stack = new Stack<WorkItem>(); 
    } 

    public WorkItem Push(WorkItem context) 
    { 
     lock (this.stackLock) 
     { 
      this.stack.Push(context); 
     } 

     return context; 
    } 

    public WorkItem Pop() 
    { 
     lock (this.stackLock) 
     { 
      return this.stack.Pop(); 
     } 
    } 
} 
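The equivalent wrapper for Queue<T> follows the same pattern (a sketch along the lines of the stack above; the class name and the non-throwing TryDequeue are my own choices):

```csharp
using System.Collections.Generic;

// Lock-based wrapper around Queue<T>, mirroring the stack wrapper above.
public class SafeQueue<T>
{
    private readonly object queueLock = new object();

    private readonly Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (this.queueLock)
        {
            this.queue.Enqueue(item);
        }
    }

    // Returns false instead of throwing when the queue is empty, so
    // callers do not need a separate (racy) Count check.
    public bool TryDequeue(out T item)
    {
        lock (this.queueLock)
        {
            if (this.queue.Count > 0)
            {
                item = this.queue.Dequeue();
                return true;
            }

            item = default(T);
            return false;
        }
    }
}
```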
0

You can implement a thread-safe queue using atomic operations. I once wrote the following class for a multiplayer game. It allows multiple threads to write safely to the queue, while a single other thread reads safely from it:

using System.Collections.Generic; 

/// <summary> 
/// The WaitFreeQueue class implements the Queue abstract data type through a linked list. The WaitFreeQueue 
/// allows thread-safe addition and removal of elements using atomic operations. Multiple threads can add 
/// elements simultaneously, and another thread can remove elements from the queue at the same time. Only one 
/// thread can remove elements from the queue at any given time. 
/// </summary> 
/// <typeparam name="T">The type parameter</typeparam> 
public class WaitFreeQueue<T> 
{ 
    // Private fields 
    // ============== 
    #region Private fields 
    private Node<T> _tail; // The tail of the queue. 
    private Node<T> _head; // The head of the queue. 
    #endregion 



    // Public methods 
    // ============== 
    #region Public methods 
    /// <summary> 
    /// Removes the first item from the queue. This method returns a value to indicate if an item was 
    /// available, and passes the item back through an argument. 
    /// This method is not thread-safe in itself (only one thread can safely access this method at any 
    /// given time) but it is safe to call this method while other threads are enqueueing items. 
    /// 
    /// If no item was available at the time of calling this method, the returned value is initialised 
    /// to the default value that matches this instance's type parameter. For reference types, this is 
    /// a Null reference. 
    /// </summary> 
    /// <param name="value">The value.</param> 
    /// <returns>A boolean value indicating if an element was available (true) or not.</returns> 
    public bool Dequeue(ref T value) 
    { 
     bool succeeded = false; 
     value = default(T); 

     // If there is an element on the queue then we get it. 
     if (null != _head) 
     { 
      // Set the head to the next element in the list, and retrieve the old head. 
      Node<T> head = System.Threading.Interlocked.Exchange<Node<T>>(ref _head, _head.Next); 

      // Sever the element we just pulled off the queue. 
      head.Next = null; 

      // We have succeeded. 
      value = head.Value; 
      succeeded = true; 
     } 

     return succeeded; 
    } 

    /// <summary> 
    /// Adds another item to the end of the queue. This operation is thread-safe, and multiple threads 
    /// can enqueue items while a single other thread dequeues items. 
    /// </summary> 
    /// <param name="value">The value to add.</param> 
    public void Enqueue(T value) 
    { 
     // We create a new node for the specified value, and point it to itself. 
     Node<T> newNode = new Node<T>(value); 

     // In one atomic operation, set the tail of the list to the new node, and remember the old tail. 
     Node<T> previousTail = System.Threading.Interlocked.Exchange<Node<T>>(ref _tail, newNode); 

     // Link the previous tail to the new tail. 
     if (null != previousTail) 
      previousTail.Next = newNode; 

     // If this is the first node in the list, we save it as the head of the queue. 
     System.Threading.Interlocked.CompareExchange<Node<T>>(ref _head, newNode, null); 
    } // Enqueue() 
    #endregion 



    // Public constructor 
    // ================== 
    #region Public constructor 
    /// <summary> 
    /// Constructs a new WaitFreeQueue instance. 
    /// </summary> 
    public WaitFreeQueue() { } 

    /// <summary> 
    /// Constructs a new WaitFreeQueue instance based on the specified list of items. 
    /// The items will be enqueued. The list can be a Null reference. 
    /// </summary> 
    /// <param name="items">The items</param> 
    public WaitFreeQueue(IEnumerable<T> items) 
    { 
     if(null!=items) 
      foreach(T item in items) 
       this.Enqueue(item); 
    } 
    #endregion 



    // Private types 
    // ============= 
    #region Private types 
    /// <summary> 
    /// The Node class represents a single node in the linked list of a WaitFreeQueue. 
    /// It contains the queued-up value and a reference to the next node in the list. 
    /// </summary> 
    /// <typeparam name="U">The type parameter.</typeparam> 
    private class Node<U> 
    { 
     // Public fields 
     // ============= 
     #region Public fields 
     public Node<U> Next; 
     public U Value; 
     #endregion 



     // Public constructors 
     // =================== 
     #region Public constructors 
     /// <summary> 
     /// Constructs a new node with the specified value. 
     /// </summary> 
     /// <param name="value">The value</param> 
     public Node(U value) 
     { 
      this.Value = value; 
     } 
     #endregion 

    } // Node generic class 
    #endregion 

} // WaitFreeQueue class 

If the restriction of having only a single thread dequeuing, while multiple threads may enqueue, is acceptable to you, then you could use this. It was great for the game because it meant no thread synchronization was required.

0

A simple usage example would be:

using System.Collections.Generic; 
using System.Threading; 

namespace ConsoleApplication1 
{ 
    class Program 
    { 

     static void Main(string[] args) 
     { 
      ExampleQueue eq = new ExampleQueue(); 
      eq.Run(); 

      // Wait... 
      System.Threading.Thread.Sleep(100000); 
     } 


    } 

    class ExampleQueue 
    { 
     private Queue<int> _myQueue = new Queue<int>(); 

     public void Run() 
     { 
      ThreadPool.QueueUserWorkItem(new WaitCallback(PushToQueue), null); 
      ThreadPool.QueueUserWorkItem(new WaitCallback(PopFromQueue), null); 
     } 

     private void PushToQueue(object Dummy) 
     { 
      for (int i = 0; i <= 1000; i++) 
      { 
       lock (_myQueue) 
       { 
        _myQueue.Enqueue(i); 
       } 
      } 
      System.Console.WriteLine("END PushToQueue"); 

     } 

     private void PopFromQueue(object Dummy) 
     { 
      int dataElementFromQueue = -1; 
      while (dataElementFromQueue < 1000) 
      { 
       lock (_myQueue) 
       { 
        if (_myQueue.Count > 0) 
        { 
         dataElementFromQueue = _myQueue.Dequeue(); 

         // Do something with dataElementFromQueue... 
         System.Console.WriteLine("Dequeued " + dataElementFromQueue); 
        } 
       } 
      } 
      System.Console.WriteLine("END PopFromQueue"); 

     } 
    } 
} 
8

If you need thread safety, use ConcurrentQueue<T>.

+0

Just want to point out that ConcurrentQueue is for .NET Framework 4 and above; if you are still developing against Framework 3.5 or lower, it is not available to you. – Will
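On .NET 4+ the usage is a one-liner per operation, with no locks in your own code (a minimal sketch):

```csharp
using System;
using System.Collections.Concurrent;

class ConcurrentQueueDemo
{
    static void Main()
    {
        // ConcurrentQueue<T> is safe for any number of concurrent
        // producers and consumers.
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);

        int item;
        while (queue.TryDequeue(out item)) // false once the queue is empty
            Console.WriteLine(item);       // prints 1 then 2
    }
}
```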

0

You might want to use a blocking queue, in which the dequeuing thread waits until some data becomes available.

See: Creating a blocking Queue<T> in .NET?
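On .NET 4+ a blocking queue ships in the box as BlockingCollection<T> (backed by a ConcurrentQueue<T> by default); on 3.5 and below you would build one from Queue<T> plus Monitor.Wait/Pulse, as the linked question shows. A sketch with BlockingCollection:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingQueueDemo
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();

        var producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 5; i++)
                queue.Add(i);
            queue.CompleteAdding(); // no more items will arrive
        });

        // Each iteration blocks until an item is available; the loop
        // ends once the producer has called CompleteAdding() and the
        // queue is drained.
        foreach (int item in queue.GetConsumingEnumerable())
            Console.WriteLine(item); // prints 0 through 4 in order

        producer.Wait();
    }
}
```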
