2012-01-18 12 views
7

Quiero ejecutar algunas tareas diferentes en paralelo, pero tengo el concepto de que si una tarea ya está en cola o se está procesando actualmente, no se volverá a poner en cola. He leído un poco sobre la API de Java y he encontrado el siguiente código, que parece funcionar. ¿Alguien puede arrojar luz sobre si el método que estoy usando es el mejor enfoque? ¿Algún peligro (seguridad de la rosca?) O mejores formas de hacer esto? Código es la siguiente:Thread Pool manejo de tareas 'duplicadas'

import java.util.HashMap; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2; 
    static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>(); 
    static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); 
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q); 

    public static void main(String[] args) { 
     try { 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    static boolean execute(TestExecution e) { 
     System.out.println("Handling "+e.key1+":"+e.key2); 
     if (executions.containsKey(e)) { 
     Future<?> f = (Future<?>) executions.get(e); 
     if (f.isDone()) { 
      System.out.println("Previous execution has completed"); 
      executions.remove(e); 
     } else { 
      System.out.println("Previous execution still running"); 
      return false; 
     }   
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     Future<?> f = tpe.submit(e); 
     executions.put(e, f);    
     return true; 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    }    
} 

seguimiento para comentar a continuación:
El plan es que la activación de las tareas a ejecutar serán manejados por cron llamando al servicio web REST. Por ejemplo, a continuación, se muestra la configuración de una tarea activada a las 9:30 todos los días, más otra programada cada dos minutos.

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22 

En este caso, si key11 tarea: key12 se está ejecutando, o ya están en cola para funcionar, no quiero hacer cola otra instancia. Entiendo que tenemos otras opciones para programar, sin embargo, tendemos a utilizar cron para otras tareas, por lo que quiero tratar de mantener esto.

Segunda actualización. En respuesta a los comentarios hasta el momento, he vuelto a escribir el código. ¿Podría comentar algún problema con la siguiente solución actualizada?

import java.util.concurrent.LinkedBlockingQueue; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2;  
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>()); 

    public static void main(String[] args) { 
     try { 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     tpe.execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    } 
} 


import java.util.Collections; 
import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


public class TestThreadPoolExecutor extends ThreadPoolExecutor { 
    Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>()); 

    public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {  
     super(2, 5, 1, TimeUnit.MINUTES, q);  
    } 

    public void execute(Runnable command) { 
     if (executions.contains(command)) { 
     System.out.println("Previous execution still running"); 
     return; 
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     super.execute(command);  
     executions.add(command);  
    } 

    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t);   
     executions.remove(r); 
    }  
} 
+0

Por qué no utilizar un hashset para TestExecution en lugar de HashMap ?? –

Respuesta

2

Un par de comentarios:

  • en el método de ejecución, obtendrá una carrera de condición entre la lectura de "ejecuciones" (containsKey) y la escritura (quitar o poner) si varios hilos llaman a este método al mismo tiempo. Debe ajustar todas las llamadas a "ejecuciones" que se supone que son atómicas en un bloque sincronizado. (En su caso, haciendo que el método sincronizado funcionará) http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • Usted debe manejar estado utilizando un producto único en lugar de variables (es decir, globales) estáticas

Pero realmente me gustaría saber un poco más acerca de su diseño para entender lo que estás tratando de lograr ¿Por qué una tarea se pondría en cola para su ejecución varias veces?

+0

Gracias, he actualizado la pregunta con algo más de información. – Patrick

+1

Y para un diseño más orientado a objetos, consideraría subclasificar ThreadPoolExecutor y poner el código para administrar el mapa de ejecuciones en las funciones execute() y afterExecute(). (También me parece más correcto llamar a execute() en lugar de submit(), pero la especificación no es explícita en este punto) –

+0

Saludos, han reescrito el código según sus sugerencias. ¿Eso parece mejor? – Patrick

3

Aquí es cómo iba a manejar y evitar duplicados

import java.util.Collections; 
import java.util.Set; 
import java.util.concurrent.*; 

public class TestExecution implements Callable<Void> { 
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>()); 

    private final String key1; 
    private final String key2; 

    public static void main(String... args) throws InterruptedException { 
     new TestExecution("A", "A").execute(); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     Thread.sleep(8000); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     new TestExecution("B", "B").execute(); 
     TPE.shutdown(); 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2; 
    } 

    void execute() { 
     if (TE_SET.add(this)) { 
      System.out.println("Handling " + this); 
      TPE.submit(this); 
     } else { 
      System.out.println("... ignoring duplicate " + this); 
     } 
    } 

    public boolean equals(Object obj) { 
     return obj instanceof TestExecution && 
       key1.equals(((TestExecution) obj).key1) && 
       key2.equals(((TestExecution) obj).key2); 
    } 

    public int hashCode() { 
     return key1.hashCode() * 31 + key2.hashCode(); 
    } 

    @Override 
    public Void call() throws InterruptedException { 
     if (!TE_SET.remove(this)) { 
      System.out.println("... dropping duplicate " + this); 
      return null; 
     } 
     System.out.println("Start processing " + this); 
     Thread.sleep(4000); 
     System.out.println("Finish processing " + this); 
     return null; 
    } 

    public String toString() { 
     return key1 + ':' + key2; 
    } 
} 

grabados

Handling A:A 
... ignoring duplicate A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
Finish processing A:A 
Finish processing B:B 
Handling A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
... ignoring duplicate B:B 
Finish processing B:B 
Finish processing A:A 
+0

Bien, gracias, algunos buenos punteros allí, especialmente evitar problemas de subprocesamiento múltiple utilizando ConcurrentHashMap y sobreescribiendo el método toString. Un par de preguntas.¿Por qué no usar HashSet (es porque no hay un objeto seguro para subprocesos equivalente para usar?) Tampoco entiendo el código para eliminarlo de HashMap. Parece que hace esto al comienzo del procesamiento, ¿no debería ser eso al final del procesamiento? – Patrick

+1

Puede usar 'Collections.synchronizedSet (new HashSet())' Esto es seguro para subprocesos, pero no es concurrente. Si está al principio o al final depende de sus requisitos. ¿Es mejor ocasionalmente hacer algo dos veces u ocasionalmente no hacer algo (porque la nueva tarea se agregó entre la finalización de la tarea y la eliminación) –

+0

OK, realmente no sé la diferencia entre 'thread safe' y 'concurrent', tal vez Tengo algo de investigación que hacer allí. Pero para eliminar el elemento, si no quiero volver a ejecutar (o poner en cola) un trabajo que ya ha comenzado, entonces el TE_SET.remove debe moverse al final de la función de llamada, ¿verdad? ¿Estoy en lo correcto al asumir que el caso de "soltar duplicados" es un caso de error? Si estamos en la función de llamada, entonces un elemento siempre debería haber sido escrito en el HashSet ¿verdad? – Patrick