7

Estoy luchando con la mejor manera de implementar mi canal de procesamiento.colas de trabajo productor/consumidor

Mis productores alimentan el trabajo a un BlockingQueue. Del lado del consumidor, sondeé la cola, envolví lo que obtengo en una tarea Runnable y lo envié a un ExecutorService.

while (!isStopping()) 
{ 
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS); 
    if (work == null) 
    { 
     break; 
    } 
    executorService.execute(new Worker(work)); // needs to block if no threads! 
} 

Esto no es ideal; el ExecutorService tiene su propia cola, por supuesto, así que lo que realmente está sucediendo es que siempre estoy agotando por completo mi cola de trabajo y llenando la cola de tareas, que se vacía lentamente a medida que se completan las tareas.

Me doy cuenta de que puedo poner tareas en cola al final del productor, pero realmente prefiero no hacer eso. Me gusta que la indirección/aislamiento de mi cola de trabajo sean cuerdas tontas; realmente no es asunto del productor lo que les va a pasar. Obligar al productor a poner en cola Runnable o Invocable rompe una abstracción, en mi humilde opinión.

Pero sí quiero que la cola de trabajos compartida represente el estado de procesamiento actual. Quiero poder bloquear a los productores si los consumidores no se mantienen al día.

Me encantaría usar los ejecutores, pero siento que estoy luchando contra su diseño. ¿Puedo beber parcialmente el Kool-ade, o tengo que tragarlo? ¿Me estoy equivocando al resistir las tareas de puesta en cola? (Sospecho que podría configurar ThreadPoolExecutor para usar una cola de 1 tarea y anular su método de ejecución para bloquear en lugar de rechazar en cola llena, pero eso parece bruto.)

¿Sugerencias?

Respuesta

14

Quiero la cola de trabajo compartido para representan el estado actual de procesamiento .

Pruebe un tipo común BlockingQueue y tienen una piscina de subprocesos de trabajo que toman elementos de trabajo fuera de la cola.

Quiero ser capaz de bloquear los productores si los consumidores no son mantenerse al día.

Tanto ArrayBlockingQueue y LinkedBlockingQueue apoyo acotada colas de tal manera que bloqueará el puesto cuando está llena. El uso de los métodos de bloqueo put() garantiza que los productores estén bloqueados si la cola está llena.

Aquí hay un comienzo difícil. Puede ajustar el número de trabajadores y tamaño de la cola:

public class WorkerTest<T> { 

    private final BlockingQueue<T> workQueue; 
    private final ExecutorService service; 

    public WorkerTest(int numWorkers, int workQueueSize) { 
     workQueue = new LinkedBlockingQueue<T>(workQueueSize); 
     service = Executors.newFixedThreadPool(numWorkers); 

     for (int i=0; i < numWorkers; i++) { 
      service.submit(new Worker<T>(workQueue)); 
     } 
    } 

    public void produce(T item) { 
     try { 
      workQueue.put(item); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
    } 


    private static class Worker<T> implements Runnable { 
     private final BlockingQueue<T> workQueue; 

     public Worker(BlockingQueue<T> workQueue) { 
      this.workQueue = workQueue; 
     } 

     @Override 
     public void run() { 
      while (!Thread.currentThread().isInterrupted()) { 
       try { 
        T item = workQueue.take(); 
        // Process item 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        break; 
       } 
      } 
     } 
    } 
} 
+0

Thanks; mi implementación anterior era muy parecida a esta, aunque solo usaba ThreadFactory: una vez que lo reduce a un conjunto fijo de hilos que intentan agotar la cola de trabajo, no tiene mucho sentido utilizar ExecutorService. Estaba cambiando a ExecutorService para aprovechar un grupo de subprocesos más sintonizables, con la semántica de "encontrar un hilo de trabajo existente disponible, si es que existe uno, crear uno si es necesario, matarlos si están inactivos". –

+0

The Executors.newCachedThreadPool() hará algo similar a eso. También puede ajustar la política de grupo en ThreadPoolExecutor. ¿Qué es lo que buscas? – Kevin

+0

Esa es la idea ... se puede ajustar exactamente de la manera que me gusta, si estuviera dispuesto a utilizar su cola de trabajo de tareas. Lo que realmente quiero es forjar la inteligencia del conjunto de subprocesos del ejecutor e implementar mi propio cliente del grupo de subprocesos, pero en realidad no está configurado para eso. –

0

Usted podría tener su consumo ejecutar Runnable::run directamente en lugar de iniciar un nuevo hilo hacia arriba. Combina esto con una cola de bloqueo con un tamaño máximo y creo que obtendrás lo que deseas. Su consumidor se convierte en un trabajador que está ejecutando tareas en línea en función de los elementos de trabajo en la cola. Solo arrastrarán los artículos tan rápido como los procesan para que el productor los deje de consumir.

+0

Eso solo le dará a un trabajador, y me gustaría sintonizarlo para maximizar el procesamiento para una configuración de sistema dada. Los trabajos de procesamiento durarán entre una fracción de segundo y decenas de segundos, y serán una mezcla de trabajos vinculados a E/S y vinculados a CPU. –

+0

Estaba asumiendo múltiples consumidores asociados con uno o más productores. Si solo tiene un solo consumidor, ¿por qué no hacer que el productor descargue el trabajo directamente en el ejecutor? –

1

"encuentre un hilo de trabajo existente disponible si existe, cree uno si es necesario, mátelos si se quedan inactivos".

Administrar todos esos estados de trabajadores es tan innecesario como peligroso.Crearía un hilo de monitor que se ejecuta constantemente en segundo plano, cuya única tarea es rellenar la cola y generar consumidores ... ¿por qué no hacer que los hilos de trabajador daemons mueran tan pronto como se completen? Si los une a todos en un ThreadGroup puede cambiar el tamaño del pool de forma dinámica ... por ejemplo:

**for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
     spawnDaemonWorkers(queue.poll()); 
    }**