2009-10-28 26 views
23

He encontrado un problema dos veces ahora por el cual una hebra de productor produce N elementos de trabajo, los envía a un ExecutorService y luego necesita esperar hasta que se hayan procesado todos los N elementos.Conteo flexibleDownLatch?

Advertencias

  • N no se conoce de antemano. Si fuera así, simplemente crearía un CountDownLatch y luego tendré el hilo productor await() hasta que se complete todo el trabajo.
  • utilizando un CompletionService es inadecuado porque aunque mi hilo productor necesita para bloquear (es decir, llamando take()) hay hay manera de señalar que todo el trabajo se haya completado, para hacer que el hilo productor que dejar de esperar.

Mi solución preferida actual es utilizar un contador de números enteros, y para incremento esto cada vez que se presenta un elemento de trabajo y al decremento cuando se procesa un elemento de trabajo. Después de la presentación de todas las N tareas, mi secuencia de producción deberá esperar en un bloqueo, verificando si counter == 0 siempre que se notifique. El/los hilo (s) de consumo deberán notificar al productor si ha decrementado el contador y el nuevo valor es 0.

¿Hay un mejor enfoque para este problema o hay una construcción adecuada en java.util.concurrent? que "rodar el mío"?

Gracias de antemano.

+0

¿En qué momento el productor sabe cuántos elementos de trabajo hay? Cuando el último artículo ha sido producido? –

+0

Su solución actual puede sufrir una condición de carrera: producir elemento 1 -> contador ++ -> procesar elemento 1 -> contador-- -> producir elemento 2. Como el contador se ha decrementado antes de que el productor haya producido el siguiente artículo, el productor piensa que está listo. –

+0

@rwwilden: Tiene razón en que este escenario podría ocurrir. Sin embargo, mi productor solo inspeccionaría/esperaría en el mostrador después de enviar * todos los elementos de trabajo, por lo que no representa una condición de carrera en este caso particular. – Adamski

Respuesta

24

java.util.concurrent.Phaser parece que funcionaría bien para usted. Está previsto que se publique en Java 7, pero la versión más estable se puede encontrar en el sitio web del grupo de interés de jsr166.

El phaser es una barrera cíclica glorificada. Puede registrar N número de partes y cuando esté listo, aguarde su avance en la fase específica.

Un ejemplo rápido de cómo funcionaría:

final Phaser phaser = new Phaser(); 

public Runnable getRunnable(){ 
    return new Runnable(){ 
     public void run(){ 
      ..do stuff... 
      phaser.arriveAndDeregister(); 
     } 
    }; 
} 
public void doWork(){ 
    phaser.register();//register self 
    for(int i=0 ; i < N; i++){ 
     phaser.register(); // register this task prior to execution 
     executor.submit(getRunnable()); 
    } 
    phaser.arriveAndAwaitAdvance(); 
} 
+3

Cool - Gracias; Esto parece exactamente lo que estoy buscando ... No puedo creer que lo hayan llamado Phaser. – Adamski

+2

Desearía poder votar nuevamente x100 veces más. Muchas gracias. –

+2

Si el punto de 'getRunnable()' es representar una tarea que solo señala el phaser y luego termina, desea invocar 'phaser.arriveAndDeregister()' y ** not ** 'phaser.arrive()' de lo contrario cuando la tarea primaria invoque 'phaser.arriveAndAwaitAdvance()' por segunda vez, se bloqueará, ya que la tarea estará esperando las tareas terminadas que todavía están registradas en el phaser. –

2

Por supuesto podría utilizar un CountDownLatch protegido por un AtomicReference para que sus tareas envolverse así:

public class MyTask extends Runnable { 
    private final Runnable r; 
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; } 

    public void run() { 
     r.run(); 
     while (l.get() == null) Thread.sleep(1000L); //handle Interrupted 
     l.get().countDown(); 
    } 
} 

Aviso que las tareas se ejecutan su trabajo y luego giro hasta que la cuenta atrás es set (es decir, se conoce el número total de tareas). Tan pronto como se establece la cuenta regresiva, la cuentan hacia abajo y salen. Estos se presentaron de la siguiente manera:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>(); 
executor.submit(new MyTask(r, l)); 

Después el punto de la creación/presentación de su trabajo, cuando se sabe cuántas tareas que haya creado:

latch.set(new CountDownLatch(nTasks)); 
latch.get().await(); 
+0

¿Por qué el ajuste CountDownLatch con AtomicReference lo ayuda en este caso? La referencia no necesita protección, ya que se pasa al MyTask en su constructor y nunca cambia después de eso. – Avi

+0

Además, asegúrese de manejar throwables arbitrarios (RuntimeExceptions y Errors) en el método run(), al menos poniendo coundDown() en un bloque finally {}. – Avi

+0

Y, por supuesto, la pregunta era específicamente sobre el caso cuando no se conoce el número de tareas en el momento de la creación de la tarea. – Avi

0

que asumen su productor hace No es necesario saber cuándo la cola está vacía, pero necesita saber cuándo se completó la última tarea.

Agregaría un método waitforWorkDone(producer) al consumidor. El productor puede agregar sus N tareas y llamar al método de espera. El método de espera bloquea el hilo entrante si la cola de trabajo no está vacía y no se están ejecutando tareas en este momento.

El consumidor enhebra notifyAll() en el bloqueo waitfor iff su tarea ha finalizado, la cola está vacía y no se está ejecutando ninguna otra tarea.

1

He usado un ExecutorCompletionService para algo como esto:

ExecutorCompletionService executor = ...; 
int count = 0; 
while (...) { 
    executor.submit(new Processor()); 
    count++; 
} 

//Now, pull the futures out of the queue: 
for (int i = 0; i < count; i++) { 
    executor.take().get(); 
} 

Esto implica mantener una cola de tareas que se han presentado, por lo que si su lista es arbitrariamente larga, su método podría ser mejor.

Pero asegúrese de usar un AtomicInteger para la coordinación, de modo que pueda incrementarlo en un subproceso y disminuirlo en los subprocesos de trabajo.

+0

¿Cómo puedes 'esperar la terminación'? ¿Qué está cerrando su servicio? Además, no puede volver a utilizar el servicio de finalización en su respuesta de todos modos, ya que puede estar esperando tareas que lógicamente eran * otro conjunto *. Y (como mencioné en los comentarios a mi respuesta), todavía necesita saber 'nTasks' en algún punto –

+0

@oxbow_lakes: Es cierto. Es por eso que solo lo tengo como una opción (que he editado, ya que no ayuda en este caso). Supongo que este servicio de finalización no se utilizará para otro conjunto de tareas al mismo tiempo (superposición) con este. Se puede usar nuevamente más tarde, desde el mismo hilo. – Avi

+0

La ventaja de esta forma de realizar un seguimiento del número de tareas en lugar de utilizar un AtomicBoolean es que no tiene que manejar el wait() y notify() manualmente: la cola se encargará de eso. Todo lo que tiene que rastrear es la cantidad de elementos en la cola. – Avi

0

Lo que descrito es bastante similar al uso de un semáforo estándar pero usado 'hacia atrás'.

  • Su semáforo comienza con 0 permisos
  • Cada unidad de trabajo libera un permiso cuando se completa
  • Bloqueas esperando para adquirir N permite

además de contar con la flexibilidad necesaria para sólo adquieren M < N permisos que es útil si desea verificar el estado intermedio. Por ejemplo, estoy probando una cola de mensajes acotados asíncronos, así que espero que la cola esté llena para algunos M < N, así que puedo adquirir M y verificar que la cola está llena y adquirir los permisos N - M restantes después de consumir mensajes del cola.