2011-08-16 21 views
8

Estoy intentando obtener un comportamiento tolerante a fallas en los actores Akka. Estoy trabajando en un código que depende de que los actores del sistema estén disponibles para un largo período de procesamiento. Estoy descubriendo que mi procesamiento se detiene después de un par de horas (debería tomar alrededor de 10 horas) y no ocurre gran cosa. Creo que mis Actores no se están recuperando de las excepciones.¿Cómo configuro la tolerancia a fallas del Akka Actor?

¿Qué debo hacer para que los actores se reinicien de forma permanente uno por uno? Espero que esto se puede hacer a partir de esta documentación http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

estoy trabajando con akka 1.1.3 y Scala 2,9

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached 
import akka.dispatch.Dispatchers 
import akka.routing.CyclicIterator 
import akka.routing.LoadBalancer 
import akka.config.Supervision._ 


object TestActor { 
    val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") 
        .setCorePoolSize(100) 
        .setMaxPoolSize(100) 
        .build 
} 

class TestActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.dispatcher = TestActor.dispatcher 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure") 
     println("Actor: " + name + " Received: " + num) 
     //Thread.sleep(100) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    //callback method for restart handling 
    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    //callback method for restart handling 
    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

trait CyclicLoadBalancing extends LoadBalancer { this: Actor => 
    val testActors: List[ActorRef] 
    val seq = new CyclicIterator[ActorRef](testActors) 
} 

trait TestActorManager extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000) 
    val testActors: List[ActorRef] 
    override def preStart = testActors foreach { self.startLink(_) } 
    override def postStop = { System.out.println("postStop") } 
} 


    object FaultTest { 
    def main(args : Array[String]) : Unit = { 
     println("starting FaultTest.main()") 
     val numOfActors = 5 
     val supervisor = actorOf(
     new TestActorManager with CyclicLoadBalancing { 
      val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i))); 
     } 
    ) 

     supervisor.start(); 

     println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) 

     val testActor = Actor.registry.actorsFor(classOf[TestActor]).head 

     (1 until 200 toList) foreach { testActor ! _ } 

    } 
    } 

Este código configura 5 actores detrás de un LoadBalancer que acaba de imprimir números enteros que sean enviado a ellos excepto que arrojan Excepciones en números pares para simular fallas. Los enteros del 0 al 200 se envían a estos actores. Espero que los números impares salgan, pero todo parece cerrarse después de un par de fallas en los números pares. La ejecución de este código con resultados SBT en esta salida:

[info] Running FaultTest 
starting FaultTest.main() 
Loading config [akka.conf] from the application classpath. 
Number of Actors: 5 
Actor: 2 Received: 1 
Actor: 2 Received: 9 
Actor: 1 Received: 3 
Actor: 3 Received: 7 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM 

Lo que creo que está pasando aquí es que comienzan 5 actores, y los primeros 5 números pares ponerlos fuera del negocio y no se están renovadas.

¿Cómo se puede cambiar este código para que los actores se recuperen de las excepciones?

Espero que esto realmente imprima todos los números impares del 1 al 200. Creo que cada actor fallaría en los números pares pero se reiniciaría con un buzón intacto en las excepciones. Espero ver la impresión de preRestart y postRestart. ¿Qué se debe configurar en este ejemplo de código para que esto suceda?

Aquí hay algunas suposiciones adicionales sobre akka y Actores que pueden llevar a mi malentendido. Supongo que un Actor se puede configurar con un Supervisor o un Manejador de Fallas para que se reinicie y continúe disponible cuando se produce una excepción durante la recepción. Supongo que el mensaje que se envió al actor se perderá si arroja una excepción durante la recepción. Supongo que se invocarán preRestart() y postRestart() en el actor que lanza la excepción.

El ejemplo de código representa lo que estoy tratando de hacer y se basa en Why is my Dispatching on Actors scaled down in Akka?

** ** Otro ejemplo de código

Aquí hay otro ejemplo de código que es más sencillo. Estoy comenzando un actor que arroja excepciones sobre números pares. No hay equilibrador de carga u otras cosas en el camino. Estoy intentando imprimir información sobre el actor. Estoy esperando salir del programa por un minuto después de que los mensajes hayan sido enviados al Actor y supervisar lo que está sucediendo.

Espero que esto imprima los números impares pero parece que el Actor se encuentra con mensajes en su buzón.

¿Tengo el OneForOneStrategy configurado incorrecto? ¿Debo vincular al Actor con algo? ¿Este tipo de configuración está fundamentalmente mal dirigida por mi parte? ¿Se debe configurar un Dispatcher con tolerancia a errores de algún modo? ¿Podría estar estropeando los hilos en el Dispatcher?

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.ActorRegistry 
import akka.config.Supervision._ 

class SingleActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000) 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure, where does this get logged?") 
     println("Actor: " + name + " Received: " + num) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

object TestSingleActor{ 

    def main(args : Array[String]) : Unit = { 
     println("starting TestSingleActor.main()") 

     val testActor = Actor.actorOf(new SingleActor(1)).start() 

     println("number of actors: " + registry.actors.size) 
     printAllActorsInfo 

     (1 until 20 toList) foreach { testActor ! _ } 

     for(i <- 1 until 120){ 
     Thread.sleep(500) 
     printAllActorsInfo 
     } 
    } 

    def printAllActorsInfo() ={ 
    registry.actors.foreach((a) => 
     println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b " 
       .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted))) 
    } 
} 

Estoy recibiendo una salida como:

[info] Running TestSingleActor 
starting TestSingleActor.main() 
Loading config [akka.conf] from the application classpath. 
number of actors: 1 
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1 
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ... 

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM 

Respuesta

5

El problema era que estaba con mi archivo akka.conf. Estaba usando el archivo akka.conf de referencia 1.1.3, excepto la línea que configuró los manejadores de eventos.

mío (el roto uno):

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

referencia 1.1.3 (la que trabaja):

event-handlers = ["akka.event.EventHandler$DefaultListener"] 

Con mi evento manipuladores línea de configuración, reinicie el actor no suceden. Con la referencia 1.1.3, los reinicios de línea ocurren maravillosamente.

hice este cambio sobre la base de estas instrucciones http://akka.io/docs/akka/1.1.3/general/slf4j.html

Así, mediante la eliminación de las sugerencias en la página y volver a la referencia 1.1.3 akka.conf yo era capaz de obtener fallos Actores tolerantes.

1

Creo que su problema termina después se envían los mensajes, no están tratando de mantener su solicitud asíncrona con vida, por lo que las principales salidas de rosca , y lleva todo abajo con eso.

+0

Si añado un Trhead.sleep (100000) al final del main() me sale: '[info] Ejecución de FaultTest partir FaultTest.main() Cargando config [akka.conf] de la ruta de clases de la aplicación. número de actores: 5 Actor: 0 Recibido: 1 Actor: 4 Recibido: 3 Actor: 1 Recibido: 7 Actor: 1 Recibido: 9' y las pausas de salida, pero los números adicionales no se imprimen. No esperé a que la aplicación saliera, pero después de 30-40 segundos no había nada. Además, si elimino la falla, los números se imprimen muy rápido, en menos de 2 segundos. –

Cuestiones relacionadas