2010-01-17 21 views
45

Tengo problemas con el módulo de multiprocesamiento. Estoy usando un grupo de trabajadores con su método de mapa para cargar datos de muchos archivos y para cada uno de ellos analizo datos con una función personalizada. Cada vez que se procesa un archivo, me gustaría tener un contador actualizado para que pueda hacer un seguimiento de cuántos archivos quedan por procesar. Aquí es código de ejemplo:Multiproceso de Python y un contador compartido

def analyze_data(args): 
    # do something 
    counter += 1 
    print counter 


if __name__ == '__main__': 

    list_of_files = os.listdir(some_directory) 

    global counter 
    counter = 0 

    p = Pool() 
    p.map(analyze_data, list_of_files) 

no puedo encontrar una solución para esto.

Respuesta

48

El problema es que la variable counter no se comparte entre sus procesos: cada proceso por separado crea su propia instancia local y la incrementa.

Consulte this section de la documentación de algunas técnicas que puede emplear para compartir estado entre sus procesos. En su caso, es posible que desee compartir una instancia Value entre sus trabajadores

Aquí hay una versión funcional de su ejemplo (con algunos datos de entrada ficticios). Tenga en cuenta que utiliza los valores globales que yo realmente tratar de evitar en la práctica: la clase

from multiprocessing import Pool, Value 
from time import sleep 

counter = None 

def init(args): 
    ''' store the counter for later use ''' 
    global counter 
    counter = args 

def analyze_data(args): 
    ''' increment the global counter, do something with the input ''' 
    global counter 
    # += operation is not atomic, so we need to get a lock: 
    with counter.get_lock(): 
     counter.value += 1 
    print counter.value 
    return args * 10 

if __name__ == '__main__': 
    #inputs = os.listdir(some_directory) 

    # 
    # initialize a cross-process counter and the input lists 
    # 
    counter = Value('i', 0) 
    inputs = [1, 2, 3, 4] 

    # 
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    # 
    p = Pool(initializer = init, initargs = (counter,)) 
    i = p.map_async(analyze_data, inputs, chunksize = 1) 
    i.wait() 
    print i.get() 
+0

¡Excelente respuesta! Tuve el mismo problema en IronPython, y aunque multiprocesamiento.Valor no está disponible, puede hacer algo similar con clr.Reference y System.Threading.Interlocked: http://stackoverflow.com/questions/2255461/how-to-atomically- increment-a-static-member-in-ironpython/2314858 # 2314858 –

+3

@jkp, ¿cómo lo harías sin la variable global? - Estoy tratando de usar una clase, pero no es tan fácil como parece. Consulte http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma – Anna

+18

Desafortunadamente, este ejemplo parece estar defectuoso, ya que 'counter.value + = 1 'no es atómico entre procesos, por lo que el valor será incorrecto si se ejecuta lo suficiente con unos pocos procesos –

24

Contador sin la carrera de la condición de error:

class Counter(object): 
    def __init__(self): 
     self.val = multiprocessing.Value('i', 0) 

    def increment(self, n=1): 
     with self.val.get_lock(): 
      self.val.value += n 

    @property 
    def value(self): 
     return self.val.value 
+0

Para código similar que funciona con 'joblib's' Parallel' (el código de esta respuesta no funciona con 'joblib'), consulte https://github.com/davidheryanto/etc/blob/master/python-recipes/parallel-joblib-counter.py –

0

más rápido clase de contador sin necesidad de utilizar el incorporado en la cerradura de Valor dos veces

class Counter(object): 
    def __init__(self, initval=0): 
     self.val = multiprocessing.RawValue('i', initval) 
     self.lock = multiprocessing.Lock() 

    def increment(self): 
     with self.lock: 
      self.val.value += 1 

    @property 
    def value(self): 
     return self.val.value 

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

Cuestiones relacionadas