2010-02-06 23 views
8

Tengo un escenario en el que necesito eliminar un elemento para la cola apenas se procesa. entiendo que no puedo eliminar un elemento de una colección, mientras que en bucle, pero preguntaba si algo se podría hacer con el enumerador etc ...¿Cómo puedo modificar una colección de cola en un bucle?

Esto es sólo un ejemplo básico lanzando un error "Colección se modificó después de la el enumerador fue instanciado ".

¿Alguna sugerencia? ¡¡¡Muchas gracias!!!

código es el siguiente:

 class Program 
     { 
      static void Main() 
      { 

       Queue<Order> queueList = GetQueueList(); 

       foreach (Order orderItem in queueList) 
       { 
        Save(orderItem); 
        Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
        queueList.Dequeue(); 
       } 
       Console.Read(); 

      } 

      private static void Save(Order orderItem) 
      { 
       //we are pretending to save or do something. 
      } 

      private static Queue<Order>GetQueueList() 
      { 
       Queue<Order> orderQueue = new Queue<Order>(); 
       orderQueue.Enqueue(new Order { Id = 1, Name = "Order 1" }); 
       orderQueue.Enqueue(new Order { Id = 1, Name = "Order 2" }); 
       orderQueue.Enqueue(new Order { Id = 2, Name = "Order 3" }); 
       orderQueue.Enqueue(new Order { Id = 3, Name = "Order 4" }); 
       orderQueue.Enqueue(new Order { Id = 4, Name = "Order 5" }); 
       return orderQueue; 
      } 
     } 

     public class Order 
     { 
      public int Id { get; set; } 
      public string Name { get; set; } 
     } 

Respuesta

10

Cambiar la foreach para:

while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 
    Save(orderItem); 
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
} 

Editar:

Para volver a procesar salvación falla hacer algo como:

while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 

    if (!Save(orderItem)) 
    { 
     queueList.Enqueue(orderItem); // Reprocess the failed save, probably want more logic to prevent infinite loop 
    } 
    else 
    { 
     Console.WriteLine("Successfully saved: {0} Name {1} ", orderItem.Id, orderItem.Name); 
    } 
} 

Editar:

John K menciona hilo de seguridad que es una preocupación válida si tiene varios hilos que accedan al mismo Queue. Vea http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487 para una clase ThreadSafeQueue que cubre problemas sencillos de seguridad de hilos.


Editar: Aquí está el ejemplo de seguridad hilo sigo señalando que todo el mundo :-)

He aquí un ejemplo de los problemas de seguridad mencionados hilo. Como se muestra, el valor predeterminado Queue puede "perder" elementos mientras sigue disminuyendo el conteo.

Actualizado: Para representar mejor el problema. Nunca agrego un elemento nulo al Queue, pero el estándar Queue.Dequeue() devuelve varios valores nulos. Esto solo estaría bien, pero al hacerlo se eliminará un elemento válido de la colección interna y se reducirá el Count. Es una suposición segura, en este ejemplo específico, que cada artículo null devuelto de una operación Queue.Dequeue() representa un artículo válido que nunca se procesó.

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace SO_ThreadSafeQueue 
{ 
    class Program 
    { 
     static int _QueueExceptions; 
     static int _QueueNull; 
     static int _QueueProcessed; 

     static int _ThreadSafeQueueExceptions; 
     static int _ThreadSafeQueueNull; 
     static int _ThreadSafeQueueProcessed; 

     static readonly Queue<Guid?> _Queue = new Queue<Guid?>(); 
     static readonly ThreadSafeQueue<Guid?> _ThreadSafeQueue = new ThreadSafeQueue<Guid?>(); 
     static readonly Random _Random = new Random(); 

     const int Expected = 10000000; 

     static void Main() 
     { 
      Console.Clear(); 
      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Creating queues..."); 

      for (int i = 0; i < Expected; i++) 
      { 
       Guid guid = Guid.NewGuid(); 
       _Queue.Enqueue(guid); 
       _ThreadSafeQueue.Enqueue(guid); 
      } 

      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Processing queues..."); 

      for (int i = 0; i < 100; i++) 
      { 
       ThreadPool.QueueUserWorkItem(ProcessQueue); 
       ThreadPool.QueueUserWorkItem(ProcessThreadSafeQueue); 
      } 

      int progress = 0; 

      while (_Queue.Count > 0 || _ThreadSafeQueue.Count > 0) 
      { 
       Console.SetCursorPosition(0, 0); 

       switch (progress) 
       { 
        case 0: 
         { 
          Console.WriteLine("Processing queues... |"); 
          progress = 1; 
          break; 
         } 
        case 1: 
         { 
          Console.WriteLine("Processing queues... /"); 
          progress = 2; 
          break; 
         } 
        case 2: 
         { 
          Console.WriteLine("Processing queues... -"); 
          progress = 3; 
          break; 
         } 
        case 3: 
         { 
          Console.WriteLine("Processing queues... \\"); 
          progress = 0; 
          break; 
         } 
       } 

       Thread.Sleep(200); 
      } 

      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Finished processing queues..."); 
      Console.WriteLine("\r\nQueue Count:   {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _Queue.Count, _QueueProcessed, _QueueExceptions, _QueueNull); 
      Console.WriteLine("ThreadSafeQueue Count: {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _ThreadSafeQueue.Count, _ThreadSafeQueueProcessed, _ThreadSafeQueueExceptions, _ThreadSafeQueueNull); 

      Console.WriteLine("\r\nPress any key..."); 
      Console.ReadKey(); 
     } 

     static void ProcessQueue(object nothing) 
     { 
      while (_Queue.Count > 0) 
      { 
       Guid? currentItem = null; 

       try 
       { 
        currentItem = _Queue.Dequeue(); 
       } 
       catch (Exception) 
       { 
        Interlocked.Increment(ref _QueueExceptions); 
       } 

       if (currentItem != null) 
       { 
        Interlocked.Increment(ref _QueueProcessed); 
       } 
       else 
       { 
        Interlocked.Increment(ref _QueueNull); 
       } 

       Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times 
      } 
     } 

     static void ProcessThreadSafeQueue(object nothing) 
     { 
      while (_ThreadSafeQueue.Count > 0) 
      { 
       Guid? currentItem = null; 

       try 
       { 
        currentItem = _ThreadSafeQueue.Dequeue(); 
       } 
       catch (Exception) 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueExceptions); 
       } 

       if (currentItem != null) 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueProcessed); 
       } 
       else 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueNull); 
       } 

       Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times 
      } 
     } 

     /// <summary> 
     /// Represents a thread safe <see cref="Queue{T}"/> 
     /// </summary> 
     /// <typeparam name="T"></typeparam> 
     public class ThreadSafeQueue<T> : Queue<T> 
     { 
      #region Private Fields 
      private readonly object _LockObject = new object(); 
      #endregion 

      #region Public Properties 
      /// <summary> 
      /// Gets the number of elements contained in the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      public new int Count 
      { 
       get 
       { 
        int returnValue; 

        lock (_LockObject) 
        { 
         returnValue = base.Count; 
        } 

        return returnValue; 
       } 
      } 
      #endregion 

      #region Public Methods 
      /// <summary> 
      /// Removes all objects from the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      public new void Clear() 
      { 
       lock (_LockObject) 
       { 
        base.Clear(); 
       } 
      } 

      /// <summary> 
      /// Removes and returns the object at the beggining of the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      /// <returns></returns> 
      public new T Dequeue() 
      { 
       T returnValue; 

       lock (_LockObject) 
       { 
        returnValue = base.Dequeue(); 
       } 

       return returnValue; 
      } 

      /// <summary> 
      /// Adds an object to the end of the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      /// <param name="item">The object to add to the <see cref="ThreadSafeQueue{T}"/></param> 
      public new void Enqueue(T item) 
      { 
       lock (_LockObject) 
       { 
        base.Enqueue(item); 
       } 
      } 

      /// <summary> 
      /// Set the capacity to the actual number of elements in the <see cref="ThreadSafeQueue{T}"/>, if that number is less than 90 percent of current capactity. 
      /// </summary> 
      public new void TrimExcess() 
      { 
       lock (_LockObject) 
       { 
        base.TrimExcess(); 
       } 
      } 
      #endregion 
     } 

    } 
} 
+0

Hola Eso funciona. ¿Qué tal si quería dequeue el ítem solo si el guardado fue exitoso? ¿Todavía puedo hacer eso? lo siento + gracias no estoy familiarizado con la cola – user9969

+1

@ devnet247: en realidad no. Si no dequeue el elemento en la parte superior, no puede obtener el que está detrás de él. Necesita mover el elemento fallido al _tail_ de la cola como lo hace esta muestra. –

+0

Muchas gracias por su ayuda. Esto lo hará por ahora. Tengo que implementar el enhebrado completo y este proceso ocurre en un servicio wcf. Otra cosa que aprender. Gracias de nuevo – user9969

0

Para mí, parece que estás tratando de procesar el elemento en cola uno a uno.

¿Qué tal envolviendo esto en while bucle y procesar cada elemento desde Dequeue, hasta que la cola esté vacía?

+0

En el código de producción tenemos una cola de pedidos por procesar y después de cada uno de ellos necesito quitarlos. ¿Puedes mostrarme con un pequeño fragmento de lo que quieres decir? Gracias – user9969

+0

@ devnet247: Consulte mi respuesta para obtener un fragmento de código. –

1

foreach como una forma razonable para iterar a través de la cola de cuando no se está quitando elementos

Cuando se desea eliminar y elementos de proceso, la, forma correcta de flujos seguros es simplemente eliminarlos de uno en uno hora y procesarlos después de que hayan sido eliminados.

Una forma es esta

// the non-thread safe way 
// 
while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 
    Save(orderItem); 
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
} 

Es posible que el número de elementos en la cola para cambiar entre queueList.Count y queueList.Dequeue(), por lo que ser seguro para subprocesos, usted tiene que utilizar solo Dequeue, pero Dequeue lanzará cuando la cola esté vacía, por lo que deberá usar un manejador de excepciones.

// the thread safe way. 
// 
while (true) 
{ 
    Order orderItem = NULL; 
    try { orderItem = queueList.Dequeue(); } catch { break; } 
    if (null != OrderItem) 
    { 
     Save(orderItem); 
     Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
    } 
} 
+0

La prueba de la captura no resolverá el problema de seguridad del hilo ya que con muchos hilos que tocan la misma cola, es posible que Dequeue() devuelva un objeto nulo al mismo tiempo que elimina un elemento de la colección interna. Consulte http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487 para obtener una mejor opción. –

+0

@Cory: gracias. ¿Tiene una referencia sobre ese comportamiento? –

+0

@John: No solo experiencia. Recientemente, había escrito una aplicación que generaba 100 hilos para procesar archivos de una sola 'Cola 'y noté que aunque' Queue.Count' se fue a cero, otros contadores "procesados" no se sumaron al 'Queue.Count' inicial. '. Los bloqueos simples que agregué al 'ThreadSafeQueue' proporcionaron resultados consistentes sin importar cuántos hilos arrojé al mismo' Queue' –

Cuestiones relacionadas