2012-08-02 11 views
6

No obtengo el siguiente error cuando ejecuto el código en pequeños datos. Pero obtengo el siguiente error al usar múltiples salidas, cuando ejecuto el mismo código en un conjunto de datos más grande. Pls Ayuda!hadoop múltiple ya se está creando excepción

org.apache.hadoop.ipc.RemoteException: 
org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file 
/home/users/mlakshm/alop176/data-r-00001 for 
DFSClient_attempt_201208010142_0043_r_000001_1 on client 10.0.1.100, because this file 
is already being created by DFSClient_attempt_201208010142_0043_r_000001_0 on  10.0.1.130 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:1406) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1246) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1188) 
    at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:628) 
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382) 

    at org.apache.hadoop.ipc.Client.call(Client.java:1070) 
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) 
    at $Proxy2.create(Unknown Source) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) 
    at $Proxy2.create(Unknown Source) 
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.<init>(DFSClient.java:3248) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:713) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:182) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:555) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:455) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:118) 
    at com.a.MultipleOutputs$InternalFileOutputFormat.getRecordWriter(MultipleOutputs.java:565) 
    at com.a.MultipleOutputs.getRecordWriter(MultipleOutputs.java:432) 
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:518) 
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:482) 
    at com.a.ReduceThree1.reduce(ReduceThree1.java:56) 
    at com.a.ReduceThree1.reduce(ReduceThree1.java:1) 
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 

en org.apache.hadoop.mapred.Child.main (Child.java:249)


La reducir clase es el siguiente:

public class ReduceThree1 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ 
     // @SuppressWarnings("unchecked") 
     private MultipleOutputs mos; 

     public void configure(JobConf conf1) { 

     mos = new MultipleOutputs(conf1); 

     } 

      public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 


       // MultipleOutputs mos; 
       int sum = 0; 
       ArrayList<CustomMapI> alcmap = new ArrayList<CustomMapI>(); 
       while(values.hasNext()) 
       { 

        String val = values.next().toString(); 
        StringTokenizer st = new StringTokenizer(val); 
        String uid = st.nextToken(); 
        String f_val = st.nextToken(); 
        CustomMapI cmap = new CustomMapI(uid, f_val); 
        alcmap.add(cmap); 
        sum += Integer.parseInt(f_val); 

       } 

       StringTokenizer st = new StringTokenizer(key.toString()); 
       String t = st.nextToken(); 
       String data = st.nextToken(); 

       for(int i = 0; i<alcmap.size(); i++) 
       { 

        String str_key = t+" "+alcmap.get(i).getUid(); 
        String str_val = data+" "+alcmap.get(i).getF_val()+" "+sum; 

       // output.collect(new Text(str_key), new Text(str_val)); 
        mos.getCollector("/home/users/mlakshm/alop176/data", reporter).collect(new Text(str_key), new Text(str_val)); 

        for(int j = 1; j<alcmap.size(); j++) 
        { 
         if((j>i)&&(!alcmap.get(i).equals(alcmap.get(j)))) 
         { 
          String mul_key = "null"; 


          String uidi = alcmap.get(i).getUid(); 
          String uidj = alcmap.get(j).getUid(); 


          ArrayList<String> alsort = new ArrayList<String>(); 
          alsort.add(uidi); 
          alsort.add(uidj); 
          Collections.sort(alsort); 
          int fi = Integer.parseInt(alcmap.get(i).getF_val()); 

          int fj = Integer.parseInt(alcmap.get(j).getF_val()); 
          String intersection = "null"; 
          if(fi<fj) 
          { 
          intersection = String.valueOf(fi); 
          } 
          else 
          { 
           intersection = String.valueOf(fj); 
          } 

          String mul_val = t+" "+alsort.get(0)+" "+alsort.get(1)+" "+intersection; 
         // System.out.println(mul_key+ " "+mul_val); 

          mos.getCollector("/home/users/mlakshm/alop177/datepairs", reporter).collect(new Text(mul_key), new Text(mul_val)); 
         } 
        } 

       } 


      } 

      public void close() throws IOException { 
       mos.close(); 

       } 
} 

El Conf Trabajo es el siguiente:

Configuración config1 = nueva Configuración();

  JobConf conf1 = new JobConf(config1, DJob.class); 

      conf1.setJobName("DJob1"); 
      conf1.setOutputKeyClass(Text.class); 
      conf1.setOutputValueClass(Text.class); 
     // conf.setMapOutputValueClass(Text.class); 
     // conf.setMapOutputKeyClass(Text.class); 
     // conf.setNumMapTasks(20); 
      conf.setNumReduceTasks(10); 
      conf1.setMapperClass(MapThree1.class); 
     // conf.setCombinerClass(Combiner.class); 
      conf1.setReducerClass(ReduceThree1.class); 
      conf1.setPartitionerClass(CustomPartitioner.class); 

      conf1.setInputFormat(TextInputFormat.class); 
      conf1.setOutputFormat(TextOutputFormat.class); 
     // mos = new MultipleOutputs(conf1); 
      MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop176/data", TextOutputFormat.class, LongWritable.class, Text.class); 
      MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop177/datepairs", TextOutputFormat.class, LongWritable.class, Text.class); 


      FileInputFormat.setInputPaths(conf1, new Path(other_args.get(2))); 
      FileOutputFormat.setOutputPath(conf1, new Path(other_args.get(3))); 

     JobClient.runJob(conf1); 

Respuesta

3

Lo más probable que tenga en ejecución especulativa, y dos intentos diferentes para reducir la tarea 1 están tratando de escribir en el camino /home/users/mlakshm/alop176/data-r-00001. Esto probablemente tenga éxito para tareas más pequeñas, ya que terminan antes de que hadoop especulativamente ejecute un segundo intento.

Veo que su implementación de MultipleOutputs es personalizada (com.a.MultipleOutputs), debe escribir todos los datos de HDFS en el directorio de trabajo de tareas y dejar que OutputComitter lo mueva al directorio de salida final en la confirmación de salida. Si puede, pegue el código y podemos echarle un vistazo.

+0

Hola, Chris, he publicado el código como mi respuesta. ¿Puedes ver eso y ayudarme con el problema? ¡Gracias! –

+0

@MahalakshmiLakshminarayanan resolvió este problema? ¿Hizo cambios en el código o en cualquier cambio de configuración? Déjame saber. Gracias por adelantado. – Shash

+0

@shash Agregué esto a mi código en el trabajo conf: jobConf.setSpeculativeExecution (false); –

Cuestiones relacionadas