2011-09-03 15 views
12

supongamos que tengo una función del sueño:Método Scala @suspendable en un futuro

def sleep(delay:Int) : Unit @suspendable = { 
    .... 
} 

es posible tener un futuro función que crea una versión asíncrona de la función del sueño que puede ser esperado en forma sincrónica.

def future(targetFunc: (Int => Unit @suspendable)) : (Int => Future) = { 
    .... 
} 

class Future { 
    def await : Unit @suspendable = { 
    .... 
    } 
} 

usted debería ser capaz de hacer algo como esto: deben aparecer

reset { 
    val sleepAsync = future(sleep) 
    val future1 = sleepAsync(2000) 
    val future2 = sleepAsync(3000) 
    future1.await 
    future2.await 
    /* finishes after a delay of 3000 */ 
} 

las dos llamadas a sleepAsync para volver de inmediato y las dos llamadas a futuro # aguardan debe aparecer bloquear. por supuesto, todos caen al final del restablecimiento y el código posterior es responsable de llamar a la continuación después de la demora.

de lo contrario, ¿existe un método alternativo para ejecutar dos funciones @suspendable en paralelo y esperar a que se completen?

que tienen una esencia compilar con un esqueleto de lo que quiero hacer: https://gist.github.com/1191381

+0

escribí esto: https://gist.github.com/1191571 que parece funcionar, pero parece bastante complicado. Siento que me podría estar perdiendo una forma más simple de hacerlo. – benmmurphy

+0

también encontró esto: http://days2011.scala-lang.org/node/138/288 que parece hacerlo mucho mejor. – benmmurphy

+0

¿Tiene preferencia por la respuesta "ganadora"? Necesito dar el premio de recompensa. –

Respuesta

1

No estoy seguro de que entiendo completamente la pregunta, pero aquí es una oportunidad:

import scala.util.continuations._ 

class Future(thread: Thread) { 
    def await = thread.join 
} 

object Future { 

    def sleep(delay: Long) = Thread.sleep(delay) 

    def future[A,B](f: A => B) = (a: A) => shift { k: (Future => Unit) => 
    val thread = new Thread { override def run() { f(a) } } 
    thread.start() 

    k(new Future(thread)) 
    } 

    def main(args:Array[String]) = reset { 
    val sleepAsync = future(sleep) 
    val future1 = sleepAsync(2000) // returns right away 
    val future2 = sleepAsync(3000) // returns right away 
    future1.await // returns after two seconds 
    future2.await // returns after an additional one second 
    // finished after a total delay of three seconds 
    } 
} 

Aquí, una instancia Future no es más que un identificador en un Thread, por lo que puede usar su método join para bloquear hasta que finalice.

La función future toma una función de tipo A => B, y devuelve una función que, cuando se suministra con una A se iniciará un hilo para ejecutar la función "FUTURED", y se envuelve en una Future, que se inyecta de nuevo en la continuación, asignándola a val future1.

¿Es esto algo cercano a lo que usted estaba buscando?

+0

quería usar continuaciones en lugar de hilos – benmmurphy

+0

Puede ejecutar las continuaciones como desee, pero de alguna manera tienen que salir del hilo actual (de lo contrario, el tiempo de ejecución total sería de 5000 ms en lugar de 3000 ms). En la práctica, probablemente usaría un grupo de subprocesos en lugar de crear sus propias instancias 'Thread'. ¿Cómo quieres ejecutar 'future1' y' future2'? – earldouglas

+0

ah. Quiero que el futuro pueda tomar una función suspendible no es una función normal – benmmurphy

2
object Forks { 

    import scala.util.continuations._ 

    case class Forker(forks: Vector[() => Unit @suspendable]) { 
    def ~(block: => Unit @suspendable): Forker = Forker(forks :+ (() => block)) 
    def joinIf(pred: Int => Boolean): Unit @suspendable = shift { k: (Unit => Unit) => 
     val counter = new java.util.concurrent.atomic.AtomicInteger(forks.size) 
     forks foreach { f => 
     reset { 
      f() 
      if (pred(counter.decrementAndGet)) k() 
     } 
     } 
    } 
    def joinAll() = joinIf(_ == 0) 
    def joinAny() = joinIf(_ == forks.size - 1) 
    } 

    def fork(block: => Unit @suspendable): Forker = Forker(Vector(() => block)) 
} 

usando fork(), ahora podemos esperar muchas "suspensables". use ~() para encadenar suspensibles. utilice joinAll() para esperar todas las suspensibles y joinAny() para esperar solo una. use joinIf() para personalizar la estrategia de unión.

object Tests extends App { 

    import java.util.{Timer, TimerTask} 
    import scala.util.continuations._ 

    implicit val timer = new Timer 

    def sleep(ms: Int)(implicit timer: Timer): Unit @suspendable = { 
    shift { k: (Unit => Unit) => 
     timer.schedule(new TimerTask { 
     def run = k() 
     }, ms) 
    } 
    } 

    import Forks._ 

    reset { 
    fork { 
     println("sleeping for 2000 ms") 
     sleep(2000) 
     println("slept for 2000 ms") 
    } ~ { 
     println("sleeping for 4000 ms") 
     sleep(4000) 
     println("slept for 4000 ms") 
    } joinAll() 
    println("and we are done") 
    } 
    println("outside reset") 
    readLine 
    timer.cancel 
} 

y esta es la salida. programa se inicia en el momento T:

sleeping for 2000 ms 
sleeping for 4000 ms 
outside reset   <<<<<< T + 0 second 
slept for 2000 ms  <<<<<< T + 2 seconds 
slept for 4000 ms  <<<<<< T + 4 seconds 
and we are done  <<<<<< T + 4 seconds 
Cuestiones relacionadas