2010-01-28 23 views
25

Tengo un proceso que delega las tareas de asincronización a un grupo de subprocesos. Necesito asegurarme de que ciertas tareas se ejecuten en orden. Así, por ejemploControl del orden de ejecución de tareas con ExecutorService

Tareas llegan a fin

Tareas A1, B1, C1, D1, E1, A2, A3, B2, F1

tareas pueden ser ejecutadas en cualquier orden, excepto cuando hay una Dependencia natural, por lo que a1, a2, a3 deben procesarse en ese orden asignándolas al mismo hilo o bloqueándolas hasta que sepa que se completó la tarea a # anterior.

Actualmente no utiliza el paquete Java Concurrency, pero estoy considerando cambiar para aprovechar la gestión de subprocesos.

¿Alguien tiene una solución o sugerencias de cómo lograr esto

Respuesta

2

similares Cuando se envía una Runnable o Callable a un ExecutorService recibe un Future a cambio. Haga que los hilos que dependen de a1 pasen a1's Future y llame al Future.get(). Esto se bloqueará hasta que el hilo se complete.

Así:

ExecutorService exec = Executor.newFixedThreadPool(5); 
Runnable a1 = ... 
final Future f1 = exec.submit(a1); 
Runnable a2 = new Runnable() { 
    @Override 
    public void run() { 
    f1.get(); 
    ... // do stuff 
    } 
} 
exec.submit(a2); 

y así sucesivamente.

+4

No creo que esto va a funcionar con un grupo de subprocesos fijo, como los hilos podrían ser un punto muerto todos los bloques en 'f1.get()' a la vez y. – finnw

+0

Sintonice el tamaño de la agrupación según corresponda. – cletus

+0

O use un grupo de subprocesos en caché. – finnw

2

Otra opción es crear su propio ejecutor, llamarlo OrderedExecutor, y crear una matriz de objetos ThreadPoolExecutor encapsulados, con 1 hilo por ejecutor interno. A continuación, proporciona un mecanismo para la elección de uno de los objetos internos, por ejemplo, se puede hacer esto proporcionando una interfaz que el usuario de su clase puede implementar:

 
executor = new OrderedExecutor(10 /* pool size */, new OrderedExecutor.Chooser() { 
    public int choose(Runnable runnable) { 
    MyRunnable myRunnable = (MyRunnable)runnable; 
    return myRunnable.someId(); 
    }); 

executor.execute(new MyRunnable()); 

La implementación de OrderedExecutor.execute() y luego va a utilizar el Selector para obtener un int, usted mod esto con el tamaño del grupo, y ese es su índice en el conjunto interno. La idea es que "someId()" devolverá el mismo valor para todas las "a", etc.

12

Cuando he hecho esto en el pasado, generalmente he tenido un pedido que maneja un componente que luego envía callables/runnables a un ejecutor.

Algo así como.

  • Tienes una lista de tareas a ejecutar, algunas de ellas con dependencias
  • Crear un ejecutor y se envuelve con un ExecutorCompletionService
  • Buscar todas las tareas, todo sin dependencias, programarlos través del servicio de terminación
  • encuesta el servicio de terminación
  • a medida que cada tarea se completa
    • Añadir a una lista "completado"
    • Reevalúe cualquier tarea en espera wrt a la "lista completa" para ver si se trata de "dependencia completa".Si es así ellos programar
    • repetición
    • Enjuague hasta que se presenten todas las tareas/completado

El servicio de terminación es una buena manera de ser capaz de obtener las tareas a medida que completa en lugar de tratar de sondear un montón de Futuros. Sin embargo, es probable que desee mantener un Map<Future, TaskIdentifier> que se completa cuando una tarea se programa a través del servicio de finalización para que cuando el servicio de finalización le proporcione un futuro completo pueda averiguar cuál es TaskIdentifier.

Si alguna vez se encuentra en un estado donde las tareas todavía están esperando para ejecutarse, pero no se está ejecutando nada y no se puede programar nada, entonces tiene un problema circular de dependencia.

9

Escribo el propio ejecutor que garantiza el orden de tareas para tareas con la misma clave. Utiliza un mapa de colas para tareas de pedido con la misma clave. Cada tarea con clave ejecuta la siguiente tarea con la misma clave.

Esta solución no maneja RejectedExecutionException u otras excepciones del ejecutor delegado! Así que el ejecutor delegado debe ser "ilimitado".

import java.util.HashMap; 
import java.util.LinkedList; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.Executor; 

/** 
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). 
*/ 
public class OrderingExecutor implements Executor{ 

    private final Executor delegate; 
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>(); 

    public OrderingExecutor(Executor delegate){ 
     this.delegate = delegate; 
    } 

    @Override 
    public void execute(Runnable task) { 
     // task without key can be executed immediately 
     delegate.execute(task); 
    } 

    public void execute(Runnable task, Object key) { 
     if (key == null){ // if key is null, execute without ordering 
      execute(task); 
      return; 
     } 

     boolean first; 
     Runnable wrappedTask; 
     synchronized (keyedTasks){ 
      Queue<Runnable> dependencyQueue = keyedTasks.get(key); 
      first = (dependencyQueue == null); 
      if (dependencyQueue == null){ 
       dependencyQueue = new LinkedList<Runnable>(); 
       keyedTasks.put(key, dependencyQueue); 
      } 

      wrappedTask = wrap(task, dependencyQueue, key); 
      if (!first) 
       dependencyQueue.add(wrappedTask); 
     } 

     // execute method can block, call it outside synchronize block 
     if (first) 
      delegate.execute(wrappedTask); 

    } 

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
     return new OrderedTask(task, dependencyQueue, key); 
    } 

    class OrderedTask implements Runnable{ 

     private final Queue<Runnable> dependencyQueue; 
     private final Runnable task; 
     private final Object key; 

     public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
      this.task = task; 
      this.dependencyQueue = dependencyQueue; 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      try{ 
       task.run(); 
      } finally { 
       Runnable nextTask = null; 
       synchronized (keyedTasks){ 
        if (dependencyQueue.isEmpty()){ 
         keyedTasks.remove(key); 
        }else{ 
         nextTask = dependencyQueue.poll(); 
        } 
       } 
       if (nextTask!=null) 
        delegate.execute(nextTask); 
      } 
     } 
    } 
} 
+0

+1. Gracias por eso.Usaré esta implantación, pero realmente no sé cómo esto no está marcado como la respuesta final para la pregunta. –

0

En Habanero-Java library, hay un concepto de tareas basadas en datos que pueden ser utilizados para expresar dependencias entre las tareas y evitar operaciones de hilo de bloqueo. Bajo las cubiertas, la biblioteca Habanero-Java usa los JDKs ForkJoinPool (es decir, un ExecutorService).

Por ejemplo, el caso de uso para tareas A1, A2, A3, ... podría expresarse de la siguiente manera:

HjFuture a1 = future(() -> { doA1(); return true; }); 
HjFuture a2 = futureAwait(a1,() -> { doA2(); return true; }); 
HjFuture a3 = futureAwait(a2,() -> { doA3(); return true; }); 

Tenga en cuenta que A1, A2 y A3 son sólo referencias a objetos de tipo HjFuture y puede mantenerse en sus estructuras de datos personalizadas para especificar las dependencias cuando las tareas A2 y A3 entren en el tiempo de ejecución.

Hay algunos tutorial slides available. Puede encontrar más documentación como javadoc, API summary y primers.

0

Puede usar Executors.newSingleThreadExecutor(), pero solo usará un hilo para ejecutar sus tareas. Otra opción es usar CountDownLatch. Aquí hay un ejemplo simple:

public class Main2 { 

public static void main(String[] args) throws InterruptedException { 

    final CountDownLatch cdl1 = new CountDownLatch(1); 
    final CountDownLatch cdl2 = new CountDownLatch(1); 
    final CountDownLatch cdl3 = new CountDownLatch(1); 

    List<Runnable> list = new ArrayList<Runnable>(); 
    list.add(new Runnable() { 
     public void run() { 
      System.out.println("Task 1"); 

      // inform that task 1 is finished 
      cdl1.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 1 is finished 
      try { 
       cdl1.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 2"); 

      // inform that task 2 is finished 
      cdl2.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 2 is finished 
      try { 
       cdl2.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 3"); 

      // inform that task 3 is finished 
      cdl3.countDown(); 
     } 
    }); 

    ExecutorService es = Executors.newFixedThreadPool(200); 
    for (int i = 0; i < 3; i++) { 
     es.submit(list.get(i)); 
    } 

    es.shutdown(); 
    es.awaitTermination(1, TimeUnit.MINUTES); 
} 
} 
0

Creé un OrderingExecutor para este problema. Si pasa la misma clave al método execute() con diferentes ejecutables, la ejecución de los ejecutables con la misma clave estará en el orden en que se ejecuta execute() y nunca se superpondrá.

import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.Executor; 

/** 
* Special executor which can order the tasks if a common key is given. 
* Runnables submitted with non-null key will guaranteed to run in order for the same key. 
* 
*/ 
public class OrderedExecutor { 

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
      new ConcurrentLinkedQueue<Runnable>()); 

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>(); 
    private Executor delegate; 
    private volatile boolean stopped; 

    public OrderedExecutor(Executor delegate) { 
     this.delegate = delegate; 
    } 

    public void execute(Runnable runnable, Object key) { 
     if (stopped) { 
      return; 
     } 

     if (key == null) { 
      delegate.execute(runnable); 
      return; 
     } 

     Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> { 
      v.add(runnable); 
      return v; 
     }); 
     if (queueForKey == null) { 
      // There was no running task with this key 
      Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>()); 
      newQ.add(runnable); 
      // Use putIfAbsent because this execute() method can be called concurrently as well 
      queueForKey = taskMap.putIfAbsent(key, newQ); 
      if (queueForKey != null) 
       queueForKey.add(runnable); 
      delegate.execute(new InternalRunnable(key)); 
     } 
    } 

    public void shutdown() { 
     stopped = true; 
     taskMap.clear(); 
    } 

    /** 
    * Own Runnable used by OrderedExecutor. 
    * The runnable is associated with a specific key - the Queue&lt;Runnable> for this 
    * key is polled. 
    * If the queue is empty, it tries to remove the queue from taskMap. 
    * 
    */ 
    private class InternalRunnable implements Runnable { 

     private Object key; 

     public InternalRunnable(Object key) { 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      while (true) { 
       // There must be at least one task now 
       Runnable r = taskMap.get(key).poll(); 
       while (r != null) { 
        r.run(); 
        r = taskMap.get(key).poll(); 
       } 
       // The queue emptied 
       // Remove from the map if and only if the queue is really empty 
       boolean removed = taskMap.remove(key, EMPTY_QUEUE); 
       if (removed) { 
        // The queue has been removed from the map, 
        // if a new task arrives with the same key, a new InternalRunnable 
        // will be created 
        break; 
       } // If the queue has not been removed from the map it means that someone put a task into it 
        // so we can safely continue the loop 
      } 
     } 
    } 

    /** 
    * Special Queue implementation, with equals() and hashCode() methods. 
    * By default, Java SE queues use identity equals() and default hashCode() methods. 
    * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). 
    * 
    * @param <E> The type of elements in the queue. 
    */ 
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> { 

     private Queue<E> delegate; 

     public QueueWithHashCodeAndEquals(Queue<E> delegate) { 
      this.delegate = delegate; 
     } 

     public boolean add(E e) { 
      return delegate.add(e); 
     } 

     public boolean offer(E e) { 
      return delegate.offer(e); 
     } 

     public int size() { 
      return delegate.size(); 
     } 

     public boolean isEmpty() { 
      return delegate.isEmpty(); 
     } 

     public boolean contains(Object o) { 
      return delegate.contains(o); 
     } 

     public E remove() { 
      return delegate.remove(); 
     } 

     public E poll() { 
      return delegate.poll(); 
     } 

     public E element() { 
      return delegate.element(); 
     } 

     public Iterator<E> iterator() { 
      return delegate.iterator(); 
     } 

     public E peek() { 
      return delegate.peek(); 
     } 

     public Object[] toArray() { 
      return delegate.toArray(); 
     } 

     public <T> T[] toArray(T[] a) { 
      return delegate.toArray(a); 
     } 

     public boolean remove(Object o) { 
      return delegate.remove(o); 
     } 

     public boolean containsAll(Collection<?> c) { 
      return delegate.containsAll(c); 
     } 

     public boolean addAll(Collection<? extends E> c) { 
      return delegate.addAll(c); 
     } 

     public boolean removeAll(Collection<?> c) { 
      return delegate.removeAll(c); 
     } 

     public boolean retainAll(Collection<?> c) { 
      return delegate.retainAll(c); 
     } 

     public void clear() { 
      delegate.clear(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (!(obj instanceof QueueWithHashCodeAndEquals)) { 
       return false; 
      } 
      QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj; 
      return Arrays.equals(toArray(), other.toArray()); 
     } 

     @Override 
     public int hashCode() { 
      return Arrays.hashCode(toArray()); 
     } 

    } 

} 
Cuestiones relacionadas