2010-02-01 16 views
40

preguntas similares:Envolver un cálculo asíncrono en un cálculo síncrona (bloqueo)

Tengo un objeto con un método me gustaría exponer a los clientes de la biblioteca (especialmente clientes de scripting) como algo como:

interface MyNiceInterface 
{ 
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg); 
    public Future<Baz> doSomething(Foo fooArg, Bar barArg); 
    // doSomethingAndBlock is the straightforward way; 
    // doSomething has more control but deals with 
    // a Future and that might be too much hassle for 
    // scripting clients 
} 

pero la "materia" primitivo que tengo disponible es un conjunto de clases basadas en eventos:

interface BazComputationSink 
{ 
    public void onBazResult(Baz result); 
} 

class ImplementingThing 
{ 
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink); 
} 

donde ImplementingThing toma insumos, hace algunas cosas arcano, como encolar las cosas en una cola de tareas, y luego más tarde cuando se produce un resultado , Se llama a sink.onBazResult() en un hilo que puede ser o no el mismo hilo que ImplementingThing.doSomethingAsync().

¿Hay alguna manera de usar las funciones controladas por eventos que tengo, junto con las primitivas de concurrencia, para implementar MyNiceInterface para que los clientes de scripting puedan esperar con gusto un hilo de bloqueo?

editar: ¿Puedo usar FutureTask para esto?

Respuesta

41

Usando su propio implemenation futuro:

public class BazComputationFuture implements Future<Baz>, BazComputationSink { 

    private volatile Baz result = null; 
    private volatile boolean cancelled = false; 
    private final CountDownLatch countDownLatch; 

    public BazComputationFuture() { 
     countDownLatch = new CountDownLatch(1); 
    } 

    @Override 
    public boolean cancel(final boolean mayInterruptIfRunning) { 
     if (isDone()) { 
      return false; 
     } else { 
      countDownLatch.countDown(); 
      cancelled = true; 
      return !isDone(); 
     } 
    } 

    @Override 
    public Baz get() throws InterruptedException, ExecutionException { 
     countDownLatch.await(); 
     return result; 
    } 

    @Override 
    public Baz get(final long timeout, final TimeUnit unit) 
      throws InterruptedException, ExecutionException, TimeoutException { 
     countDownLatch.await(timeout, unit); 
     return result; 
    } 

    @Override 
    public boolean isCancelled() { 
     return cancelled; 
    } 

    @Override 
    public boolean isDone() { 
     return countDownLatch.getCount() == 0; 
    } 

    public void onBazResult(final Baz result) { 
     this.result = result; 
     countDownLatch.countDown(); 
    } 

} 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    BazComputationFuture future = new BazComputationFuture(); 
    doSomethingAsync(fooArg, barArg, future); 
    return future; 
} 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    return doSomething(fooArg, barArg).get(); 
} 

La solución crea un CountDownLatch internamente que se borra una vez que se recibe la devolución de llamada. Si el usuario llama a get, CountDownLatch se usa para bloquear el hilo de llamada hasta que el cálculo se complete y llame a la devolución de llamada de onBazResult. CountDownLatch asegurará que si la devolución de llamada ocurre antes de llamar a get(), el método get() devolverá inmediatamente con un resultado.

+0

(+1) buena solución – skaffman

+0

+1 porque creo que lo entiendo ... pero ¿podría explicar los aspectos de concurrencia? (el uso del bloqueo de cuenta regresiva) –

+0

también te falta un "hecho = verdadero" en algún lugar en onBazResult() .... –

14

Bueno, no es la solución simple de hacer algo como:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    final AtomicReference<Baz> notifier = new AtomicReference(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
    public void onBazResult(Baz result) { 
     synchronized (notifier) { 
     notifier.set(result); 
     notifier.notify(); 
     } 
    } 
    }); 
    synchronized (notifier) { 
    while (notifier.get() == null) 
     notifier.wait(); 
    } 
    return notifier.get(); 
} 

Por supuesto, esto supone que el resultado de su Baz nunca será nula ...

+0

este funciona y es un bonito patrón elegante. Gracias. –

+0

Devolución de llamada interna (BazComputationSink() {}), se quejaría: inicializa el notificador. Y si se inicializa a nulo, obtengo NullPointerException lanzado para el primer hit, porque mis resultados no están listos aún en la tarea asincrónica. –

+1

Esto no se recomienda: si la devolución de llamada vuelve inmediatamente, se llamará a la notificación antes de esperar y se producirá un interbloqueo (esto puede suceder si tiene una lógica de caché) – for3st

11

Google guava library tiene un SettableFuture fácil de usar que hace que este problema sea muy simple (alrededor de 10 líneas de código).

public class ImplementingThing { 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    try { 
     return doSomething(fooArg, barArg).get(); 
    } catch (Exception e) { 
     throw new RuntimeException("Oh dear"); 
    } 
}; 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    final SettableFuture<Baz> future = new SettableFuture<Baz>(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
     @Override 
     public void onBazResult(Baz result) { 
      future.set(result); 
     } 
    }); 
    return future; 
}; 

// Everything below here is just mock stuff to make the example work, 
// so you can copy it into your IDE and see it run. 

public static class Baz {} 
public static class Foo {} 
public static class Bar {} 

public static interface BazComputationSink { 
    public void onBazResult(Baz result); 
} 

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) { 
    new Thread(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       Thread.sleep(4000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      Baz baz = new Baz(); 
      sink.onBazResult(baz); 
     } 
    }).start(); 
}; 

public static void main(String[] args) { 
    System.err.println("Starting Main"); 
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null)); 
    System.err.println("Ending Main"); 
} 
3

Un ejemplo muy simple, sólo para entender CountDownLatch sin ningún código adicional.

A java.util.concurrent.CountDownLatch es una construcción de simultaneidad que permite que uno o más subprocesos esperen a que se complete un conjunto dado de operaciones.

A CountDownLatch se inicializa con un conteo dado. Este conteo se reduce mediante llamadas al método countDown(). Los hilos que esperan que esta cuenta llegue a cero pueden llamar a uno de los métodos await(). Llamar al await() bloquea el hilo hasta que el conteo llegue a cero.

A continuación se muestra un ejemplo simple.Después de que el Decrementador ha llamado countDown() 3 veces en el CountDownLatch, el mesero en espera se libera de la llamada await().

También puede mencionar algunos TimeOut a la espera.

CountDownLatch latch = new CountDownLatch(3); 

Waiter  waiter  = new Waiter(latch); 
Decrementer decrementer = new Decrementer(latch); 

new Thread(waiter)  .start(); 
new Thread(decrementer).start(); 

Thread.sleep(4000); 
public class Waiter implements Runnable{ 

    CountDownLatch latch = null; 

    public Waiter(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 
     try { 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Waiter Released"); 
    } 
} 

// --------------

public class Decrementer implements Runnable { 

    CountDownLatch latch = null; 

    public Decrementer(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 

     try { 
      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Reference

Si no desea utilizar un CountDownLatch o su requerimiento es algo lo mismo que Facebook como y a diferencia de la funcionalidad. Significa que si se está llamando a un método, no llame al otro método.

En ese caso se puede declarar un

private volatile Boolean isInprocessOfLikeOrUnLike = false; 

y luego se puede comprobar en el comienzo de su llamada a un método que si es false continuación, llamar al método de lo contrario volver .. depende de su aplicación.

+0

volátil Boolean? Creo que quieres un booleano volátil (lowecase b), o mejor: una AtomicReference –

2

Aquí es una solución más genérica basada en la respuesta de Pablo Wagland:

public abstract class AsyncRunnable<T> { 
    protected abstract void run(AtomicReference<T> notifier); 

    protected final void finish(AtomicReference<T> notifier, T result) { 
     synchronized (notifier) { 
      notifier.set(result); 
      notifier.notify(); 
     } 
    } 

    public static <T> T wait(AsyncRunnable<T> runnable) { 
     final AtomicReference<T> notifier = new AtomicReference<>(); 

     // run the asynchronous code 
     runnable.run(notifier); 

     // wait for the asynchronous code to finish 
     synchronized (notifier) { 
      while (notifier.get() == null) { 
       try { 
        notifier.wait(); 
       } catch (InterruptedException ignore) {} 
      } 
     } 

     // return the result of the asynchronous code 
     return notifier.get(); 
    } 
} 

He aquí un ejemplo de cómo usarlo ::

String result = AsyncRunnable.wait(new AsyncRunnable<String>() { 
     @Override 
     public void run(final AtomicReference<String> notifier) { 
      // here goes your async code, e.g.: 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        finish(notifier, "This was a asynchronous call!"); 
       } 
      }).start(); 
     } 
    }); 

Una versión más detallada del código se puede encontrar aquí: http://pastebin.com/hKHJUBqE

EDIT: el ejemplo relacionado con la pregunta sería:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) { 
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() { 
     @Override 
     protected void run(final AtomicReference<Baz> notifier) { 
      doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
       public void onBazResult(Baz result) { 
        synchronized (notifier) { 
         notifier.set(result); 
         notifier.notify(); 
        } 
       } 
      }); 
     } 
    }); 
} 
+0

Comprendo tu respuesta, pero no veo cómo asignarla a mi pregunta de hace más de seis años. –

+0

Actualicé la respuesta para que pueda asignarla a la pregunta –

+0

BTW si tiene que hacer esto en los lugares que consideraría usar esto: https://github.com/ReactiveX/RxJava. ReactiveX es una combinación perfecta para su problema, aunque tiene una curva de aprendizaje pronunciada (al menos para mí). Esta es una gran introducción: https://www.youtube.com/watch?v=_t06LRX0DV0. –

2

Esto es muy simple con RxJava 2.x:

try { 
    Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
      doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
      .toFuture().get(); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} catch (ExecutionException e) { 
    e.printStackTrace(); 
} 

o sin Lambda notación:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() { 
       @Override 
       public void subscribe(SingleEmitter<Baz> emitter) { 
        doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
         @Override 
         public void onBazResult(Baz result) { 
          emitter.onSuccess(result); 
         } 
        }); 
       } 
      }).toFuture().get(); 

aún más simple:

Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
       doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
       .blockingGet();