Estoy intentando obtener un comportamiento tolerante a fallas en los actores Akka. Estoy trabajando en un código que depende de que los actores del sistema estén disponibles para un largo período de procesamiento. Estoy descubriendo que mi procesamiento se detiene después de un par de horas (debería tomar alrededor de 10 horas) y no ocurre gran cosa. Creo que mis Actores no se están recuperando de las excepciones.¿Cómo configuro la tolerancia a fallas del Akka Actor?
¿Qué debo hacer para que los actores se reinicien de forma permanente uno por uno? Espero que esto se puede hacer a partir de esta documentación http://akka.io/docs/akka/1.1.3/scala/fault-tolerance
estoy trabajando con akka 1.1.3 y Scala 2,9
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._
object TestActor {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
.setCorePoolSize(100)
.setMaxPoolSize(100)
.build
}
class TestActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.dispatcher = TestActor.dispatcher
def receive = {
case num: Integer => {
if(num % 2 == 0)
throw new Exception("This is a simulated failure")
println("Actor: " + name + " Received: " + num)
//Thread.sleep(100)
}
}
override def postStop(){
println("TestActor post Stop ")
}
//callback method for restart handling
override def preRestart(reason: Throwable){
println("TestActor "+ name + " restaring after shutdown because of " + reason)
}
//callback method for restart handling
override def postRestart(reason: Throwable){
println("Restaring TestActor "+name+"after shutdown because of " + reason)
}
}
trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
val testActors: List[ActorRef]
val seq = new CyclicIterator[ActorRef](testActors)
}
trait TestActorManager extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
val testActors: List[ActorRef]
override def preStart = testActors foreach { self.startLink(_) }
override def postStop = { System.out.println("postStop") }
}
object FaultTest {
def main(args : Array[String]) : Unit = {
println("starting FaultTest.main()")
val numOfActors = 5
val supervisor = actorOf(
new TestActorManager with CyclicLoadBalancing {
val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
}
)
supervisor.start();
println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length)
val testActor = Actor.registry.actorsFor(classOf[TestActor]).head
(1 until 200 toList) foreach { testActor ! _ }
}
}
Este código configura 5 actores detrás de un LoadBalancer que acaba de imprimir números enteros que sean enviado a ellos excepto que arrojan Excepciones en números pares para simular fallas. Los enteros del 0 al 200 se envían a estos actores. Espero que los números impares salgan, pero todo parece cerrarse después de un par de fallas en los números pares. La ejecución de este código con resultados SBT en esta salida:
[info] Running FaultTest
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info]
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM
Lo que creo que está pasando aquí es que comienzan 5 actores, y los primeros 5 números pares ponerlos fuera del negocio y no se están renovadas.
¿Cómo se puede cambiar este código para que los actores se recuperen de las excepciones?
Espero que esto realmente imprima todos los números impares del 1 al 200. Creo que cada actor fallaría en los números pares pero se reiniciaría con un buzón intacto en las excepciones. Espero ver la impresión de preRestart y postRestart. ¿Qué se debe configurar en este ejemplo de código para que esto suceda?
Aquí hay algunas suposiciones adicionales sobre akka y Actores que pueden llevar a mi malentendido. Supongo que un Actor se puede configurar con un Supervisor o un Manejador de Fallas para que se reinicie y continúe disponible cuando se produce una excepción durante la recepción. Supongo que el mensaje que se envió al actor se perderá si arroja una excepción durante la recepción. Supongo que se invocarán preRestart() y postRestart() en el actor que lanza la excepción.
El ejemplo de código representa lo que estoy tratando de hacer y se basa en Why is my Dispatching on Actors scaled down in Akka?
** ** Otro ejemplo de código
Aquí hay otro ejemplo de código que es más sencillo. Estoy comenzando un actor que arroja excepciones sobre números pares. No hay equilibrador de carga u otras cosas en el camino. Estoy intentando imprimir información sobre el actor. Estoy esperando salir del programa por un minuto después de que los mensajes hayan sido enviados al Actor y supervisar lo que está sucediendo.
Espero que esto imprima los números impares pero parece que el Actor se encuentra con mensajes en su buzón.
¿Tengo el OneForOneStrategy configurado incorrecto? ¿Debo vincular al Actor con algo? ¿Este tipo de configuración está fundamentalmente mal dirigida por mi parte? ¿Se debe configurar un Dispatcher con tolerancia a errores de algún modo? ¿Podría estar estropeando los hilos en el Dispatcher?
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._
class SingleActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
def receive = {
case num: Integer => {
if(num % 2 == 0)
throw new Exception("This is a simulated failure, where does this get logged?")
println("Actor: " + name + " Received: " + num)
}
}
override def postStop(){
println("TestActor post Stop ")
}
override def preRestart(reason: Throwable){
println("TestActor "+ name + " restaring after shutdown because of " + reason)
}
override def postRestart(reason: Throwable){
println("Restaring TestActor "+name+"after shutdown because of " + reason)
}
}
object TestSingleActor{
def main(args : Array[String]) : Unit = {
println("starting TestSingleActor.main()")
val testActor = Actor.actorOf(new SingleActor(1)).start()
println("number of actors: " + registry.actors.size)
printAllActorsInfo
(1 until 20 toList) foreach { testActor ! _ }
for(i <- 1 until 120){
Thread.sleep(500)
printAllActorsInfo
}
}
def printAllActorsInfo() ={
registry.actors.foreach((a) =>
println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
.format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
}
}
Estoy recibiendo una salida como:
[info] Running TestSingleActor
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
... 117 more of these lines repeted ...
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
[info] == run ==
[success] Successful.
[info]
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM
Si añado un Trhead.sleep (100000) al final del main() me sale: '[info] Ejecución de FaultTest partir FaultTest.main() Cargando config [akka.conf] de la ruta de clases de la aplicación. número de actores: 5 Actor: 0 Recibido: 1 Actor: 4 Recibido: 3 Actor: 1 Recibido: 7 Actor: 1 Recibido: 9' y las pausas de salida, pero los números adicionales no se imprimen. No esperé a que la aplicación saliera, pero después de 30-40 segundos no había nada. Además, si elimino la falla, los números se imprimen muy rápido, en menos de 2 segundos. –