2010-08-04 14 views
7

Estoy intentando escribir un rastreador web multiproceso.Uso de Java ThreadPool

Mi clase principal de entrada tiene el siguiente código:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null) 
     return; 
exec.execute(new URLCrawler(this, url)); 
} 

El URLCrawler Obtiene la URL especificada, analiza el código HTML extrae enlaces de ella, y los horarios de enlaces que no se ven de nuevo a la frontera.

Una frontera es una cola de URL no rastreadas. El problema es cómo escribir el método get(). Si la cola está vacía, debe esperar hasta que termine URLCrawlers y vuelva a intentarlo. Debería devolver nulo solo cuando la cola está vacía y no hay actualmente URLrawler activo.

Mi primera idea fue utilizar un AtomicInteger para contar el número actual de URCrawlers activos y un objeto auxiliar para llamadas notifyAll()/wait(). Cada rastreador al inicio incrementa el número de ejecutores URLrawlers en funcionamiento, y al salir lo reduce, y notifica al objeto que se ha completado.

Pero he leído que notify()/notifyAll() y wait() son métodos algo obsoletos para hacer la comunicación de subprocesos.

¿Qué debo usar en este patrón de trabajo? Es similar a M productores y N consumidores, la pregunta es cómo lidiar con la exausación de los productores.

Respuesta

1

Creo que el uso de wait/notify está justificado en este caso. No se puede pensar en ninguna forma directa de hacerlo utilizando j.u.c.
En una clase, vamos a llamar a Coordinador:

private final int numOfCrawlers; 
private int waiting; 

public boolean shouldTryAgain(){ 
    synchronized(this){ 
     waiting++; 
     if(waiting>=numOfCrawlers){ 
      //Everybody is waiting, terminate 
      return false; 
     }else{ 
      wait();//spurious wake up is okay 
      //waked up for whatever reason. Try again 
      waiting--; 
      return true; 
     } 
    } 

public void hasEnqueued(){ 
    synchronized(this){ 
     notifyAll(); 
    } 
} 

entonces,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null){ 
     if(!coordinator.shouldTryAgain()){ 
      //all threads are waiting. No possibility of new jobs. 
      return; 
     }else{ 
      //Possible that there are other jobs. Try again 
      continue; 
     } 
    } 
    exec.execute(new URLCrawler(this, url)); 
}//while(true) 
3

No estoy seguro de entender su diseño, pero esto puede ser un trabajo para una opción Semaphore

3

Una de ellas es hacer "frontera" una cola de bloqueo, así que cualquier hilo tratando de "obtener" de que bloqueará . Tan pronto como cualquier otro URLCrawler ponga objetos en esa cola, cualquier otro subproceso se notificará automáticamente (con el objeto eliminado)

+0

Sí, esa es una solución para un estado estable. ¿Pero cómo lidiar luego con la situación cuando ninguno de URLrawlers pone en cola ninguna URL? Con una cola de bloqueo, la frontera se bloqueará infinitamente. –

+0

En ese caso, puede tener un método crawlerDone() en su objeto de frontera que se llama cada vez que un UrlCrawler termina de funcionar. Este método, junto con el enfoque de contador que sugirió, puede probar (en su método de frontera) si todos los rastreadores han finalizado. Si eso es cierto, get() puede devolver nulo sin bloquear – naikus

+0

frontier puede ser una cola de bloqueo de capacidad fija. un buen candidato para esa capacidad es numberOfCrawlers –

2

creo que un bloque de construcción básico para su caso de uso es un "enganche", similar a CountDownLatch, pero a diferencia de CountDownLatch, uno que permite incrementando el recuento también.

Una interfaz para un cerrojo tal podría ser

public interface Latch { 
    public void countDown(); 
    public void countUp(); 
    public void await() throws InterruptedException; 
    public int getCount(); 
} 

valores válidos para el recuento sería 0 en adelante. El método await() te permitirá bloquear hasta que el conteo se reduzca a cero.

Si tiene un pestillo de este tipo, su caso de uso puede describirse con bastante facilidad. También sospecho que la cola (frontera) se puede eliminar en esta solución (el ejecutor proporciona uno de todos modos, así que es algo redundante).Me gustaría reescribir su rutina principal como

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers); 
Latch latch = ...; // instantiate a latch 
URL[] initialUrls = ...; 
for (URL url: initialUrls) { 
    executor.execute(new URLCrawler(this, url, latch)); 
} 
// now wait for all crawling tasks to finish 
latch.await(); 

Su URLCrawler utilizaría el pestillo de esta manera:

public class URLCrawler implements Runnable { 
    private final Latch latch; 

    public URLCrawler(..., Latch l) { 
     ... 
     latch = l; 
     latch.countUp(); // increment the count as early as possible 
    } 

    public void run() { 
     try { 
      List<URL> secondaryUrls = crawl(); 
      for (URL url: secondaryUrls) { 
       // submit new tasks directly 
       executor.execute(new URLCrawler(..., latch)); 
      } 
     } finally { 
      // as a last step, decrement the count 
      latch.countDown(); 
     } 
    } 
} 

En cuanto a las implementaciones de enganche, no puede haber una serie de implementaciones posibles, que van desde una que es basado en wait() y notifyAll(), uno que usa Bloqueo y condición, para una implementación que utiliza el sincronizador de síntesis abstracta (AbstractQueuedSynchronizer). Todas estas implementaciones, creo, serían bastante sencillas. Tenga en cuenta que la versión wait() - notifyAll() y la versión Lock-Condition se basarían en la exclusión mutua, mientras que la versión AQS utilizaría CAS (compare-and-swap) y, por lo tanto, podría escalarse mejor en determinadas situaciones.

+0

Su pestillo personalizado se parece mucho a un semáforo ... ¿Por qué no usar uno? – assylias

+0

Sí, ciertamente hay similitudes. Una cosa que falta en el semáforo vainilla es el método await() arriba, que en el término del semáforo puede bloquear hasta que se liberen todos los permisos.Uno probablemente puede crear esto combinando un semáforo y un bloqueo de cuenta atrás. – sjlee

0

Me gustaría sugerir un AdaptiveExecuter. En función de un valor característico, puede optar por serializar o paralizar un hilo para su ejecución. En el ejemplo siguiente, PUID es una cadena/objeto que quería usar para tomar esa decisión. Puede alterar la lógica para adaptarse a su código. Algunas partes del código se comentan para permitir experimentos adicionales.

clase AdaptiveExecutor implementa Executor { final Queue tasks = new LinkedBlockingQueue(); Runnable active; // ExecutorService threadExecutor = Executors.newCachedThreadPool(); static ExecutorService threadExecutor = Executors.newFixedThreadPool (4);

AdaptiveExecutor() { 
    System.out.println("Initial Queue Size=" + tasks.size()); 
} 

public void execute(final Runnable r) { 
    /* if immediate start is needed do either of below two 
    new Thread(r).start(); 

    try { 
     threadExecutor.execute(r); 
    } catch(RejectedExecutionException rEE) { 
     System.out.println("Thread Rejected " + new Thread(r).getName()); 
    } 

    */ 


    tasks.offer(r); // otherwise, queue them up 
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel. 
    /* 
    tasks.offer(new Runnable() { 
     public void run() { 
      try { 
       r.run(); 
      } finally { 
       scheduleNext(); 
      } 
     } 
    }); 
    */ 
    if ((active == null)&& !tasks.isEmpty()) { 
     active = tasks.poll(); 
     try { 
      threadExecutor.submit(active); 
     } catch (RejectedExecutionException rEE) { 
      System.out.println("Thread Rejected " + new Thread(r).getName()); 
     } 
    } 

    /* 
    if ((active == null)&& !tasks.isEmpty()) { 
     scheduleNext(); 
    } else tasks.offer(r); 
    */ 
    //tasks.offer(r); 

    //System.out.println("Queue Size=" + tasks.size()); 

} 

private void serialize(Thread th) { 
    try { 
     Thread activeThread = new Thread(active); 

     th.wait(200); 
     threadExecutor.submit(th); 
    } catch (InterruptedException iEx) { 

    } 
    /* 
    active=tasks.poll(); 
    System.out.println("active thread is " + active.toString()); 
    threadExecutor.execute(active); 
    */ 
} 

private void parallalize() { 
    if(null!=active) 
     threadExecutor.submit(active); 
} 

protected void scheduleNext(Thread r) { 
    //System.out.println("scheduleNext called") ; 
    if(false==compareKeys(r,new Thread(active))) 
     parallalize(); 
    else serialize(r); 
} 

private boolean compareKeys(Thread r, Thread active) { 
    // TODO: obtain names of threads. If they contain same PUID, serialize them. 
    if(null==active) 
     return true; // first thread should be serialized 
    else return false; //rest all go parallel, unless logic controlls it 
} 

}

2

La pregunta es un poco viejo, pero creo que he encontrado alguna solución sencilla, de trabajo:

extender la clase ThreadPoolExecutor como a continuación. La nueva funcionalidad mantiene el recuento de tareas activas (desafortunadamente, siempre que getActiveCount() no sea confiable). Si taskCount.get() == 0 y no hay más tareas en cola, significa que no hay nada que hacer y el ejecutor se apaga. Usted tiene su criterio de salida. Además, si crea su albacea, pero no presenta ninguna tarea, no va a bloquear:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor { 

    private final AtomicInteger taskCount = new AtomicInteger(); 

    public CrawlingThreadPoolExecutor() { 
     super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 

     super.beforeExecute(t, r); 
     taskCount.incrementAndGet(); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 

     super.afterExecute(r, t); 
     taskCount.decrementAndGet(); 
     if (getQueue().isEmpty() && taskCount.get() == 0) { 
      shutdown(); 
     } 
    } 
} 

Una cosa más que usted tiene que hacer es poner en práctica su Runnable en una forma en que se mantiene referencia a Executor se están utilizando para poder enviar nuevas tareas. Aquí hay un simulacro:

public class MockFetcher implements Runnable { 

    private final String url; 
    private final Executor e; 

    public MockFetcher(final Executor e, final String url) { 
     this.e = e; 
     this.url = url; 
    } 

    @Override 
    public void run() { 
     final List<String> newUrls = new ArrayList<>(); 
     // Parse doc and build url list, and then: 
     for (final String newUrl : newUrls) { 
      e.execute(new MockFetcher(this.e, newUrl)); 
     } 
    } 
} 
Cuestiones relacionadas