Sugiero que eche un vistazo a ScalaQuery, que hace casi lo mismo. Y puede hacerlo, porque este es un problema de mónada. De hecho, algunas soluciones de Haskell como Arrows, implementadas por Scalaz library, parecen estar bastante cerca.
Esa sería la mejor solución, ya que la abstracción adecuada facilitará los cambios en el futuro.
Como truco, me ocurrirá algo como esto:
abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate
class Query(title: String) {
self =>
// Create actors
def createActor(qm: QueryModifiers): Actor = {
val actor = qm match {
case Consolidate => // create a consolidator actor
case //... as needed
}
actor.start
actor
}
// The pipeline
val pipe: List[List[QueryModifiers]] = Nil
// Build the pipeline
def ->(qms: List[QueryModifiers]) = new Query(title) {
override val pipe = qms :: self.pipe
}
def ->(qm: QueryModifiers) = new Query(title) {
override val pipe = List(qm) :: self.pipe
}
def ->(c: Consolidate.type) = {
// Define the full pipeline
// Because the way pipe is built, the last layer comes first, and the first comes last
val pipeline = Consolidate :: pipe
// Create an actor for every QueryModifier, using an unspecified createActor function
val actors = pipeline map (_ map (createActor(_))
// We have a list of lists of actors now, where the first element of the list
// was the last QueryModifiers we received; so, group the layers by two, and for each
// pair, make the second element send the result to the first.
// Since each layer can contain many actors, make each member of the second
// layer send the results to each member of the first layer.
// The actors should be expecting to receive message SendResultsTo at any time.
for {
List(nextLayer, previousLayer) <- actors.iterator sliding 2
nextActor <- nextLayer
previousActor <- previousLayer
} previousActor ! SendResultsTo(nextActor)
// Send the query to the first layer
for (firstActor <- actors.last) firstActor ! Query(title)
// Get the result from the last layer, which is the consolidator
val results = actors.head.head !? Results
// Return the results
results
}
}
EDITAR
Puede ordenar garantizar también, con un poco de un truco. Estoy tratando de evitar Scala 2.8 aquí, aunque puede hacer esto mucho más fácil con los parámetros predeterminados y nombrados.
sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers
class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {
// Build the pipeline
def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
}
// Do similarly for qm: NextQM
// Consolidation
def ->(qm: Consolidate.type) = {
// Create Searchers actors
// Send them the Filters
// Send them Fetchers
// Create the Consolidator actor
// Send it to Searchers actors
// Send Searchers the query
// Ask Consolidator for answer
}
}
object Query {
def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}
Ahora, los actores Buscadores mantener una lista de filtros, una lista de Fetchers, y la referencia al consolidador. Escuchan mensajes que les informan sobre estas cosas y sobre la consulta. Para cada resultado, crean un actor de filtro para cada filtro en la lista, envían a cada uno de ellos la lista de captadores y el consolidador, y luego les envían el resultado.
Los actores de filtro mantienen una lista de buscadores y una referencia al consolidador. Escuchan mensajes que les informan sobre estas cosas y sobre el resultado del buscador. Envían sus resultados, si los hay, a los actores de captura recientemente creados, que son los primeros informados del consolidador.
Los captadores mantienen una referencia a los consolidadores. Escuchan un mensaje informándoles de esa referencia y del resultado del filtro. Ellos envían su resultado, por turno, al consolidador.
El consolidador escucha dos mensajes. Un mensaje, proveniente de actores captores, les informa de los resultados que acumulan.Otro mensaje, proveniente de la consulta, solicita ese resultado, que devuelve.
Lo único que queda es idear una forma de que el consolidador sepa que se han procesado todos los resultados. Una forma sería la siguiente:
- En la consulta, informe al actor Consolidator de cada Buscador que se haya creado. El consolidador mantiene una lista de ellos, con un indicador que indica si están terminados o no.
- Cada buscador mantiene una lista de los filtros que creó y espera un mensaje de "finalización" de ellos. Cuando un buscador no tiene que procesar nada y ha recibido el mensaje "hecho" de todos los filtros, envía un mensaje al consolidador informándole que ha finalizado.
- Cada filtro, a su vez, mantiene una lista de captadores que ha creado, y, del mismo modo, espera mensajes "finalizados" de ellos. Cuando ha terminado de procesarse, y ha recibido "done" de todos los buscadores, informa al buscador de que lo ha hecho.
- It fetcher envía un mensaje "hecho" al filtro que lo ha creado cuando se completa su trabajo y se envía al consolidador.
- El consolidador solo escucha el mensaje que consulta el resultado después de haber recibido un "hecho" de todos los buscadores.
Felicitaciones, eso es muy elegante. Sin embargo, me pregunto por qué quiere "hacer que cada miembro de la segunda capa envíe los resultados a cada miembro de la primera capa". El tamaño de cada capa depende del tamaño del resultado de la capa anterior porque la siguiente capa está destinada a procesar todo lo que proviene de la capa anterior en paralelo. La primera Capa envía cada Resultado a * un * Actor de la siguiente Capa. Así que no sé el número de QueryModifiers en el momento de Pipeline-construction. ¡Muchas gracias! Jugará con su solución – hotzen
Ah, ya veo. Esto es _masivamente_ paralelo, no solo paralelo a la tubería. :-) En ese caso, en lugar de crear todas las capas, solo cree la primera capa y envíela a la tubería. Cada actor, a su vez, se elimina de la tubería, crea la siguiente capa y le envía la tubería restante. Es posible que aún desee mantener la última capa como un actor singleton creado con la primera capa, para que la función pueda consultarla. –
Bien, muchas gracias. Voy a estar en vacaciones de Navidad por ahora y luego probaré esto. – hotzen