2012-05-16 18 views
6

Todavía estoy en el proceso de comprender cómo funciona la concurrencia en Java. Entiendo que (si está suscrito al modelo de simultaneidad OO Java 5) implementa un Task o Callable con un método run() o call() (respectivamente), y le corresponde paralelizar tanto como sea posible ese método implementado.Multithreading un archivo masivo lea

Pero todavía no estoy entendiendo algo inherente acerca de la programación concurrente en Java:

  • ¿Cómo es un Task 's run() método asigna la cantidad correcta de trabajo correspondiente a realizar?

Como ejemplo concreto, lo que si tengo una readMobyDick() método de E/S de ruedas que lee todo el contenido de Moby Dick de Herman Melville en la memoria de un archivo en el sistema local. Y digamos que yo quiero este método readMobyDick() a ser concurrente y manejado por 3 hilos, donde:

  • Tema # 1 lee la primera 1/3o del libro en la memoria
  • Tema # 2 lee el segundo 1/3ª del libro en la memoria
  • Tema # 3 lee el último 1/3o del libro en la memoria

¿Es necesario que trozo de Moby Dick en tres archivos y pasarlos cada uno con su propia tarea , o hago II simplemente llame al readMobyDick() desde adentro del imp método run() lement y (de alguna manera) el Executor sabe cómo romper el trabajo entre los hilos.

Soy un alumno muy visual, por lo que cualquier ejemplo de código de la forma correcta de abordar esto es muy apreciada! ¡Gracias!

+0

gran nombre de método! Cuando traté de leer Moby Dick, descubrí que tenía que hacerlo al mismo tiempo (intercalado con otros libros) ;-) –

+0

Entonces ... la gran ballena blanca sería .NET, ¿sí? –

Respuesta

14

¡Probablemente haya elegido por accidente el peor ejemplo absoluto de actividades paralelas!

Leer en paralelo desde un solo disco mecánico es realmente más lento que leer con un solo hilo, porque de hecho está rebotando el cabezal mecánico en diferentes secciones del disco a medida que cada hilo ejecuta su turno. Esto es mejor dejarlo como una sola actividad con subprocesos.

Tomemos otro ejemplo, que es similar al suyo pero que puede ofrecer algún beneficio: supongamos que quiero buscar las ocurrencias de una palabra determinada en una lista enorme de palabras (esta lista podría provenir incluso de un archivo de disco , pero como he dicho, leído por un solo hilo). Supongamos que puedo usar 3 hilos como en su ejemplo, cada uno buscando en 1/3 de la gran lista de palabras y manteniendo un contador local de cuántas veces apareció la palabra buscada.

En este caso, querrá dividir la lista en 3 partes, pasar cada parte a un objeto diferente cuyo tipo implemente Runnable y que la búsqueda se implemente en el método run.

El tiempo de ejecución en sí no tiene idea de cómo hacer las particiones ni nada de eso, tiene que especificarlo usted mismo. Existen muchas otras estrategias de partición, cada una con sus fortalezas y debilidades propias, pero por el momento podemos mantener la partición estática.

Vamos a ver algo de código:

class SearchTask implements Runnable { 
    private int localCounter = 0; 
    private int start; // start index of search 
    private int end; 
    private List<String> words; 
    private String token; 

    public SearchTask(int start, int end, List<String> words, String token) { 
     this.start = start; 
     this.end = end; 
     this.words = words; 
     this.token = token; 
    } 

    public void run() { 
     for(int i = start; i < end; i++) { 
       if(words.get(i).equals(token)) localCounter++; 
     } 
    } 

    public int getCounter() { return localCounter; } 
} 

// meanwhile in main :) 

List<String> words = new ArrayList<String>(); 
// populate words 
// let's assume you have 30000 words 

// create tasks 
SearchTask task1 = new SearchTask(0, 10000, words, "John"); 
SearchTask task2 = new SearchTask(10000, 20000, words, "John"); 
SearchTask task3 = new SearchTask(20000, 30000, words, "John"); 

// create threads 
Thread t1 = new Thread(task1); 
Thread t2 = new Thread(task1); 
Thread t3 = new Thread(task1); 

// start threads 
t1.start(); 
t2.start(); 
t3.start(); 

// wait for threads to finish 
t1.join(); 
t2.join(); 
t3.join(); 

// collect results 
int counter = 0; 
counter += task1.getCounter(); 
counter += task2.getCounter(); 
counter += task3.getCounter(); 

Esto debería funcionar bien. Tenga en cuenta que, en casos prácticos, crearía un esquema de partición más genérico. Alternativamente, puede utilizar ExecutorService e implementar Callable en lugar de Runnable si desea devolver un resultado.

Así un ejemplo alternativo utilizando construcciones más avanzadas:

class SearchTask implements Callable<Integer> { 
    private int localCounter = 0; 
    private int start; // start index of search 
    private int end; 
    private List<String> words; 
    private String token; 

    public SearchTask(int start, int end, List<String> words, String token) { 
     this.start = start; 
     this.end = end; 
     this.words = words; 
     this.token = token; 
    } 

    public Integer call() { 
     for(int i = start; i < end; i++) { 
       if(words.get(i).equals(token)) localCounter++; 
     } 
     return localCounter; 
    }   
} 

// meanwhile in main :) 

List<String> words = new ArrayList<String>(); 
// populate words 
// let's assume you have 30000 words 

// create tasks 
List<Callable> tasks = new ArrayList<Callable>(); 
tasks.add(new SearchTask(0, 10000, words, "John")); 
tasks.add(new SearchTask(10000, 20000, words, "John")); 
tasks.add(new SearchTask(20000, 30000, words, "John")); 

// create thread pool and start tasks 
ExecutorService exec = Executors.newFixedThreadPool(3); 
List<Future> results = exec.invokeAll(tasks); 

// wait for tasks to finish and collect results 
int counter = 0; 
for(Future f: results) { 
    counter += f.get(); 
} 
+0

Entonces, ¿cuál sería un buen ejemplo de una tarea que se beneficiaría de multihilo? Realmente no me importa nada leer archivos del disco; me importa ver un ejemplo de cómo vivir y respirar (** código **) sobre cómo se fragmenta el trabajo y se lo alimenta a las tareas. – IAmYourFaja

+0

@herpylderp: publiqué una edición. El código viene pronto :) – Tudor

+0

Un buen ejemplo sería una cola atendida por múltiples hilos –

1

usted escogió un mal ejemplo, como Tudor fue tan amable de señalar. El hardware del disco giratorio está sujeto a las limitaciones físicas de mover los platos y las cabezas, y la implementación de lectura más eficiente es leer cada bloque en orden, lo que reduce la necesidad de mover el cabezal o esperar a que el disco se alinee.

Dicho esto, algunos sistemas operativos no siempre almacenan cosas continuamente en los discos, y para aquellos que lo recuerdan, la desfragmentación podría proporcionar un aumento del rendimiento del disco si su sistema de archivos/OS no le hiciera el trabajo.

Como mencionó querer un programa que se beneficiaría, permítame sugerirle una simple, suma de matriz.

Suponiendo que ha creado un hilo por núcleo, puede dividir trivialmente dos matrices que se agregarán en N filas (una para cada hilo). Además de la matriz (si recuerdan) funciona como tal:

A + B = C 

o

[ a11, a12, a13 ] [ b11, b12, b13] = [ (a11+b11), (a12+b12), (a13+c13) ] 
[ a21, a22, a23 ] + [ b21, b22, b23] = [ (a21+b21), (a22+b22), (a23+c23) ] 
[ a31, a32, a33 ] [ b31, b32, b33] = [ (a31+b31), (a32+b32), (a33+c33) ] 

Así que para distribuir este a través de N hilos, simplemente hay que tomar el número de filas y dividir el módulo por el número de hilos para obtener el "id. de subproceso" se agregará.

matrix with 20 rows across 3 threads 
row % 3 == 0 (for rows 0, 3, 6, 9, 12, 15, and 18) 
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19) 
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17) 
// row 20 doesn't exist, because we number rows from 0 

Ahora cada hilo "sabe" qué filas se debe manipular, y los resultados "por fila" se puede calcular trivial, porque los resultados no se cruzan en el dominio de otro hilo de la computación.

Todo lo que se necesita ahora es una estructura de datos de "resultados" que rastrea cuándo se han calculado los valores, y cuándo se establece el último valor, luego se completa el cálculo. En este ejemplo "falso" de un resultado de suma de matriz con dos subprocesos, calcular la respuesta con dos subprocesos toma aproximadamente la mitad del tiempo.

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only. Real Threads are scheduled across cores due to 
// availability and attempts to prevent unnecessary core migration of a running thread. 
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3) 
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1) 
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3) 
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1) 

Los problemas más complejos se pueden resolver con subprocesamiento múltiple, y se resuelven diferentes problemas con diferentes técnicas. Escogí a propósito uno de los ejemplos más simples.

1

se implementa una tarea o invocable con un método run() o llame al() (respectivamente), y usted le corresponde poner en paralelo tanto de ese método implementado como sea posible.

A Task representa una unidad discreta de trabajo
Carga de un archivo en la memoria es una unidad discreta de trabajo y puede, por tanto, esta actividad se puede delegar en un subproceso de fondo. Es decir. un hilo de fondo ejecuta esta tarea de cargar el archivo.
Es una unidad de trabajo discreta ya que no tiene otras dependencias necesarias para hacer su trabajo (cargar el archivo) y tiene límites discretos.
Lo que está pidiendo es dividir esto en tareas. Es decir. un hilo carga 1/3 del archivo mientras que otro hilo el 2/3 etc.
Si fue capaz de dividir la tarea en subtareas adicionales, entonces no sería una tarea en primer lugar por definición. Entonces, cargar un archivo es una sola tarea en sí misma.

Para darle un ejemplo:
Digamos que tiene una GUI y necesita presentar los datos del usuario de 5 archivos diferentes. Para presentarlos, también debe preparar algunas estructuras de datos para procesar los datos reales.
Todas estas son tareas separadas.
P. ej. la carga de archivos es de 5 tareas diferentes, por lo que se puede hacer con 5 hilos diferentes.
La preparación de las estructuras de datos podría hacerse con un hilo diferente.
La GUI se ejecuta, por supuesto, en otro hilo.
Todo esto puede suceder de alto rendimiento al mismo tiempo

-1

Si el sistema E/S admitidas, aquí es cómo puede hacerlo:

How to read a file using multiple threads in Java when a high throughput(3GB/s) file system is available

Aquí está la solución para leer un archivo único con múltiples hilos.

Divida el archivo en N trozos, lea cada trozo de una secuencia y luego combínelos en orden. Tenga cuidado con las líneas que cruzan los límites de los pedazos. Es la idea básica según lo sugerido por el usuario slaks

cotas de referencia por debajo de implementación de-hilos múltiples para un solo archivo de 20 GB:

1 Tema: 50 segundos: 400 MB/s

2 Temas: 30 segundos: 666 MB/s

4 Temas: 20 segundos: 1 GB/s

8 Temas: 60 SECON ds: 333 MB/s

readAllLines Java7 equivalentes(): 400 segundos: 50 MB/s

Nota: Esto sólo puede funcionar en sistemas que están diseñados para soportar de alto rendimiento de E/S, y no en ordenadores personales habituales

Aquí están las liendres esenciales del código, para los detalles completos, siga el enlace

public class FileRead implements Runnable 
{ 

private FileChannel _channel; 
private long _startLocation; 
private int _size; 
int _sequence_number; 

public FileRead(long loc, int size, FileChannel chnl, int sequence) 
{ 
    _startLocation = loc; 
    _size = size; 
    _channel = chnl; 
    _sequence_number = sequence; 
} 

@Override 
public void run() 
{ 
     System.out.println("Reading the channel: " + _startLocation + ":" + _size); 

     //allocate memory 
     ByteBuffer buff = ByteBuffer.allocate(_size); 

     //Read file chunk to RAM 
     _channel.read(buff, _startLocation); 

     //chunk to String 
     String string_chunk = new String(buff.array(), Charset.forName("UTF-8")); 

     System.out.println("Done Reading the channel: " + _startLocation + ":" + _size); 

} 

//args[0] is path to read file 
//args[1] is the size of thread pool; Need to try different values to fing sweet spot 
public static void main(String[] args) throws Exception 
{ 
    FileInputStream fileInputStream = new FileInputStream(args[0]); 
    FileChannel channel = fileInputStream.getChannel(); 
    long remaining_size = channel.size(); //get the total number of bytes in the file 
    long chunk_size = remaining_size/Integer.parseInt(args[1]); //file_size/threads 


    //thread pool 
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1])); 

    long start_loc = 0;//file pointer 
    int i = 0; //loop counter 
    while (remaining_size >= chunk_size) 
    { 
     //launches a new thread 
     executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i)); 
     remaining_size = remaining_size - chunk_size; 
     start_loc = start_loc + chunk_size; 
     i++; 
    } 

    //load the last remaining piece 
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i)); 

    //Tear Down 

} 

}