2012-01-27 18 views
26

Estoy tratando de usar un pool de trabajadores en python usando objetos Process. Cada trabajador (un Proceso) realiza alguna inicialización (toma una cantidad de tiempo no trivial), obtiene una serie de trabajos (idealmente usando map()) y devuelve algo. No se necesita comunicación más allá de eso. Sin embargo, parece que no puedo entender cómo usar map() para usar la función compute() de mi trabajador.python Pool with worker Procesos

from multiprocessing import Pool, Process 

class Worker(Process): 
    def __init__(self): 
     print 'Worker started' 
     # do some initialization here 
     super(Worker, self).__init__() 

    def compute(self, data): 
     print 'Computing things!' 
     return data * data 

if __name__ == '__main__': 
    # This works fine 
    worker = Worker() 
    print worker.compute(3) 

    # workers get initialized fine 
    pool = Pool(processes = 4, 
       initializer = Worker) 
    data = range(10) 
    # How to use my worker pool? 
    result = pool.map(compute, data) 

es un trabajo en cola el camino a seguir en su lugar, o puedo utilizar map()?

+0

Todos los objetos de proceso son con estado. Es posible que desee eliminar esa palabra del título. También. 'compute' es un método de un trabajador. En los ejemplos, generalmente es una función completamente independiente. ¿Por qué no escribir la función de cálculo para simplemente incluir tanto la inicialización como el procesamiento? –

+0

Muy bien, gracias. La inicialización lleva mucho tiempo, por lo que solo quiero hacerlo una vez por proceso de trabajo. – Felix

+0

Debe destacar la parte de la pregunta "se pasa una serie de trabajos". Dado que eso no era obvio. –

Respuesta

50

Le sugiero que utilice una cola para esto.

class Worker(Process): 
    def __init__(self, queue): 
     super(Worker, self).__init__() 
     self.queue= queue 

    def run(self): 
     print 'Worker started' 
     # do some initialization here 

     print 'Computing things!' 
     for data in iter(self.queue.get, None): 
      # Use data 

ya se puede empezar una pila de ellos, todo el trabajo de ir de una sola cola

request_queue = Queue() 
for i in range(4): 
    Worker(request_queue).start() 
for data in the_real_source: 
    request_queue.put(data) 
# Sentinel objects to allow clean shutdown: 1 per worker. 
for i in range(4): 
    request_queue.put(None) 

Ese tipo de cosas debe permitirle amortizar el coste de lanzamiento a través de múltiples caras trabajadores.

+0

Eso es lo que pensé, ¡gracias! Terminé usando una cola de trabajos (entrada) y cola de resultados (salida) para sincronizar todo. – Felix

+0

tu ejemplo es increíble. Intento ahora cómo ingresar los objetos centinela cuando se presiona strg + c sin una excepción – Dukeatcoding

+0

@ S.Lott: ¿No es que Queue no es pickle-able? y es por eso que usa [multiprocesamiento.Manager() .Queue] (http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by -pool-map-async)? – zuuz

4

initializer espera una invocación arbitraria que inicia, por ejemplo, puede establecer algunos globales, no una subclase Process; map acepta un iterable arbitrario:

#!/usr/bin/env python 
import multiprocessing as mp 

def init(val): 
    print('do some initialization here') 

def compute(data): 
    print('Computing things!') 
    return data * data 

def produce_data(): 
    yield -100 
    for i in range(10): 
     yield i 
    yield 100 

if __name__=="__main__": 
    p = mp.Pool(initializer=init, initargs=('arg',)) 
    print(p.map(compute, produce_data()))