2010-10-21 17 views
6

Utilizo BlockingCollection para implementar un patrón productor-consumidor en C# 4.0.BlockingCollection in Task Parallel Library no libera automáticamente la referencia de las instancias subyacentes

El BlockingCollection contiene elementos que ocupan mucha memoria. Me gustaría dejar que el productor saque un artículo de BlockingCollection a la vez y lo procese.

Estaba pensando que mediante el uso de foreach en BlockingCollection.GetConsumingEnumerable(), cada vez, el BlockingCollection se eliminará el elemento de la cola subyacente (es decir, todas junto con la referencia) lo que al final del método del proceso() que procesa el elemento , el artículo puede ser basura recolectada.

Pero esto no es verdad. Parece que el bucle foreach en BlockingCollection.GetConsumingEnumerable() contiene todas las referencias de los elementos ingresados ​​en la cola. Todos los artículos se guardan (evitando así que sean basura) hasta que se salga del ciclo foreach.

En lugar de usar el bucle foreach simple en BlockingCollection.GetConsumingEnumerable(), utilizo un ciclo while probando el bloque BlockingCollection.IsComplete y dentro del ciclo utilizo BlockingCollection.Take() para tomar un elemento consumible. Supongo que BlockingCollection.Take() tiene un efecto similar al List.Remove(), que eliminará la referencia del artículo de BlockingCollection. Pero nuevamente esto está mal. Todos los artículos son solo basura recolectada fuera del ciclo while.

Así que mi pregunta es, ¿cómo podemos implementar fácilmente el requisito de tal manera que BlockingCollection potencialmente contiene elementos que consumen memoria y cada elemento puede ser recogido basura una vez que es consumido por el consumidor? Muchas gracias por cualquier ayuda.

EDIT: conforme a lo solicitado, se añade un código de demostración simple:

// Entity is what we are going to process. 
// The finalizer will tell us when Entity is going to be garbage collected. 
class Entity 
{ 
    private static int counter_; 
    private int id_; 
    public int ID { get{ return id_; } } 
    public Entity() { id_ = counter++; } 
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); } 
} 

... 

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>(); 
private List<Task> tasks_ = new List<Task>(); 

// This is the method to launch and wait for the tasks to finish the work. 
void Run() 
{ 
    tasks_.Add(Task.Factory.StartNew(ProduceEntity); 
    Console.WriteLine("Start processing."); 
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity); 
    Task.WaitAll(tasks_.ToArray()); 
} 

// The producer creates Entity instances and add them to BlockingCollection. 
void ProduceEntity() 
{ 
    for(int i = 0; i < 10; i ++) // We are adding totally 10 entities. 
    { 
     var newEntity = new Entity(); 
     Console.WriteLine("Create entity {0}.", newEntity.ID); 
     jobQueue_.Add(newEntity); 
    } 
    jobQueue_.CompleteAdding(); 
} 

// The consumer takes entity, process it (and what I need: destroy it). 
void ConsumeEntity() 
{ 
    while(!jobQueue_.IsCompleted){ 
     Entity entity; 
     if(jobQueue_.TryTake(entity)) 
     { 
      Console.WriteLine("Process entity {0}.", entity.ID); 
      entity = null; 

      // I would assume after GC, the entity will be finalized and garbage collected, but NOT. 
      GC.Collect(); 
      GC.WaitForPendingFinalizers(); 
      GC.Collect(); 
     } 
    } 
    Console.WriteLine("Finish processing."); 
} 

La salida es que todos los mensajes de creación y de proceso, seguido de "terminar de procesar." y seguido por todos los mensajes de destrucción de las entidades. Y entidades creación de mensajes mostrando Entity.ID de 0 a 9 y los mensajes de destrucción que muestran Entity.ID de 9 a 0.

EDIT:

Incluso cuando me puse la capacidad límite del BlockingCollection, todos los artículos siempre entrar en ella se finalizan solo cuando sale el bucle, lo cual es extraño.

+0

El hecho de que no hay REF retenidos no significa que el GC intervendrá de inmediato y recogerla ... código de ejemplo que muestra el problema con los métodos etc. gc.collect apropiado sería ser útil –

Respuesta

2

Si BlockingCollection continúa teniendo referencias depende del tipo de colección que está utilizando.

El default collection type para BlockingCollection<T> es ConcurrentQueue<T>.

Por lo tanto, el comportamiento de la recolección de basura dependerá del tipo de colección. En el caso de ConcurrentQueue<T> esta es una estructura FIFO, por lo que me sorprendería enormemente si esto no liberara referencias de la estructura de datos después de que se eliminaran de la cola (¡es como la definición de una cola)!

¿Cómo está determinando exactamente que los objetos no se están recogiendo basura?

6

ConcurrentQueue contiene segmentos con una matriz interna de 32 elementos. Los elementos de la Entidad no serán recogidos basura hasta que el segmento sea basura recolectada. Esto ocurrirá después de que los 32 elementos se retiren de la cola. Si cambia su ejemplo para agregar 32 elementos, verá los mensajes de "Entidad de destrucción" antes de "Terminar el procesamiento"."

+0

Hm. Esto es un comportamiento extraño para mí. No puedo asegurarme cada vez que tenemos tantos elementos. Pero lo probé como lo describió. Creo que tengo que liberar memoria de cada elemento de forma manual cada vez que Consumir un artículo. El tuyo está muy cerca de la respuesta final. Pero esperaré para ver si alguien más puede encontrar una solución para lidiar con este extraño comportamiento. – Steve

Cuestiones relacionadas