2011-02-10 26 views
30

Uso un ExecutorService para ejecutar una tarea. Esta tarea puede crear recursivamente otras tareas que se envían al mismo ExecutorService y esas tareas secundarias también pueden hacer eso.Java ExecutorService: awaitTerminación de todas las tareas creadas recursivamente

Ahora tengo el problema de que quiero esperar hasta que se hayan completado todas las tareas (es decir, todas las tareas hayan finalizado y no hayan enviado nuevas) antes de continuar.

No puedo llamar al ExecutorService.shutdown() en el hilo principal porque esto impide que el ExecutorService acepte nuevas tareas.

Y Llamar ExecutorService.awaitTermination() parece que no hace nada si shutdown no ha sido llamado.

Así que estoy un poco atrapado aquí. No puede ser tan difícil para el ExecutorService ver que todos los trabajadores están inactivos, ¿o sí? La única solución poco elegante que pude encontrar es usar directamente un ThreadPoolExecutor y consultar su getPoolSize() de vez en cuando. ¿Realmente no hay una mejor manera de hacerlo?

Respuesta

16

Si inicialmente se desconoce la cantidad de tareas en el árbol de tareas recursivas, quizás la forma más fácil sea implementar su propia primitiva de sincronización, una especie de "semáforo inverso", y compartirla entre sus tareas. Antes de enviar cada tarea, incrementa un valor, cuando la tarea se completa, disminuye ese valor, y espera hasta que el valor sea 0.

Al implementarlo como una primitiva separada explícitamente llamada de tareas, se desacopla esta lógica de la implementación del grupo de subprocesos y le permite enviar varios árboles independientes de tareas recursivas en el mismo grupo.

Algo como esto:

public class InverseSemaphore { 
    private int value = 0; 
    private Object lock = new Object(); 

    public void beforeSubmit() { 
     synchronized(lock) { 
      value++; 
     } 
    } 

    public void taskCompleted() { 
     synchronized(lock) { 
      value--; 
      if (value == 0) lock.notifyAll(); 
     } 
    } 

    public void awaitCompletion() throws InterruptedException { 
     synchronized(lock) { 
      while (value > 0) lock.wait(); 
     } 
    } 
} 

Tenga en cuenta que taskCompleted() deberían ser llamados dentro de un bloque finally, para que sea inmune a las posibles excepciones.

También tenga en cuenta que beforeSubmit() debe ser llamado por el hilo de envío antes de enviar la tarea, no por la tarea en sí, para evitar la posible "falsa finalización" cuando las tareas anteriores se completan y las nuevas aún no se inician.

EDIT: Problema importante con el patrón de uso reparado.

+1

Estaba respondiendo algo similar: podría obtener usando un AtomicInteger. –

+0

@SB: con AtomicInteger no puede esperar la finalización sin tener que esperar. – axtavt

+0

Hay un error tipográfico, estás haciendo lock-- en lugar de value-- –

0

Utilice CountDownLatch. Pase el objeto CountDownLatch a cada una de sus tareas y codifique sus tareas como a continuación.

public void doTask() { 
    // do your task 
    latch.countDown(); 
} 

Considerando que el hilo que tiene que esperar debe ejecutar el siguiente código:

public void doWait() { 
    latch.await(); 
} 

Pero por supuesto, esto supone que ya conoce el número de tareas secundarias para que se pudo inicializar el recuento del pestillo.

+1

y lo que debería ser el pestillo a intialised? – dogbane

+1

CountDownLatch tiene un problema porque no puede restablecer el recuento una vez que se ha creado. Supongo que no sabe la cantidad de tareas que el sistema invocará. –

+0

sí ... lo sé, supuse que él podría saber el número de tareas antes de la mano –

1

Utilice un Future para sus tareas (en lugar de presentar Runnable 's), una devolución de llamada actualiza su estado cuando se ha completado, por lo que puede utilizar para rastrear el Future.isDone Estado de todas sus tareas.

+0

¿Cómo se obtiene una devolución de llamada en un futuro? Pensé que tenías que llamar a .get on it. –

+1

Cuando dice devolución de llamada, se refiere al valor que devuelve desde el método de llamada –

+0

Lo que quiero decir es que el indicador 'listo' está configurado para usted (a través de una devolución de llamada). He reformulado la respuesta para hacer esto más explícito. – Joel

16

Este es realmente un candidato ideal para un Phaser. Java 7 está saliendo con esta nueva clase.Es un CountdonwLatch/CyclicBarrier flexible. Puede obtener una versión estable al JSR 166 Interest Site.

La forma en que es un CountdownLatch más flexible/CyclicBarrier es porque es capaz de soportar no sólo un número desconocido de partes (hilos), pero su también reutilizables (que es donde la parte de fase viene en)

Para cada La tarea que envíe se registraría, cuando se complete esa tarea, llegará. Esto puede hacerse recursivamente.

Phaser phaser = new Phaser(); 
ExecutorService e = // 

Runnable recursiveRunnable = new Runnable(){ 
    public void run(){ 
     //do work recursively if you have to 

     if(shouldBeRecursive){ 
      phaser.register(); 
      e.submit(recursiveRunnable); 
     } 

     phaser.arrive(); 
    } 
} 

public void doWork(){ 
    int phase = phaser.getPhase(); 

    phaser.register(); 
    e.submit(recursiveRunnable); 

    phaser.awaitAdvance(phase); 
} 

Editar: Gracias @depthofreality por señalar la condición de carrera en mi ejemplo anterior. Lo estoy actualizando para que el hilo de ejecución solo espere el avance de la fase actual, ya que bloquea para que se complete la función recursiva.

El número de fase no se disparará hasta el número de arrive s == register s. Dado que antes de cada llamada recursiva invoca register, se producirá un incremento de fase cuando se completen todas las invocaciones.

+0

Bueno, los phasers parecen ser lo que necesito. Quiero apegarme a la biblioteca Java estándar actual, pero tan pronto como esté disponible las usaré. ¡Gracias por el consejo! – Christoph

+0

Soy consciente de que fue publicado hace mucho tiempo. Todavía me pregunto si hay una condición de carrera aquí.No se puede doWork() completar antes de registros recursivos ejecutables con phaser? – depthofreality

+0

@depthofreality Ese es un gran punto. Usted está en lo cierto sin duda será una carrera aquí (debe haber pasado por alto ya que puse este ejemplo juntos rápidamente). Lo actualizaré ahora. –

6

Wow, ustedes son rápidos :)

Gracias por todas las sugerencias. Los futuros no se integran fácilmente con mi modelo porque no sé cuántos runnables están programados de antemano. Entonces, si mantengo viva una tarea parental solo para esperar a que terminen las tareas secundarias recursivas, tengo mucha basura por ahí.

Resolví mi problema con la sugerencia de AtomicInteger. Básicamente, he subclasificado ThreadPoolExecutor e incremento el contador en llamadas a execute() y decremento en llamadas a afterExecute(). Cuando el contador obtiene 0, llamo shutdown(). Esto parece funcionar para mis problemas, no estoy seguro de si esa es una buena manera de hacerlo. Especialmente, supongo que solo usas execute() para agregar Runnables.

Como nodo lateral: primero traté de registrar después de Ejecutar() el número de Ejecutables en la cola y el número de trabajadores que están activos y el apagado cuando esos son 0; pero eso no funcionó porque no todos los Runnables aparecieron en la cola y getActiveCount() tampoco hizo lo que yo esperaba.

De todos modos, aquí está mi solución: (si alguien encuentra serios problemas con esto, por favor hágamelo saber :)

public class MyThreadPoolExecutor extends ThreadPoolExecutor { 

    private final AtomicInteger executing = new AtomicInteger(0); 

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime, 
     TimeUnit seconds, BlockingQueue<Runnable> queue) { 
     super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue); 
    } 


    @Override 
    public void execute(Runnable command) { 
     //intercepting beforeExecute is too late! 
     //execute() is called in the parent thread before it terminates 
     executing.incrementAndGet(); 
     super.execute(command); 
    } 


    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     int count = executing.decrementAndGet(); 
     if(count == 0) { 
      this.shutdown(); 
     } 
    } 

} 
+0

Si bien esto funcionaría para sus requisitos específicos, no es una solución general (dada la posible condición de carrera después de disminuir y luego probar el valor de conteo == 0.) La solución general sería usar AbstractQueuedSynchronizer para rodar su propio pestillo de cuenta atrás "dinámico". – alphazero

+0

Tiene el problema de que el ejecutor no sabe cuándo ha dejado de agregar tareas. Si en algún momento todas sus tareas finalizan antes de que haya terminado de agregarlas, esas tareas se rechazarán cuando el grupo se cierre. –

+0

@PeterLawrey Correcto, pero hay una solución trivial: Incremente el contador inicialmente y disminuya cuando termine de agregar. O use una "tarea sumadora" para agregar todas las tareas. – maaartinus

0

(mea culpa: es un 'bit' más allá de mi hora de dormir;) pero aquí es un primer intento de un cierre dinámico):

package oss.alphazero.sto4958330; 

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.AbstractQueuedSynchronizer; 

public class DynamicCountDownLatch { 
    @SuppressWarnings("serial") 
    private static final class Sync extends AbstractQueuedSynchronizer { 
     private final CountDownLatch toplatch; 
     public Sync() { 
      setState(0); 
      this.toplatch = new CountDownLatch(1); 
     } 

     @Override 
     protected int tryAcquireShared(int acquires){ 
      try { 
       toplatch.await(); 
      } 
      catch (InterruptedException e) { 
       throw new RuntimeException("Interrupted", e); 
      } 
      return getState() == 0 ? 1 : -1; 
     } 
     public boolean tryReleaseShared(int releases) { 
      for (;;) { 
       int c = getState(); 
       if (c == 0) 
        return false; 
       int nextc = c-1; 
       if (compareAndSetState(c, nextc)) 
        return nextc == 0; 
      } 
     } 
     public boolean tryExtendState(int acquires) { 
      for (;;) { 
       int s = getState(); 
       int exts = s+1; 
       if (compareAndSetState(s, exts)) { 
        toplatch.countDown(); 
        return exts > 0; 
       } 
      } 
     } 
    } 
    private final Sync sync; 
    public DynamicCountDownLatch(){ 
     this.sync = new Sync(); 
    } 
    public void await() 
     throws InterruptedException 
    { 
     sync.acquireSharedInterruptibly(1); 
    } 

    public boolean await(long timeout, TimeUnit unit) 
     throws InterruptedException 
    { 
     return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 
    } 
    public void countDown() { 
     sync.releaseShared(1); 
    } 
    public void join() { 
     sync.tryExtendState(1); 
    } 
} 

Este pestillo introduce un nuevo método join() a la existente (clonado) API CountDownLatch, que es utilizado por las tareas para señalar su entrada en el grupo de trabajo más grande.

El pestillo se pasa de la tarea principal a la tarea secundaria. Cada tarea, según el patrón de Suraj, primero 'une()' el pestillo, hace su tarea(), y luego countDown().

Para hacer frente a situaciones en las que el hilo principal lanza el grupo de tareas y luego inmediatamente espera() - antes que cualquiera de los hilos de trabajo han tenido la oportunidad de incluso unirse() - el topLatch es utilizado interno clase int Sync. Este es un pestillo que contará hacia abajo en cada unión(); solo la primera cuenta regresiva es por supuesto significativa, ya que todas las subsecuentes son nops.

La implementación inicial anterior introduce una especie de arruga semántica, ya que se supone que tryAcquiredShared (int) no está lanzando una excepción InterruptedException, pero luego tenemos que ocuparnos de la interrupción de la espera en el topLatch.

¿Es esto una mejora sobre la propia solución de OP usando contadores Atomic? Yo diría que probablemente no IFF es insistente sobre el uso de ejecutores, pero creo que es un enfoque alternativo igualmente válido que usa el AQS en ese caso, y también se puede utilizar con hilos genéricos.

Critiar compañeros hackers.

2

Puede crear su propio grupo de subprocesos que se extiende ThreadPoolExecutor. Desea saber cuándo se ha enviado una tarea y cuándo se completa.

public class MyThreadPoolExecutor extends ThreadPoolExecutor { 
    private int counter = 0; 

    public MyThreadPoolExecutor() { 
     super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    public synchronized void execute(Runnable command) { 
     counter++; 
     super.execute(command); 
    } 

    @Override 
    protected synchronized void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     counter--; 
     notifyAll(); 
    } 

    public synchronized void waitForExecuted() throws InterruptedException { 
     while (counter == 0) 
      wait(); 
    } 
} 
+1

Me gusta más esta solución que la que tiene una puntuación de 13. SIN EMBARGO: el "while (counter == 0)" debería ser "while (counter> 0)", ¿no? –

0

Si desea utilizar clases JSR166y, p. Phaser o Fork/Join: cualquiera de los dos puede funcionar para usted, siempre puede descargar el backport de Java 6 desde: http://gee.cs.oswego.edu/dl/concurrency-interest/ y utilizarlo como base en lugar de escribir una solución completamente homebrew. Luego, cuando sale 7, puede soltar la dependencia en el backport y cambiar algunos nombres de paquetes.

(La revelación completa: Hemos estado utilizando el LinkedTransferQueue en prod desde hace un tiempo sin problemas.)

0

debo decir, que las soluciones descritas anteriormente de un problema en esta tarea llamada recursiva y esperar a que las tareas suborden final DOESN no me satisfagas Hay mi solución inspirada en la documentación original de Oracle allí: CountDownLatch y ejemplo allí: Human resources CountDownLatch.

El primer hilo común en proceso en la instancia de la clase ha HRManagerCompact espera de retención para las discusiones de dos hijas, cosa que ha pestillos de espera para su posterior hilos 2 de la hija ... etc

Por supuesto, pestillo se puede establecer en el valor diferente de 2 (en el constructor de CountDownLatch), así como el número de objetos ejecutables se pueden establecer en iteración, es decir, ArrayList, pero debe corresponderse (el número de count downs debe ser igual al parámetro en CountDownLatch constructor).

Tenga cuidado, el número de pestillos aumenta exponencialmente según la condición de restricción: 'level.get() < 2', así como el número de objetos. 1, 2, 4, 8, 16 ... y pestillos 0, 1, 2, 4 ... Como puede ver, para cuatro niveles (level.get() < 4) habrá 15 hilos de espera y 7 pestillos en el tiempo, cuando se están ejecutando los 16 subprocesos máximos.

package processes.countdownlatch.hr; 

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicLong; 

/** Recursively latching running classes to wait for the peak threads 
* 
* @author hariprasad 
*/ 
public class HRManagerCompact extends Thread { 
    final int N = 2; // number of daughter's tasks for latch 
    CountDownLatch countDownLatch; 
    CountDownLatch originCountDownLatch; 
    AtomicInteger level = new AtomicInteger(0); 
    AtomicLong order = new AtomicLong(0); // id latched thread waiting for 

    HRManagerCompact techLead1 = null; 
    HRManagerCompact techLead2 = null; 
    HRManagerCompact techLead3 = null; 

// constructor 
public HRManagerCompact(CountDownLatch countDownLatch, String name, 
    AtomicInteger level, AtomicLong order){ 
    super(name); 
    this.originCountDownLatch=countDownLatch; 
    this.level = level; 
    this.order = order; 
} 

private void doIt() { 
    countDownLatch = new CountDownLatch(N); 
    AtomicInteger leveli = new AtomicInteger(level.get() + 1); 
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId()); 
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi); 
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi); 
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli); 

    techLead1.start(); 
    techLead2.start(); 
    //techLead3.start(); 

    try { 
    synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread 
     System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi); 
     countDownLatch.await(); // wait actual thread 
    } 
    System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi); 
    } catch (InterruptedException e) { 
    e.printStackTrace(); 
    } 
    } 

@Override 
public void run() { 
    try { 
    System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId()); 
    Thread.sleep(10*level.intValue()); 
    if (level.get() < 2) doIt(); 
    Thread.yield(); 
    } 
    catch (Exception e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
    } 
    /*catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
    }*/ 
    // TODO Auto-generated method stub 
    System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId()); 
    originCountDownLatch.countDown(); // count down 
} 

public static void main(String args[]){ 
    AtomicInteger levelzero = new AtomicInteger(0); 
    HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue())); 
    hr.doIt(); 
} 
} 

Posible salida comentado (con cierta probabilidad):

first: working... 1, 1, 10 // thread 1, first daughter's task (10) 
second: working... 1, 1, 11 // thread 1, second daughter's task (11) 
first: working... 2, 10, 12 // thread 10, first daughter's task (12) 
first: working... 2, 11, 14 // thread 11, first daughter's task (14) 
second: working... 2, 11, 15 // thread 11, second daughter's task (15) 
second: working... 2, 10, 13 // thread 10, second daughter's task (13) 
--- first: recruted 2, 10, 12 // finished 12 
--- first: recruted 2, 11, 14 // finished 14 
--- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10) 
--- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11) 
*** HR Manager waiting for recruitment to complete... 0, 0, 1 
*** HR Manager waiting for recruitment to complete... 1, 1, 10 
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened 
--- first: recruted 1, 1, 10 // finished 10 
*** HR Manager waiting for recruitment to complete... 1, 1, 11 
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened 
--- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1) 
*** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened 
0

La única solución poco elegante que podía llegar a es utilizar directamente un ThreadPoolExecutor y consultar su getPoolSize() de vez en mientras. ¿Realmente no hay una mejor manera de hacerlo?

usted tiene que utilizar shutdown() , awaitTermination() and shutdownNow() métodos en una secuencia apropiada.

shutdown(): inicia un cierre ordenado en el que se ejecutan las tareas enviadas anteriormente, pero no se aceptarán nuevas tareas.

awaitTermination(): Bloquea hasta que todas las tareas hayan completado la ejecución después de una solicitud de apagado, o se agote el tiempo de espera, o se interrumpa el hilo actual, lo que ocurra primero.

shutdownNow(): Intenta detener todas las tareas que se ejecutan activamente, detiene el procesamiento de las tareas en espera y devuelve una lista de las tareas que estaban esperando su ejecución.

Forma recomendada de Oracle página de documentación de ExecutorService:

void shutdownAndAwaitTermination(ExecutorService pool) { 
    pool.shutdown(); // Disable new tasks from being submitted 
    try { 
    // Wait a while for existing tasks to terminate 
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 
     pool.shutdownNow(); // Cancel currently executing tasks 
     // Wait a while for tasks to respond to being cancelled 
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
      System.err.println("Pool did not terminate"); 
    } 
    } catch (InterruptedException ie) { 
    // (Re-)Cancel if current thread also interrupted 
    pool.shutdownNow(); 
    // Preserve interrupt status 
    Thread.currentThread().interrupt(); 
    } 

Puede reemplazar si la condición con mientras condición en caso de larga duración en la finalización de las tareas de la siguiente manera:

Cambio

if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 

Para

while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { 
    Thread.sleep(60000); 
} 

Se puede hacer referencia a otras alternativas (excepto join(), que pueden ser utilizados con hilo independiente) en:

wait until all threads finish their work in java

Cuestiones relacionadas