2010-09-19 18 views
5

Soy un principiante de Haskell y pensé que sería un buen ejercicio. Tengo una asignación donde necesito leer el archivo en un hilo A, manejar las líneas del archivo de en hilos b_i, y entonces la salida de los resultados en hilo C.Limitar el uso de memoria al leer archivos

he implementado este momento ya, pero uno de los requisitos es que no podemos confiar en que todo el archivo se ajuste a la memoria. Esperaba que el perezoso IO y el recolector de basura hicieran esto por mí, pero lamentablemente el uso de la memoria sigue subiendo y subiendo.

El hilo del lector (A) lee el archivo con readFile que luego se comprime con los números de línea y se envuelve en Just. Estas líneas comprimidas se escriben a continuación a Control.Concurrent.Chan. Cada hilo de consumidor B tiene su propio canal.

Cada consumidor lee su propio canal cuando tiene datos y si la expresión regular coincide, se envía a su propio canal de salida respectivo envuelto dentro de Maybe (hecho de listas).

La impresora verifica el canal de salida de cada uno de los hilos B. Si ninguno de los resultados (línea) es Nada, la línea se imprime. Dado que en este punto no debería haber ninguna referencia a las líneas anteriores, pensé que el colector de basura podría liberar estas líneas, pero desafortunadamente me parece estar en el error aquí.

El archivo está en .lhs aquí: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

Entonces la pregunta es, ¿cómo puedo limitar el uso de memoria, o permitir que el recolector de basura para eliminar las líneas.

Fragmentos según lo solicitado. Con suerte sangría no está demasiado mal destruidos :)

data Global = Global {done :: MVar Bool, consumers :: Consumers} 
type Done = Bool 
type Linenum = Int 
type Line = (Linenum, Maybe String) 
type Output = MVar [Line] 
type Input = Chan Line 
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output))) 
type State a = ReaderT Global IO a 


producer :: [Input] -> FilePath -> State() 
producer c p = do 
    liftIO $ Main.log "Starting producer" 
    d <- asks done 
    f <- liftIO $ readFile p 
    mapM_ (\l -> mapM_ 
    (liftIO . flip writeChan l) c) 
    $ zip [1..] $ map Just $ lines f 
    liftIO $ modifyMVar_ d (return . not) 

printer :: State() 
printer = do 
    liftIO $ Main.log "Starting printer" 
    c <- (fmap (map (snd . snd) . M.elems) 
    (asks consumers >>= liftIO . readMVar)) 
    uniq' c 
    where head' :: Output -> IO Line 
    head' ch = fmap head (readMVar ch) 

    tail' = mapM_ (liftIO . flip modifyMVar_ 
     (return . tail)) 

    cont ch = tail' ch >> uniq' ch 

    printMsg ch = readMVar (head ch) >>= 
     liftIO . putStrLn . fromJust . snd . head 

    cempty :: [Output] -> IO Bool 
    cempty ch = fmap (any id) 
     (mapM (fmap ((==) 0 . length) . readMVar) ch) 

    {- Return false unless none are Nothing -} 
    uniq :: [Output] -> IO Bool 
    uniq ch = fmap (any id . map (isNothing . snd)) 
     (mapM (liftIO . head') ch) 

    uniq' :: [Output] -> State() 
    uniq' ch = do 
     d <- consumersDone 
     e <- liftIO $ cempty ch 
     if not e 
     then do 
      u <- liftIO $ uniq ch 
      if u then cont ch else do 
     liftIO $ printMsg ch 
     cont ch 
      else unless d $ uniq' ch 

Respuesta

6

Programación concurrente no ofrece ninguna orden de ejecución definido a menos que refuerce uno mismo con MVARs y similares. Por lo tanto, es probable que el hilo del productor adhiera todas/la mayoría de las líneas en el chan antes de que cualquier consumidor las lea y las transmita. Otra arquitectura que debe ajustarse a los requisitos es simplemente hacer que el hilo A invoque el archivo de lectura diferido y pegue el resultado en un mvar. Luego, cada subproceso de consumidor toma el mvar, lee una línea y luego reemplaza el mvar antes de proceder a manejar la línea. Incluso entonces, si el hilo de salida no puede seguir el ritmo, entonces el número de líneas coincidentes almacenadas en el chan allí puede acumularse arbitrariamente.

Lo que tienes es una arquitectura push. Para realmente hacer que funcione en un espacio constante, piense en términos de demanda impulsada. Encuentre un mecanismo tal que el hilo de salida envíe señales a los hilos de procesamiento de que deben hacer algo, y de tal manera que los hilos de procesamiento le señalen al lector que deben hacer algo.

Otra forma de hacer esto es tener chans de tamaño limitado, por lo que el lector enhebra los bloques cuando los hilos del procesador no se han alcanzado, por lo que las hebras del procesador se bloquean cuando el hilo de salida no se ha alcanzado.

En general, el problema de hecho me recuerda a la referencia de Broadfinder de Tim Bray, aunque los requisitos son algo diferentes. En cualquier caso, condujo a una amplia discusión sobre la mejor manera de implementar grep multinúcleo. El gran problema fue que el problema está vinculado a IO, y usted quiere múltiples hilos de lectura sobre archivos mmapped.

Consulte aquí para obtener más de lo que nunca quiere saber: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

+4

BoundedChan está en hackage precisamente para este tipo de uso. –

+0

Gracias Tom y sciv. Intentaré implementarlo y marcaré como respuesta si funciona – Masse

Cuestiones relacionadas