2009-05-01 17 views
75

Tengo una clase que toma objetos de un BlockingQueue y los procesa llamando al take() en un bucle continuo. En algún momento sé que no se agregarán más objetos a la cola. ¿Cómo interrumpo el método take() para que deje de bloquear?¿Cómo se interrumpe una BlockingQueue que está bloqueando en take()?

Aquí está la clase que procesa los objetos:

public class MyObjHandler implements Runnable { 

    private final BlockingQueue<MyObj> queue; 

    public class MyObjHandler(BlockingQueue queue) { 
    this.queue = queue; 
    } 

    public void run() { 
    try { 
     while (true) { 
     MyObj obj = queue.take(); 
     // process obj here 
     // ... 
     } 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } 
    } 
} 

Y aquí está el método que utiliza esta clase para procesar objetos:

public void testHandler() { 

    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjectHandler handler = new MyObjectHandler(queue); 
    new Thread(handler).start(); 

    // get objects for handler to process 
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) { 
    queue.put(i.next()); 
    } 

    // what code should go here to tell the handler 
    // to stop waiting for more objects? 
} 

Respuesta

61

Si interrumpir el hilo no es una opción, otra es colocar un "marcador" o un objeto "comando" en la cola que sería reconocido como tal por MyObjHandler y descanso fuera de la lupa.

+34

Esto también se conoce como el enfoque de "cierre de píldora venenosa" y se discute extensamente en "Concurrencia de Java en la práctica", específicamente en las páginas 155-156. –

12
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 
MyObjectHandler handler = new MyObjectHandler(queue); 
Thread thread = new Thread(handler); 
thread.start(); 
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) { 
    queue.put(i.next()); 
} 
thread.interrupt(); 

Sin embargo, si usted hace esto, el hilo podría interrumpirse mientras todavía hay elementos en la cola, esperando ser procesados. Es posible que desee considerar el uso de poll en lugar de take, que permitirá que el subproceso de procesamiento expire y finalice cuando haya esperado un tiempo sin nueva entrada.

+0

Sí, es un problema si el hilo se interrumpe mientras todavía hay elementos en la cola. Para evitar esto, agregué código para asegurarme de que la cola está vacía antes de interrumpir el hilo: while (queue.size()>0) Thread.currentThread().sleep(5000); MCS

+2

@MCS: tenga en cuenta para los futuros visitantes que su enfoque aquí es un hack y no debe reproducirse en el código de producción por tres razones. Siempre es preferible encontrar una forma real de enganchar el cierre. Nunca es aceptable usar 'Thread.sleep()' como una alternativa a un gancho apropiado. En otras implementaciones, otros subprocesos pueden colocar cosas en la cola, y el ciclo while puede que nunca termine. –

+0

Afortunadamente, no hay necesidad de confiar en tales hacks porque uno simplemente lo procesa * después de * la interrupción si es necesario. Por ejemplo, una implementación 'exhaustiva 'de' take() 'podría verse así:' try {return take(); } catch (InterruptedException e) {E o = poll(); if (o == null) throw e; Thread.currentThread(). Interrupt(); devolver o; } 'Sin embargo, no hay ninguna razón por la que deba implementarse en esta capa, e implementarlo un poco más arriba conducirá a un código más eficiente (como al evitar la' InterruptedException' por elemento y/o al usar 'BlockingQueue.drainTo () '). – antak

1

Interrumpir el hilo:

thread.interrupt() 
0

O no interrumpa, es asqueroso.

public class MyQueue<T> extends ArrayBlockingQueue<T> { 

     private static final long serialVersionUID = 1L; 
     private boolean done = false; 

     public ParserQueue(int capacity) { super(capacity); } 

     public void done() { done = true; } 

     public boolean isDone() { return done; } 

     /** 
     * May return null if producer ends the production after consumer 
     * has entered the element-await state. 
     */ 
     public T take() throws InterruptedException { 
      T el; 
      while ((el = super.poll()) == null && !done) { 
       synchronized (this) { 
        wait(); 
       } 
      } 

      return el; 
     } 
    } 
  1. cuando se pone productores se oponen a la cola, llame queue.notify(), si termina, llame queue.done()
  2. bucle while (! Queue.isDone() ||! Queue.isEmpty())
  3. Hacer el test() valor de retorno para nula
+0

Yo diría que la solución anterior era más limpia y más simple que esta – sakthisundar

+0

La bandera hecha es la misma que una píldora venenosa, simplemente se administra de manera diferente :) –

+0

¿Limpiador? Lo dudo. No sabes cuando el hilo se interrumpe exactamente. O déjame preguntar, ¿qué es más limpio con eso? Es menos código, eso es un hecho. – tomasb

12

muy tarde, pero que esto ayude también otros como me enfrenté al problema similar y se utiliza la poll enfoque sugerido por erickson above con algunos cambios menores,

class MyObjHandler implements Runnable 
{ 
    private final BlockingQueue<MyObj> queue; 
    public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL 
    public MyObjHandler(BlockingQueue queue) 
    { 
     this.queue = queue; 
     Finished = false; 
    } 
    @Override 
    public void run() 
    {   
     while (true) 
     { 
      try 
      { 
       MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); 
       if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return 
       { 
        // process obj here 
        // ... 
       } 
       if(Finished && queue.isEmpty()) 
        return; 

      } 
      catch (InterruptedException e) 
      {     
       return; 
      } 
     } 
    } 
} 

public void testHandler() 
{ 
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler handler = new MyObjHandler(queue); 
    new Thread(handler).start(); 

    // get objects for handler to process 
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) 
    { 
     queue.put(i.next()); 
    } 

    // what code should go here to tell the handler to stop waiting for more objects? 
    handler.Finished = true; //THIS TELLS HIM 
    //If you need you can wait for the termination otherwise remove join 
    myThread.join(); 
} 

Esto resolvió tanto los problemas

  1. señaló la BlockingQueue para que sepa que no tiene que esperar más de los elementos
  2. No se interrumpe en el medio para que los bloques de procesamiento finalicen solo cuando se procesen todos los elementos en cola y no haya elementos restantes para agregar
+2

Haz que la variable 'Finalizada' sea' volátil' para garantizar la visibilidad entre los hilos. Ver http://stackoverflow.com/a/106787 – lukk

+0

@lukk gracias por su respuesta, edité la respuesta para incluir su punto ..... – dbw

+2

Si no me equivoco, el elemento final en la cola no será procesada. Cuando toma el elemento final de la cola, el final es verdadero y la cola está vacía, por lo que regresará antes de haber manejado ese elemento final. Agregue una tercera condición if (Finalizado && queue.isEmpty() && obj == null) –

Cuestiones relacionadas