2011-07-27 15 views
6

tengo trabajo mapreduce: mi clase de código Mapa:Hadoop MapReduce: Driver para encadenar creadores de mapas dentro de un trabajo MapReduce

public static class MapClass extends Mapper<Text, Text, Text, LongWritable> { 

    @Override 
    public void map(Text key, Text value, Context context) 
     throws IOException, InterruptedException { 
    } 
} 

Y quiero usar ChainMapper:

1. Job job = new Job(conf, "Job with chained tasks"); 
2. job.setJarByClass(MapReduce.class); 
3. job.setInputFormatClass(TextInputFormat.class); 
4. job.setOutputFormatClass(TextOutputFormat.class); 

5. FileInputFormat.setInputPaths(job, new Path(InputFile)); 
6. FileOutputFormat.setOutputPath(job, new Path(OutputFile)); 

7. JobConf map1 = new JobConf(false); 

8. ChainMapper.addMapper(
     job, 
     MapClass.class, 
     Text.class, 
     Text.class, 
     Text.class, 
     Text.class, 
     true, 
     map1 
     ); 

pero su informe tiene un error en la línea 8:

Múltiples marcadores en esta línea - Ocurrencia de 'addMappe r ' - El método addMapper (JobConf, Class>, Class, Class, Class, Class, boolean, JobConf) en el tipo ChainMapper no es aplicable para los argumentos (Job, Clase, Clase, Clase, Clase, Clase, boolean, Configuración) - Puntero de instrucción actual de depuración - El método addMapper (JobConf, Class>, Class, Class, Class, Class, boolean, JobConf) en el tipo ChainMapper no es aplicable para los argumentos (JobConf, Class, clase, clase, clase, clase, boolean, JobConf)

Respuesta

0

usted tiene que utilizar en lugar de ConfigurationJobConf. JobConf es una subclase de Configuration, por lo que debería existir un constructor para eso.

0

Para el primer argumento de su ChainMapper.addMapper(), ha pasado el objeto job. Mientras la función está esperando un objeto de tipo JobConf. Reescribiendo a:

 
ChainMapper.addMapper(
      (JobConf)conf, 
      MapClass.class, 
      Text.class, 
      Text.class, 
      Text.class, 
      Text.class, 
      true, 
      map1 
      ); 

debe resolver el problema ..

+0

él ya tiene un jobconf, él necesita una configuración. Casting no será la elección correcta aquí. Se trata de map1 y no de conf. –

+1

su clase de mapa tiene que extenderse: org.apache.hadoop.mapred.Mapper no org.apache.hadoop.mapreduce.Mapper – user864846

7

Después de una gran cantidad de "Kung Fu", que fue capaz de utilizar ChainMapper/ChainReducer. Gracias por el último comentario user864846.

/** 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 

package myPKG; 

/* 
* Ajitsen: Sample program for ChainMapper/ChainReducer. This program is modified version of WordCount example available in Hadoop-0.18.0. Added ChainMapper/ChainReducer and made to works in Hadoop 1.0.2. 
*/ 

import java.io.IOException; 
import java.util.Iterator; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.mapred.lib.ChainMapper; 
import org.apache.hadoop.mapred.lib.ChainReducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class ChainWordCount extends Configured implements Tool { 

    public static class Tokenizer extends MapReduceBase 
    implements Mapper<LongWritable, Text, Text, IntWritable> { 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     public void map(LongWritable key, Text value, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      String line = value.toString(); 
      System.out.println("Line:"+line); 
      StringTokenizer itr = new StringTokenizer(line); 
      while (itr.hasMoreTokens()) { 
       word.set(itr.nextToken()); 
       output.collect(word, one); 
      } 
     } 
    } 

    public static class UpperCaser extends MapReduceBase 
    implements Mapper<Text, IntWritable, Text, IntWritable> { 

     public void map(Text key, IntWritable value, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      String word = key.toString().toUpperCase(); 
      System.out.println("Upper Case:"+word); 
      output.collect(new Text(word), value);  
     } 
    } 

    public static class Reduce extends MapReduceBase 
    implements Reducer<Text, IntWritable, Text, IntWritable> { 

     public void reduce(Text key, Iterator<IntWritable> values, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      int sum = 0; 
      while (values.hasNext()) { 
       sum += values.next().get(); 
      } 
      System.out.println("Word:"+key.toString()+"\tCount:"+sum); 
      output.collect(key, new IntWritable(sum)); 
     } 
    } 

    static int printUsage() { 
     System.out.println("wordcount <input> <output>"); 
     ToolRunner.printGenericCommandUsage(System.out); 
     return -1; 
    } 

    public int run(String[] args) throws Exception { 
     JobConf conf = new JobConf(getConf(), ChainWordCount.class); 
     conf.setJobName("wordcount"); 

     if (args.length != 2) { 
      System.out.println("ERROR: Wrong number of parameters: " + 
        args.length + " instead of 2."); 
      return printUsage(); 
     } 
     FileInputFormat.setInputPaths(conf, args[0]); 
     FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     JobConf mapAConf = new JobConf(false); 
     ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf); 

     JobConf mapBConf = new JobConf(false); 
     ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf); 

     JobConf reduceConf = new JobConf(false); 
     ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf); 

     JobClient.runJob(conf); 
     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args); 
     System.exit(res); 
    } 
} 

EDITAR en versión más reciente (al menos desde hadoop 2.6), no es necesaria la bandera true en addMapper. (de hecho, la firma tiene supresión de cambio it`).

por lo que sería simplemente

JobConf mapAConf = new JobConf(false); 
ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, 
         Text.class, IntWritable.class, mapAConf); 
0

En realidad, la clase Mapper ha de aplicar la interfaz org.apache.hadoop.mapred.Mapper. Tuve el mismo problema, pero esto lo resolvió.

Cuestiones relacionadas