2011-10-13 20 views
10

La pregunta es simple y me sorprende que no apareciera inmediatamente cuando la busqué.¿Cómo procesar las filas de un archivo CSV utilizando Groovy/GPars de manera más eficiente?

Tengo un archivo CSV, uno potencialmente muy grande, que debe procesarse. Cada línea debe entregarse a un procesador hasta que se procesen todas las filas. Para leer el archivo CSV, usaré OpenCSV, que básicamente proporciona un método readNext() que me da la siguiente fila. Si no hay más filas disponibles, todos los procesadores deben terminar.

Para esto creé un script groovy realmente simple, definí un método síncrono readNext() (ya que la lectura de la siguiente línea no consume mucho tiempo) y luego creé algunos hilos que leen la siguiente línea y la procesan. Funciona bien, pero ...

¿No debería haber una solución integrada que pudiera usar? No es el procesamiento de la colección gpars, porque eso siempre supone que hay una colección existente en la memoria. En cambio, no puedo permitirme leer todo en la memoria y luego procesarlo, me llevaría a excepciones fuera de la memoria.

Entonces ... ¿alguien que tenga una bonita plantilla para procesar un archivo CSV "línea por línea" usando un par de hilos de trabajo?

Respuesta

6

El acceso concurrente a un archivo puede no ser una buena idea y el procesamiento de unificación/unión de GPars solo está destinado a los datos (colecciones) en memoria. Mi sugerencia sería leer el archivo secuencialmente en una lista. Cuando la lista alcance un cierto tamaño, procese las entradas en la lista al mismo tiempo que usa GPars, borre la lista y luego continúe con las líneas de lectura.

2

Acabo de terminar una implementación de un problema como este en Grails (no se especifica si está utilizando grails, hibernación simple, JDBC simple o algo más).

No hay nada fuera de la caja que pueda obtener que yo sepa. Podrías mirar a la integración con Spring Batch, pero la última vez que lo miré, se sintió muy pesado para mí (y no muy groovy).

Si está utilizando JDBC normal, hacer lo que Christoph recomienda probablemente sea lo más fácil de hacer (leer en N filas y usar GPars para recorrer esas filas al mismo tiempo).

Si está utilizando grial o hibernación y desea que sus hilos de trabajo tengan acceso al contexto de primavera para la inyección de dependencia, las cosas se complican un poco.

La forma en que lo resolvió está usando el plugin de Grails Redis (exención de responsabilidad: Soy el autor) y la Jesque plugin, que es una aplicación java de Resque.

El complemento Jesque le permite crear clases de "Trabajo" que tienen un método de "proceso" con parámetros arbitrarios que se utilizan para procesar el trabajo en cola en una cola de Jesque. Puede girar tantos trabajadores como desee.

Tengo una carga de archivo en la que un usuario administrador puede publicar un archivo, guarda el archivo en disco y encola un trabajo para ProducerJob que he creado. Ese ProducerJob gira a través del archivo, para cada línea, encola un mensaje para que ConsumerJob recoja. El mensaje es simplemente un mapa de los valores leídos del archivo CSV.

ConsumerJob toma esos valores y crea el objeto de dominio apropiado para su línea y lo guarda en la base de datos.

Ya estábamos usando Redis en producción, por lo que utilizar esto como un mecanismo de puesta en cola tenía sentido.Teníamos una carga síncrona antigua que se ejecutaba en cargas de archivos en serie. Actualmente estoy usando un trabajador productor y 4 trabajadores de consumo y cargar cosas de esta manera es más de 100 veces más rápido que la carga anterior (con mucho mejor retroalimentación de progreso para el usuario final).

Estoy de acuerdo con la pregunta original de que probablemente haya espacio para que algo así se empaquete ya que es algo relativamente común.

ACTUALIZACIÓN: Puse a blog post with a simple example doing imports with Redis + Jesque.

5

Esto podría ser un buen problema para los actores. Un actor lector sincrónico podría transferir líneas CSV a actores de procesador en paralelo. Por ejemplo:

@Grab(group='org.codehaus.gpars', module='gpars', version='0.12') 

import groovyx.gpars.actor.DefaultActor 
import groovyx.gpars.actor.Actor 

class CsvReader extends DefaultActor { 
    void act() { 
     loop { 
      react { 
       reply readCsv() 
      } 
     } 
    } 
} 

class CsvProcessor extends DefaultActor { 
    Actor reader 
    void act() { 
     loop { 
      reader.send(null) 
      react { 
       if (it == null) 
        terminate() 
       else 
        processCsv(it) 
      } 
     } 
    } 
} 

def N_PROCESSORS = 10 
def reader = new CsvReader().start() 
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join() 
+0

¿Está asumiendo en este ejemplo que la llamada readCsv() devuelve una sola línea del archivo CSV? Solo quiero asegurarme de estar leyendo esto bien. – Scott

+0

Sí, 'readCsv()' leerá cada línea en secuencia. Cuando se llega al final del archivo, devolverá nulo, lo que le permite a los procesadores saber que se ha llegado al final y deben 'terminar()'. – ataylor

Cuestiones relacionadas