2012-06-15 14 views
9

Cuando se ejecuta una gran cantidad de tareas (con parámetros grandes) usando Pool.apply_async, los procesos se asignan y pasan a un estado de espera, y no hay límite para la cantidad de procesos en espera. Esto puede terminar por el consumo de toda la memoria, como en el siguiente ejemplo:Multiproceso de Python: cómo limitar el número de procesos en espera?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

Estoy buscando una manera de limitar la cola de espera, de tal manera que sólo hay un número limitado de procesos en espera, y Pool.apply_async está bloqueado mientras la cola de espera está llena.

+0

ejemplo Nice (1). – mgilson

Respuesta

6

multiprocessing.Pool tiene un _taskqueue miembro del tipo multiprocessing.Queue, que toma un parámetro opcional maxsize; lamentablemente lo construye sin el conjunto de parámetros maxsize.

me gustaría recomendar la subclasificación multiprocessing.Pool con un copy-paste de multiprocessing.Pool.__init__ que pasa maxsize-_taskqueue constructor.

mono-parchear el objeto (la piscina o la cola) también funcionaría, pero usted tendría que monkeypatch pool._taskqueue._maxsize y pool._taskqueue._sem por lo que sería muy frágil:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

Estoy usando Python 2.7.3, y _taskqueue es del tipo Queue.Queue. Significa que es una cola simple, y no un multiproceso. Queue. Subclassing multiprocessing.Pool y overriding __init__ funciona bien, pero el parche de mono no funciona como se esperaba. Sin embargo, este es el truco que estaba buscando, gracias. –

0

Se podría añadir cola explícita con el parámetro maxsize y use queue.put() en lugar de pool.apply_async() en este caso. A continuación, los procesos de trabajo podrían:

for a, b in iter(queue.get, sentinel): 
    # process it 

Si desea limitar el número de argumentos de entrada creadas/resultados que se encuentran en la memoria a aproximadamente el número de procesos de trabajo activas Posteriormente, se podría utilizar pool.imap*() métodos:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

Usar 'imap' no hace diferencia. La cola de entrada aún es ilimitada y el uso de esta solución terminará consumiendo toda la memoria. – Radim

+0

@Radim: el código 'imap' en la respuesta funciona incluso si le da un generador infinito. – jfs

+0

No en Python 2, desafortunadamente (no he visto el código en py3). Para algunas soluciones, consulte [esta respuesta SO] (http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration). – Radim

1

esperar si pool._taskqueue es sobre el tamaño deseado:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

Solo quiero agregar que encontré esta es la solución más fácil para mis problemas de memoria con multiprocesamiento. Usé max_apply_size = 10 y eso funciona bien para mi problema, que es una lenta conversión de archivos. Usar un semáforo como sugiere @ecatmur parece ser una solución más robusta, pero podría ser exagerado para scripts simples. – Nate

Cuestiones relacionadas