Estoy luchando con la mejor manera de implementar mi canal de procesamiento.colas de trabajo productor/consumidor
Mis productores alimentan el trabajo a un BlockingQueue. Del lado del consumidor, sondeé la cola, envolví lo que obtengo en una tarea Runnable y lo envié a un ExecutorService.
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
Esto no es ideal; el ExecutorService tiene su propia cola, por supuesto, así que lo que realmente está sucediendo es que siempre estoy agotando por completo mi cola de trabajo y llenando la cola de tareas, que se vacía lentamente a medida que se completan las tareas.
Me doy cuenta de que puedo poner tareas en cola al final del productor, pero realmente prefiero no hacer eso. Me gusta que la indirección/aislamiento de mi cola de trabajo sean cuerdas tontas; realmente no es asunto del productor lo que les va a pasar. Obligar al productor a poner en cola Runnable o Invocable rompe una abstracción, en mi humilde opinión.
Pero sí quiero que la cola de trabajos compartida represente el estado de procesamiento actual. Quiero poder bloquear a los productores si los consumidores no se mantienen al día.
Me encantaría usar los ejecutores, pero siento que estoy luchando contra su diseño. ¿Puedo beber parcialmente el Kool-ade, o tengo que tragarlo? ¿Me estoy equivocando al resistir las tareas de puesta en cola? (Sospecho que podría configurar ThreadPoolExecutor para usar una cola de 1 tarea y anular su método de ejecución para bloquear en lugar de rechazar en cola llena, pero eso parece bruto.)
¿Sugerencias?
Thanks; mi implementación anterior era muy parecida a esta, aunque solo usaba ThreadFactory: una vez que lo reduce a un conjunto fijo de hilos que intentan agotar la cola de trabajo, no tiene mucho sentido utilizar ExecutorService. Estaba cambiando a ExecutorService para aprovechar un grupo de subprocesos más sintonizables, con la semántica de "encontrar un hilo de trabajo existente disponible, si es que existe uno, crear uno si es necesario, matarlos si están inactivos". –
The Executors.newCachedThreadPool() hará algo similar a eso. También puede ajustar la política de grupo en ThreadPoolExecutor. ¿Qué es lo que buscas? – Kevin
Esa es la idea ... se puede ajustar exactamente de la manera que me gusta, si estuviera dispuesto a utilizar su cola de trabajo de tareas. Lo que realmente quiero es forjar la inteligencia del conjunto de subprocesos del ejecutor e implementar mi propio cliente del grupo de subprocesos, pero en realidad no está configurado para eso. –