2009-09-30 11 views
8
 
(fileNameToCharStream "bigfile" 
|>> fuse [length; 
      splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length; 
      splitBy (fun x -> x = '\n') keepEmpty |>> length; 
     ]) 
    (*fuse "fuses" the three functions to run concurrently*) 
|> run 2 (*forces to run in parallel on two threads*) 
|> (fun [num_chars; num_words; num_lines] -> 
     printfn "%d %d %d" 
      num_chars num_words, num_lines)) 

Quiero hacer que este código funcione de la siguiente manera: divida la transmisión original en dos exactamente en el medio; luego para cada mitad ejecute un cómputo separado que computa 3 cosas: la longitud (es decir, número de caracteres), el número de palabras, el número de líneas. Sin embargo, no quiero tener un problema si I erróneamente dividir en una palabra. Esto tiene que ser atendido. El archivo debe leerse solo una vez.Segmentación paralela

¿Cómo debo programar las funciones especificadas y el operador | >>? ¿Es posible?

+0

Puede ser que los EE.UU. aún no ha despertado, pero a la espera de que, es posible que desee para buscar la palabra clave 'asíncrono' para obtener una mejor idea de lo que es posible. – Benjol

+0

¿Qué firmas imaginas que fusionar, ejecutar y | >> tendrían? Por ejemplo, ¿dónde se convierte su lista de tres elementos en una 3-tupla? – Gabriel

+0

Correcto, me refiero a: |> (fun [num_chars; num_words; num_lines] -> –

Respuesta

8

Parece que estás pidiendo un poco. Dejaré que ustedes decidan la manipulación de la cadena, pero les mostraré cómo definir un operador que ejecuta una serie de operaciones en paralelo.

Paso 1: Escribir una función fuse

Su función fusible parece trazar una única entrada con múltiples funciones, que es bastante fácil de escribir de la siguiente manera:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list 
let fuse functionList input = [ for f in functionList -> f input] 

Tenga en cuenta que todos sus funciones de mapeo necesitan tener el mismo tipo.

Paso 2: Definir el operador para ejecutar funciones en paralelo

La función de mapa paralelo estándar se puede escribir de la siguiente manera:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array 
let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

Que yo sepa, Async.Parallel ejecutará las operaciones asíncronas en paralelo, donde el número de tareas paralelas que se ejecutan en un momento dado es igual al número de núcleos en una máquina (alguien puede corregirme si estoy equivocado). Entonces, en una máquina de doble núcleo, deberíamos tener como máximo 2 hilos ejecutándose en mi máquina cuando se llama a esta función. Esto es algo bueno, ya que no esperamos ninguna aceleración ejecutando más de un hilo por núcleo (de hecho, el cambio de contexto adicional puede ralentizar las cosas).

Podemos definir un operador |>> en términos de pmap y fuse:

//val (|>>) : seq<'a> -> seq<('a -> 'b)> -> 'b list array 
let (|>>) input functionList = pmap (fuse functionList) input 

Así el operador |>> lleva un montón de entradas y mapas usando un montón de diferentes salidas. Hasta ahora, si ponemos todo esto junto, obtenemos el siguiente (en FSI):

> let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];; 

val countOccurrences : 'a -> seq<'a> -> int 
val length : string -> int 
val testData : string [] = 
    [|"Juliet is awesome"; "Someone should give her a medal"|] 
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] 

testOutput contiene dos elementos, los cuales se calculan en paralelo.

Paso 3: Agregado elementos en una única salida

bien, así que ahora tenemos resultados parciales representadas por cada elemento en nuestra matriz, y queremos fusionar nuestros resultados parciales en un único agregado. Supongo que cada elemento de la matriz debe fusionarse con la misma función, ya que cada elemento en la entrada tiene el mismo tipo de datos.

Aquí es una función muy feo que escribí para el trabajo:

> let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);; 

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list 

> reduceMany (+) testOutput;; 
val it : int list = [48; 1; 4] 

reduceMany toma sucesión de secuencias de longitud n, y devuelve una matriz n-longitud como una salida. Si usted puede pensar en una mejor manera de escribir esta función, adelante :)

para decodificar la salida anterior:

  • 48 = suma de las longitudes de mis dos cadenas de entrada. Tenga en cuenta que la cadena original tenía 49 caracteres, pero la dividía en "|" se comió un char por "|".
  • 1 = suma de todas las instancias de 'J' en mi entrada
  • 4 = suma de todas las instancias de 'O'.

Paso 4: Poner todo junto

let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

let fuse functionList input = [ for f in functionList -> f input] 

let (|>>) input functionList = pmap (fuse functionList) input 

let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]) 

let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'] 
    |> reduceMany (+)