2009-12-17 21 views
6

Estoy luchando con el siguiente problema por una semana y necesito algunos consejos.Usando Scala Actors para crear algo así como un Pipeline

def query(title: String): List[Search] // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...] 

def searchIMDB(s: Search): List[SearchResult] 
def searchTMDB(s: Search): List[SearchResult] 

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult] 

def fetchIMDB(sr: SearchResult): List[MetaInfo] 
def fetchTMDB(sr: SearchResult): List[MetaInfo] 

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]] 

Quiero construir un oleoducto como:

query("Terminator") 
-> [askIMDB, askTMDB, ...] 
-> filterRedundantSearchResults (already-searched-state per query) 
-> [fetchIMDB, fetchTMDB, ...] 
-> consolidate     (collected-meta-infos-state per query) 
    => List[ TerminatorI-List[MetaInfo], TerminatorII-List[MetaInfo], ...] 

Hasta ahora, he implementado todos los segmentos de la tubería como actor. Necesito crear instancias de actor dedicadas para cada consulta, ya que algunos de los actores como filterXXX y consolidación necesitan mantener el estado por consulta.

Las funciones como askIMDB producen resultados múltiples que quiero procesar al mismo tiempo (cada uno para un actor independiente). Así que no he encontrado ninguna forma de preconstruir todo el gráfico de los actores antes de ejecutando la consulta() ni una forma elegante de modificarlo en tiempo de ejecución.

Mi primer intento fue una cadena de actores y pasé algo así como ID de transacción en los mensajes, por lo que cada Actor tenía un Mapa [ID de transacción-> Estado] pero se sentía bastante feo. El segundo intento fue crear una especie de Oleoducto que resuma el dígrafo de actores en un solo flujo, pero hasta ahora no he podido.

Esta es mi primera publicación, lo siento si olvidé algo o la pregunta es general/pseudocódigo. Cualquier consejo muy apreciado. ¡Gracias!

Respuesta

4

Sugiero que eche un vistazo a ScalaQuery, que hace casi lo mismo. Y puede hacerlo, porque este es un problema de mónada. De hecho, algunas soluciones de Haskell como Arrows, implementadas por Scalaz library, parecen estar bastante cerca.

Esa sería la mejor solución, ya que la abstracción adecuada facilitará los cambios en el futuro.

Como truco, me ocurrirá algo como esto:

abstract class QueryModifiers 
case object Consolidate extends QueryModifiers 
// create others as appropriate 

class Query(title: String) { 
    self => 

    // Create actors 
    def createActor(qm: QueryModifiers): Actor = { 
    val actor = qm match { 
     case Consolidate => // create a consolidator actor 
     case //... as needed 
    } 
    actor.start 
    actor 
    } 

    // The pipeline 
    val pipe: List[List[QueryModifiers]] = Nil 

    // Build the pipeline 
    def ->(qms: List[QueryModifiers]) = new Query(title) { 
    override val pipe = qms :: self.pipe 
    } 
    def ->(qm: QueryModifiers) = new Query(title) { 
    override val pipe = List(qm) :: self.pipe 
    } 
    def ->(c: Consolidate.type) = { 
    // Define the full pipeline 
    // Because the way pipe is built, the last layer comes first, and the first comes last 
    val pipeline = Consolidate :: pipe 

    // Create an actor for every QueryModifier, using an unspecified createActor function 
    val actors = pipeline map (_ map (createActor(_)) 

    // We have a list of lists of actors now, where the first element of the list 
    // was the last QueryModifiers we received; so, group the layers by two, and for each 
    // pair, make the second element send the result to the first. 
    // Since each layer can contain many actors, make each member of the second 
    // layer send the results to each member of the first layer. 
    // The actors should be expecting to receive message SendResultsTo at any time. 
    for { 
     List(nextLayer, previousLayer) <- actors.iterator sliding 2 
     nextActor <- nextLayer 
     previousActor <- previousLayer 
    } previousActor ! SendResultsTo(nextActor) 

    // Send the query to the first layer 
    for (firstActor <- actors.last) firstActor ! Query(title) 

    // Get the result from the last layer, which is the consolidator 
    val results = actors.head.head !? Results 

    // Return the results 
    results 
    } 
} 

EDITAR

Puede ordenar garantizar también, con un poco de un truco. Estoy tratando de evitar Scala 2.8 aquí, aunque puede hacer esto mucho más fácil con los parámetros predeterminados y nombrados.

sealed abstract class QueryModifiers 
case class QMSearcher(/*...*/) extends QueryModifiers 
case class QMFilter(/*...*/) extends QueryModifiers 
case class QMFetcher(/*...*/) extends QueryModifiers 
case object Consolidate extends QueryModifiers 

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) { 

// Build the pipeline 
    def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match { 
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil) 
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil) 
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms) 
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected") 
    } 
    // Do similarly for qm: NextQM 

    // Consolidation 
    def ->(qm: Consolidate.type) = { 
    // Create Searchers actors 
    // Send them the Filters 
    // Send them Fetchers 
    // Create the Consolidator actor 
    // Send it to Searchers actors 
    // Send Searchers the query 
    // Ask Consolidator for answer 
    } 
} 

object Query { 
    def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil) 
} 

Ahora, los actores Buscadores mantener una lista de filtros, una lista de Fetchers, y la referencia al consolidador. Escuchan mensajes que les informan sobre estas cosas y sobre la consulta. Para cada resultado, crean un actor de filtro para cada filtro en la lista, envían a cada uno de ellos la lista de captadores y el consolidador, y luego les envían el resultado.

Los actores de filtro mantienen una lista de buscadores y una referencia al consolidador. Escuchan mensajes que les informan sobre estas cosas y sobre el resultado del buscador. Envían sus resultados, si los hay, a los actores de captura recientemente creados, que son los primeros informados del consolidador.

Los captadores mantienen una referencia a los consolidadores. Escuchan un mensaje informándoles de esa referencia y del resultado del filtro. Ellos envían su resultado, por turno, al consolidador.

El consolidador escucha dos mensajes. Un mensaje, proveniente de actores captores, les informa de los resultados que acumulan.Otro mensaje, proveniente de la consulta, solicita ese resultado, que devuelve.

Lo único que queda es idear una forma de que el consolidador sepa que se han procesado todos los resultados. Una forma sería la siguiente:

  1. En la consulta, informe al actor Consolidator de cada Buscador que se haya creado. El consolidador mantiene una lista de ellos, con un indicador que indica si están terminados o no.
  2. Cada buscador mantiene una lista de los filtros que creó y espera un mensaje de "finalización" de ellos. Cuando un buscador no tiene que procesar nada y ha recibido el mensaje "hecho" de todos los filtros, envía un mensaje al consolidador informándole que ha finalizado.
  3. Cada filtro, a su vez, mantiene una lista de captadores que ha creado, y, del mismo modo, espera mensajes "finalizados" de ellos. Cuando ha terminado de procesarse, y ha recibido "done" de todos los buscadores, informa al buscador de que lo ha hecho.
  4. It fetcher envía un mensaje "hecho" al filtro que lo ha creado cuando se completa su trabajo y se envía al consolidador.
  5. El consolidador solo escucha el mensaje que consulta el resultado después de haber recibido un "hecho" de todos los buscadores.
+0

Felicitaciones, eso es muy elegante. Sin embargo, me pregunto por qué quiere "hacer que cada miembro de la segunda capa envíe los resultados a cada miembro de la primera capa". El tamaño de cada capa depende del tamaño del resultado de la capa anterior porque la siguiente capa está destinada a procesar todo lo que proviene de la capa anterior en paralelo. La primera Capa envía cada Resultado a * un * Actor de la siguiente Capa. Así que no sé el número de QueryModifiers en el momento de Pipeline-construction. ¡Muchas gracias! Jugará con su solución – hotzen

+0

Ah, ya veo. Esto es _masivamente_ paralelo, no solo paralelo a la tubería. :-) En ese caso, en lugar de crear todas las capas, solo cree la primera capa y envíela a la tubería. Cada actor, a su vez, se elimina de la tubería, crea la siguiente capa y le envía la tubería restante. Es posible que aún desee mantener la última capa como un actor singleton creado con la primera capa, para que la función pueda consultarla. –

+0

Bien, muchas gracias. Voy a estar en vacaciones de Navidad por ahora y luego probaré esto. – hotzen