Tengo un archivo de texto simple con posiblemente millones de líneas que necesita un análisis personalizado y quiero cargarlo en una tabla HBase lo más rápido posible (usando el cliente Hadoop o HBase Java).¿Cuál es la forma más rápida de cargar datos de manera masiva en HBase mediante programación?
Mi solución actual se basa en un trabajo MapReduce sin la pieza Reducir. Yo uso FileInputFormat
para leer el archivo de texto para que cada línea se pase al método map
de mi clase Mapper
. En este punto, la línea se analiza para formar un objeto Put
que se escribe en el context
. Luego, TableOutputFormat
toma el objeto Put
y lo inserta en la tabla.
Esta solución produce una tasa de inserción promedio de 1,000 filas por segundo, que es menor de lo que esperaba. Mi configuración HBase está en modo pseudo distribuido en un único servidor.
Una cosa interesante es que durante la inserción de 1,000,000 filas, se crean 25 Mappers (tareas) pero se ejecutan en serie (una tras otra); ¿esto es normal?
Este es el código para mi solución actual:
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
protected void map(LongWritable key, Text value, Context context) throws IOException {
Map<String, String> parsedLine = parseLine(value.toString());
Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
for (String currentKey : parsedLine.keySet()) {
row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
}
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
return -1;
}
conf.set("hbase.mapred.outputtable", args[1]);
// I got these conf parameters from a presentation about Bulk Load
conf.set("hbase.hstore.blockingStoreFiles", "25");
conf.set("hbase.hregion.memstore.block.multiplier", "8");
conf.set("hbase.regionserver.handler.count", "30");
conf.set("hbase.regions.percheckin", "30");
conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");
Job job = new Job(conf);
job.setJarByClass(BulkLoadMapReduce.class);
job.setJobName(NAME);
TextInputFormat.setInputPaths(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(CustomMap.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
Long startTime = Calendar.getInstance().getTimeInMillis();
System.out.println("Start time : " + startTime);
int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);
Long endTime = Calendar.getInstance().getTimeInMillis();
System.out.println("End time : " + endTime);
System.out.println("Duration milliseconds: " + (endTime-startTime));
System.exit(errCode);
}
Supongo que quiere que su título sea "carga masiva" y no "carga bluk" ... pero avíseme si mi corrección fue incorrecta. :-) –
¿Has leído esto? http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html –
Además, ¿ha dividido previamente sus regiones? Si no, básicamente tienes un escritor de un único subproceso, lo cual lo explicaría. Básicamente tienes un escritor por región. –