2012-01-05 13 views
15

Tengo un archivo de texto simple con posiblemente millones de líneas que necesita un análisis personalizado y quiero cargarlo en una tabla HBase lo más rápido posible (usando el cliente Hadoop o HBase Java).¿Cuál es la forma más rápida de cargar datos de manera masiva en HBase mediante programación?

Mi solución actual se basa en un trabajo MapReduce sin la pieza Reducir. Yo uso FileInputFormat para leer el archivo de texto para que cada línea se pase al método map de mi clase Mapper. En este punto, la línea se analiza para formar un objeto Put que se escribe en el context. Luego, TableOutputFormat toma el objeto Put y lo inserta en la tabla.

Esta solución produce una tasa de inserción promedio de 1,000 filas por segundo, que es menor de lo que esperaba. Mi configuración HBase está en modo pseudo distribuido en un único servidor.

Una cosa interesante es que durante la inserción de 1,000,000 filas, se crean 25 Mappers (tareas) pero se ejecutan en serie (una tras otra); ¿esto es normal?

Este es el código para mi solución actual:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 

    protected void map(LongWritable key, Text value, Context context) throws IOException { 
     Map<String, String> parsedLine = parseLine(value.toString()); 

     Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1]))); 
     for (String currentKey : parsedLine.keySet()) { 
      row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey))); 
     } 

     try { 
      context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

public int run(String[] args) throws Exception { 
    if (args.length != 2) { 
     return -1; 
    } 

    conf.set("hbase.mapred.outputtable", args[1]); 

    // I got these conf parameters from a presentation about Bulk Load 
    conf.set("hbase.hstore.blockingStoreFiles", "25"); 
    conf.set("hbase.hregion.memstore.block.multiplier", "8"); 
    conf.set("hbase.regionserver.handler.count", "30"); 
    conf.set("hbase.regions.percheckin", "30"); 
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3"); 
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15"); 

    Job job = new Job(conf); 
    job.setJarByClass(BulkLoadMapReduce.class); 
    job.setJobName(NAME); 
    TextInputFormat.setInputPaths(job, new Path(args[0])); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapperClass(CustomMap.class); 
    job.setOutputKeyClass(ImmutableBytesWritable.class); 
    job.setOutputValueClass(Put.class); 
    job.setNumReduceTasks(0); 
    job.setOutputFormatClass(TableOutputFormat.class); 

    job.waitForCompletion(true); 
    return 0; 
} 

public static void main(String[] args) throws Exception { 
    Long startTime = Calendar.getInstance().getTimeInMillis(); 
    System.out.println("Start time : " + startTime); 

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args); 

    Long endTime = Calendar.getInstance().getTimeInMillis(); 
    System.out.println("End time : " + endTime); 
    System.out.println("Duration milliseconds: " + (endTime-startTime)); 

    System.exit(errCode); 
} 
+0

Supongo que quiere que su título sea "carga masiva" y no "carga bluk" ... pero avíseme si mi corrección fue incorrecta. :-) –

+0

¿Has leído esto? http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html –

+0

Además, ¿ha dividido previamente sus regiones? Si no, básicamente tienes un escritor de un único subproceso, lo cual lo explicaría. Básicamente tienes un escritor por región. –

Respuesta

16

He pasado por un proceso que es probablemente muy similar a la suya de tratar de encontrar una manera eficiente para cargar datos de un MR en HBase. Lo que encontré para trabajar es usar HFileOutputFormat como OutputFormatClass del MR.

A continuación está la base de mi código que tengo que generar la función job y Mapper map que escribe los datos. Esto fue rápido. Ya no lo usamos, así que no tengo números disponibles, pero fueron alrededor de 2,5 millones de registros en menos de un minuto.

Aquí es el (simplificada) la función que escribí para generar el trabajo para mi proceso de MapReduce para poner datos en HBase

private Job createCubeJob(...) { 
    //Build and Configure Job 
    Job job = new Job(conf); 
    job.setJobName(jobName); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapOutputValueClass(Put.class); 
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper 
    job.setJarByClass(CubeBuilderDriver.class); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(HFileOutputFormat.class); 

    TextInputFormat.setInputPaths(job, hiveOutputDir); 
    HFileOutputFormat.setOutputPath(job, cubeOutputPath); 

    Configuration hConf = HBaseConfiguration.create(conf); 
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); 
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); 

    HTable hTable = new HTable(hConf, tableName); 

    HFileOutputFormat.configureIncrementalLoad(job, hTable); 
    return job; 
} 

Ésta es mi función de mapa de la clase HiveToHBaseMapper (ligeramente editado).

public void map(WritableComparable key, Writable val, Context context) 
     throws IOException, InterruptedException { 
    try{ 
     Configuration config = context.getConfiguration(); 
     String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); 
     String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); 
     String column = strs[COLUMN_INDEX]; 
     String Value = strs[VALUE_INDEX]; 
     String sKey = generateKey(strs, config); 
     byte[] bKey = Bytes.toBytes(sKey); 
     Put put = new Put(bKey); 
     put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
         ? Bytes.toBytes(Double.MIN_VALUE) 
         : Bytes.toBytes(value)); 

     ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); 
     context.write(ibKey, put); 

     context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); 
    } 
    catch(Exception e){ 
     context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);  
    } 

} 

bastante seguro de que esto no va a ser una solución copiar y pegar & para usted. Obviamente, los datos con los que estaba trabajando aquí no necesitaban ningún procesamiento personalizado (eso se hizo en un trabajo de MR anterior a este). Lo principal que deseo proporcionarle es HFileOutputFormat. El resto es solo un ejemplo de cómo lo usé. :)
Espero que te lleve a un camino sólido hacia una buena solución. :

+1

Intenté usar 'HfileOutputFormat' en mi código, pero sigo recibiendo excepciones, ¿alguna idea? 'java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put no se puede convertir a org.apache.hadoop.hbase.KeyValue \t en org.apache.hadoop.hbase.mapreduce.HFileOutputFormat $ 1.Escribir (HFileOutputFormat.java:82) \t en org.apache.hadoop.mapred.ReduceTask $ NewTrackingRecordWriter.write (ReduceTask.java:508) \t en org.apache.hadoop.mapreduce.TaskInputOutputContext.write (TaskInputOutputContext.java:80) \t en org.apache.hadoop.mapreduce.Reducer.reduce (Reducer.java:156) \t ... ' –

+0

@kramer Más que, intentando 'escribir' un tipo diferente al que está esperando (de ahí el reparto) error) no realmente. Tendría que ver el código para tomar una foto de eso. – Nija

+0

¿HFileOutputFormat es más rápido que TableOutputFormat? Dada la misma situación que la división de la región. –

0

Una cosa interesante es que durante la inserción de 1,000,000 de filas, se crean 25 Mappers (tareas) pero se ejecutan en serie (una tras otra); ¿esto es normal?

mapreduce.tasktracker.map.tasks.maximum parámetro que está predeterminado en 2 determina el número máximo de tareas que se pueden ejecutar en paralelo en un nodo. A menos que haya cambiado, debería ver 2 tareas de mapa ejecutándose simultáneamente en cada nodo.

+0

Lo intenté, pero el resultado no cambió. –

+0

¿Dónde especificaste el parámetro? Se debe especificar en mapred-site.xml en todos los nodos antes de que los daemons de Hadoop comiencen. Verifique esta [documentación] (http://wiki.apache.org/hadoop/FAQ#I_see_a_maximum_of_2_maps.2BAC8-reduces_spawned_concurrently_on_each_TaskTracker.2C_how_do_I_increase_that.3F). ¿Cómo lo verificaste? Se puede verificar desde JobTracker Web Console. –

Cuestiones relacionadas