2010-06-06 16 views
6

tengo un algoritmo que pasará por un gran conjunto de datos leerá algunos archivos de texto y buscará términos específicos en esas líneas. Lo tengo implementado en Java, pero no quise publicar el código para que no parezca que estoy buscando alguien que lo implemente, pero es cierto, realmente necesito mucha ayuda. Esto no fue planeado para mi proyecto, pero el conjunto de datos resultó ser enorme, así que la maestra me dijo que tenía que hacerlo así.Necesito ayuda para implementar este algoritmo con el mapa Hadoop MapReduce

EDITAR (no me aclaro i Previos versión) El conjunto de datos que tengo es en un clúster Hadoop, y yo debería hacer que su implementación MapReduce

que estaba leyendo acerca de MapReduce y thaught que primero haga el implementación estándar y luego será más o menos fácil hacerlo con mapreduce. Pero no sucedió, ya que el algoritmo es bastante estúpido y nada especial, y el mapa se reduce ... no puedo entenderlo.

Así que aquí es poco seudo código de mi algoritmo

LIST termList (there is method that creates this list from lucene index) 
FOLDER topFolder 

INPUT topFolder 
IF it is folder and not empty 
    list files (there are 30 sub folders inside) 
    FOR EACH sub folder 
     GET file "CheckedFile.txt" 
     analyze(CheckedFile) 
    ENDFOR 
END IF 


Method ANALYZE(CheckedFile) 

read CheckedFile 
WHILE CheckedFile has next line 
    GET line 
    FOR(loops through termList) 
      GET third word from line 
      IF third word = term from list 
     append whole line to string buffer 
    ENDIF 
ENDFOR 
END WHILE 
OUTPUT string buffer to file 

Además, como se puede ver, cada vez que "analizar" se llama, nuevo archivo tiene que ser creado, entendí que MapReduce es difícil para escribir en muchas salidas ???

Entiendo mapreduce la intuición, y mi ejemplo parece perfectamente adecuado para mapreduce, pero cuando se trata de hacer esto, obviamente no sé lo suficiente y ¡estoy PEGADO!

Por favor ayuda.

Respuesta

3

Puede usar un reductor vacío y dividir su trabajo para ejecutar un solo asignador por archivo. Cada mapeador creará su propio archivo de salida en su carpeta de salida.

+0

¡Hola! Gracias por la respuesta! Pero no estoy seguro de entender:/¿Me puede dar más información? ¿Quizás tengas algunos ejemplos como ese? – Julia

2

Map Reduce se implementa fácilmente utilizando algunas agradables funciones de simultaneidad Java 6, especialmente Future, Callable y ExecutorService.

he creado un rescatable que analizará un archivo en la forma en que ha especificado

public class FileAnalyser implements Callable<String> { 

    private Scanner scanner; 
    private List<String> termList; 

    public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException { 
    this.termList = termList; 
    scanner = new Scanner(new File(filename)); 
    } 

    @Override 
    public String call() throws Exception { 
    StringBuilder buffer = new StringBuilder(); 
    while (scanner.hasNextLine()) { 
     String line = scanner.nextLine(); 
     String[] tokens = line.split(" "); 
     if ((tokens.length >= 3) && (inTermList(tokens[2]))) 
     buffer.append(line); 
    } 
    return buffer.toString(); 
    } 

    private boolean inTermList(String term) { 
    return termList.contains(term); 
    } 
} 

Necesitamos crear un nuevo exigible para cada archivo encontrado y la enviará al servicio de ejecutor. El resultado de la presentación es un futuro que podemos usar más adelante para obtener el resultado del análisis del archivo.

public class Analayser { 

    private static final int THREAD_COUNT = 10; 

    public static void main(String[] args) { 

    //All callables will be submitted to this executor service 
    //Play around with THREAD_COUNT for optimum performance 
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); 

    //Store all futures in this list so we can refer to them easily 
    List<Future<String>> futureList = new ArrayList<Future<String>>(); 

    //Some random term list, I don't know what you're using. 
    List<String> termList = new ArrayList<String>(); 
    termList.add("terma"); 
    termList.add("termb"); 

    //For each file you find, create a new FileAnalyser callable and submit 
    //this to the executor service. Add the future to the list 
    //so we can check back on the result later 
    for each filename in all files { 
     try { 
     Callable<String> worker = new FileAnalyser(filename, termList); 
     Future<String> future = executor.submit(worker); 
     futureList.add(future); 
     } 
     catch (FileNotFoundException fnfe) { 
     //If the file doesn't exist at this point we can probably ignore, 
     //but I'll leave that for you to decide. 
     System.err.println("Unable to create future for " + filename); 
     fnfe.printStackTrace(System.err); 
     } 
    } 

    //You may want to wait at this point, until all threads have finished 
    //You could maybe loop through each future until allDone() holds true 
    //for each of them. 

    //Loop over all finished futures and do something with the result 
    //from each 
    for (Future<String> current : futureList) { 
     String result = current.get(); 
     //Do something with the result from this future 
    } 
    } 
} 

Mi ejemplo aquí está lejos de ser completo y está lejos de ser eficiente. No he considerado el tamaño de la muestra, si es realmente enorme se podía mantener un bucle sobre el futureList, la eliminación de los elementos que han terminado, algo similar a:

while (futureList.size() > 0) { 
     for (Future<String> current : futureList) { 
     if (current.isDone()) { 
      String result = current.get(); 
      //Do something with result 
      futureList.remove(current); 
      break; //We have modified the list during iteration, best break out of for-loop 
     } 
     } 
} 

Como alternativa puede implementar una configuración del tipo de productor-consumidor, donde la el productor envía callables al servicio del ejecutor y produce un futuro y el consumidor toma el resultado del futuro y descarta el futuro.

Esto podría requerir que el producto y el consumidor sean hilos, y una lista sincronizada para agregar/eliminar futuros.

Cualquier pregunta, pregunte.

+0

¡Hola! ¡Muchas gracias por la solución propuesta!Lo siento, probablemente no haya especificado claramente el problema, aunque lo intenté. Mi error, acabo de mencionar a Hadoop en el título, pero mi conjunto de datos está en un clúster que ejecuta hadoop, así que debería implementarlo de acuerdo con Hadoop MaPreduce frameork ... Editaré mi publicación ahora. El conjunto de datos que estoy analizando es de 6GB :/¿Demasiado para concurrencia para sobrellevarlo ????? – Julia

+0

Vaya, soy un novato aquí: D Para canjearme un poco, ejecuté mi código en 100 archivos, ~ 61MB cada uno, ~ 6GB en total. No estoy del todo seguro de qué es lo que hace tu analizador de archivos, así que dejé fuera el detalle sangriento y simplemente escaneé cada línea y devolví una cadena vacía. Un poco artificial, lo sé. El rendimiento no fue demasiado terrible, el tamaño del grupo de subprocesos era 100, por lo que los 100 archivos se analizaron sin que el servicio del ejecutor los pusiera en cola. El tiempo total de ejecución fue de 17 minutos en mi procesador Atom. Disculpa, no pude responder tu pregunta correctamente. No tengo experiencia con Hadoop, pero después de leer la respuesta de SquareCog tiene sentido. –

+0

Hola! Muchas gracias, has ayudado mucho, porque no puedo hacer frente a hadoop MR con el cerebro y el tiempo que tengo. Tendré unos algoritmos más similares para implementar, así que tengo que probarlo de manera que sea capaz de hacerlo. No puedo obtener ayuda de hadoop en ningún lado:/ Así que adopté su código, y en mi Intel 2Ghz, con el grupo de subprocesos 42 tardaron unos 20 minutos en analizarse y generar resultados en nuevos archivos, pero solo en 200Mb de datos (42 archivos). Una vez más, tengo que hacer algunas modificaciones al analizador sintáctico, tiene que hacer una coincidencia más estricta, no un término "contiene" puro, así que cuando lo ejecuto todo, le hago saber los resultados :) – Julia

Cuestiones relacionadas