2012-02-15 18 views
6

Estoy tratando de aprender a utilizar la API de Python de Yelp para MapReduce, MRJob. Su simple ejemplo de contador de palabras tiene sentido, pero tengo curiosidad por saber cómo manejar una aplicación que involucra múltiples entradas. Por ejemplo, en lugar de simplemente contar las palabras en un documento, multiplicar un vector por una matriz. Se me ocurrió esta solución, que funciona, pero se siente tonta:Entradas múltiples con MRJob

class MatrixVectMultiplyTast(MRJob): 
    def multiply(self,key,line): 
      line = map(float,line.split(" ")) 
      v,col = line[-1],line[:-1] 

      for i in xrange(len(col)): 
        yield i,col[i]*v 

    def sum(self,i,occurrences): 
      yield i,sum(occurrences) 

    def steps(self): 
      return [self.mr (self.multiply,self.sum),] 

if __name__=="__main__": 
    MatrixVectMultiplyTast.run() 

Este código se ejecuta ./matrix.py < input.txt y la razón en que funciona es que la matriz almacenada en entrada.txt por columnas, con el valor correspondiente en el vector final de la línea.

Por lo tanto, la siguiente matriz y el vector:

enter image description here

se representan como entrada.txt como:

enter image description here

En resumen, ¿cómo hago para almacenar la matriz y vector de forma más natural en archivos separados y pasarlos a ambos en MRJob?

Respuesta

3

Si usted está en necesidad de tratamiento de sus datos en bruto en contra de otro (o misma row_i, row_j) conjunto de datos, puede:

1) Crear un depósito de S3 para almacenar una copia de sus datos. Pase la ubicación de esta copia a su clase de tarea, p. self.options.bucket y self.options.my_datafile_copy_location en el siguiente código. Advertencia: Desafortunadamente, parece que todo el archivo debe ser "descargado" a las máquinas de tareas antes de ser procesado. Si las conexiones fallan o tarda demasiado en cargarse, este trabajo puede fallar. Aquí hay un código de Python/MRJob para hacer esto.

poner esto en su función de asignador:

d1 = line1.split('\t', 1) 
v1, col1 = d1[0], d1[1] 
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING) 
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip() 
### CAVEAT: Needs to get the whole file before processing the rest. 
for line2 in data_copy.split('\n'): 
    d2 = line2.split('\t', 1) 
    v2, col2 = d2[0], d2[1] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
conn.close() 

2) Crear un dominio SimpleDB, y almacenar todos sus datos en ese país. Lea aquí el boto y SimpleDB: http://code.google.com/p/boto/wiki/SimpleDbIntro

Su código asignador se vería así:

dline = dline.strip() 
d0 = dline.split('\t', 1) 
v1, c1 = d0[0], d0[1] 
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME) 
for item in domain: 
    v2, c2 = item.name, item['column'] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
sdb.close() 

Esta segunda opción puede funcionar mejor si usted tiene cantidades muy grandes de datos, ya que puede hacer las solicitudes para cada fila de datos en lugar de la cantidad total a la vez. Tenga en cuenta que los valores de SimpleDB solo pueden tener un máximo de 1024 caracteres, por lo que es posible que deba comprimir/descomprimir mediante algún método si los valores de sus datos son más largos.

1

Según entiendo, no usaría MrJob a menos que quisiera aprovechar el clúster Hadoop o los servicios Hadoop de Amazon, incluso si el ejemplo utiliza ejecutarse en archivos locales.

MrJob en principal usa "Hadoop streaming" para enviar el trabajo.

Esto significa que todas las entradas especificadas como archivos o carpetas desde Hadoop se transmiten al asignador y los resultados posteriores al reductor. Todo el mapeador obtiene una porción de entrada y considera que todas las entradas son esquemáticamente iguales para que analice y procese de manera uniforme la clave, el valor para cada segmento de datos.

Derivado de este entendimiento, las entradas son esquemáticamente iguales para el asignador.La única manera posible de incluir dos datos esquemáticos diferentes es intercalarlos en el mismo archivo de tal manera que el mapeador pueda comprender qué datos son vectoriales y cuáles datos de matriz.

You are actually doing it already. 

Simplemente puede mejorar eso teniendo algún especificador si una línea es datos de matriz o un vector de datos. Una vez que ve un vector de datos, se le aplican los datos de matriz anteriores.

matrix, 1, 2, ... 
matrix, 2, 4, ... 
vector, 3, 4, ... 
matrix, 1, 2, ... 
..... 

Pero el proceso que ha mencionado funciona bien. Debe tener todos los datos esquemáticos en un solo archivo.

Esto todavía tiene problemas. K, V map reduce funciona mejor cuando el esquema completo está presente en una sola línea y contiene una unidad de procesamiento única completa.

Según entiendo, ya lo está haciendo correctamente, pero supongo que Map-Reduce no es un mecanismo adecuado para este tipo de datos. Espero que alguien aclare esto aún más que yo.

2

La respuesta real a su pregunta es que mrjob todavía no admite el patrón de combinación de transmisión de hadoop, que es leer la variable de entorno map_input_file (que expone la propiedad map.input.file) para determinar qué tipo de archivo están tratando según su ruta y/o nombre.

Es posible que aún así ser capaz de llevarlo a cabo, si se puede detectar fácilmente de sólo leer los datos en sí, que el tipo al que pertenece, como se muestra en este artículo:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Sin embargo eso no es siempre es posible ...

De lo contrario, mi trabajo se ve fantástico y me gustaría que pudieran agregar soporte para esto en el futuro. Hasta entonces, esto es bastante un factor decisivo para mí.

1

Así es como utilizo varias entradas y, en función del nombre del archivo, realizo cambios adecuados en la fase del mapeador.

Programa Runner:

from mrjob.hadoop import * 


#Define all arguments 

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S') 
hadoop_bin = '/usr/bin/hadoop' 
mode = 'hadoop' 
hs = HadoopFilesystem([hadoop_bin]) 

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"] 

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin] 
aargs.extend(input_file_names) 
aargs.extend(['-o',output_dir]) 
print aargs 
status_file = True 

mr_job = MRJob(args=aargs) 
with mr_job.make_runner() as runner: 
    runner.run() 
os.environ['HADOOP_HOME'] = '' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 

La Clase MRJob:

class MR_Job(MRJob): 
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value' 
    def mapper(self, _, line): 
    """ 
    This function reads lines from file. 
    """ 
    try: 
     #Need to clean email. 
     input_file_name = get_jobconf_value('map.input.file').split('/')[-2] 
       """ 
       Mapper code 
       """ 
    except Exception, e: 
     print e 

    def reducer(self, email_id,visitor_id__date_time): 
    try: 
     """ 
       Reducer Code 
       """ 
    except: 
     pass 


if __name__ == '__main__': 
    MRV_Email.run() 
Cuestiones relacionadas