2009-11-24 27 views
8

Soy nuevo en python y avanzo un poco con threading - estoy haciendo algo de conversión de archivos de música y quiero poder utilizar los múltiples núcleos en mi máquina (un hilo de conversión activo por núcleo).¿Cómo puedo limitar el número de subprocesos activos en Python?

class EncodeThread(threading.Thread): 
    # this is hacked together a bit, but should give you an idea 
    def run(self): 
     decode = subprocess.Popen(["flac","--decode","--stdout",self.src], 
          stdout=subprocess.PIPE) 
     encode = subprocess.Popen(["lame","--quiet","-",self.dest], 
           stdin=decode.stdout) 
     encode.communicate() 

# some other code puts these threads with various src/dest pairs in a list 

for proc in threads: # `threads` is my list of `threading.Thread` objects 
    proc.start() 

Todo funciona, todos los archivos se codifican, ¡bravo! ... sin embargo, todos los procesos aparecen inmediatamente, pero solo deseo ejecutar dos a la vez (uno para cada núcleo). Tan pronto como haya terminado, quiero que pase al siguiente en la lista hasta que finalice, luego continúe con el programa.

¿Cómo puedo hacer esto?

(He mirado en las funciones de grupo de subprocesos y la cola, pero no puedo encontrar una respuesta simple.)

Editar: quizás debería añadir que cada uno de mis hilos está utilizando para ejecutar un subprocess.Popen línea de comando separada decodificador (flac) canalizado a stdout que se alimenta a una línea de comandos codificador (lame/mp3).

+0

¿Por qué molestarse? ¿Qué hay de malo en dejar que tus hilos compitan entre sí? Es más rápido tener cada núcleo completamente saturado de trabajo. –

+0

Bueno, supongo que no lo había pensado así ... con una biblioteca de música de más de 2.000 archivos, pensé que el desove (a la vez) 2.000 proceso de decodificación (flac) canalizaba a 2.000 procesos de codificación (cojo) al mismo tiempo sería subóptimo. ¿Me equivoco? – thornomad

+0

@thornomad: Sí, estás equivocado. Limitarse a exactamente 2 procesos porque tiene 2 núcleos es incorrecto. Un proceso no saturará un núcleo con trabajo. Incluso una tubería de proceso de tres partes puede tener suficiente E/S que el núcleo no está totalmente ocupado. –

Respuesta

4

"Cada uno de mis hilos está usando subprocess.Popen para ejecutar una línea de comando [proceso] separada".

¿Por qué tener un haz de hilos gestionar un montón de procesos? Eso es exactamente lo que un sistema operativo hace por usted. ¿Por qué administrar micro lo que el sistema operativo ya administra?

En lugar de perder el tiempo con hilos de procesos de supervisión, solo un fork de procesos. Es probable que su tabla de procesos no pueda manejar 2000 procesos, pero puede manejar algunas docenas (quizás unos cientos) con bastante facilidad.

Desea tener más trabajo que la CPU puede manejar en cola. La verdadera pregunta es de memoria, no de procesos o hilos. Si la suma de todos los datos activos para todos los procesos excede la memoria física, entonces los datos deben intercambiarse, y eso lo hará más lento.

Si sus procesos tienen una bastante pequeña huella de memoria, puede tener mucha, mucha corriente. Si sus procesos tienen una gran huella de memoria, no puede tener muchos en ejecución.

+0

Heh. Ahora veo el truco de mi enfoque pirateado: es un poco redundante. Entonces, ¿hay alguna manera con el subproceso de administrar un "grupo" (como otros han sugerido)? Gracias por tu contribución. Aprendiendo sobre la marcha ... ¿solo se trataría de usar 'subprocess.poll()' para ver qué se hace y qué se está ejecutando? Gracias de nuevo. – thornomad

+0

Correcto. Puede usar un conjunto simple de procesos; eliminar los que están terminados. Agregue unos y mantenga el tamaño del conjunto bajo algún límite. Es solo un conjunto con 'add' y' remove'. –

1

Si está utilizando la versión predeterminada de "cpython", esto no lo ayudará, ya que solo un subproceso se puede ejecutar a la vez; mira hacia arriba Global Interpreter Lock. En cambio, sugiero mirar el multiprocessingmodule en Python 2.6 - hace que la programación paralela sea muy fácil. Puede crear un objeto Pool con procesos 2*num_threads, y darle varias tareas para hacer. Ejecutará hasta 2*num_threads tareas a la vez, hasta que todo esté listo.

En el trabajo, recientemente he migrado un montón de herramientas XML de Python (un diferido, xpath grepper y transformador bulk xslt) para usar esto, y he tenido muy buenos resultados con dos procesos por procesador.

+1

Si sus subprocesos ejecutarán funciones en su código de Python, el módulo de multiprocesamiento es excelente. Si está llamando a un programa externo, este módulo no ofrecería una ventaja sobre el módulo de subproceso ... porque esos programas externos no tendrán ningún medio de devolver sus resultados al padre que no sean archivos temporales, tuberías, etc. Las enormes ventajas de IPC del módulo de multiprocesamiento se pierden en los programas externos que ejecutaría. (Tener cada proceso en un subproceso de llamada multiproceso suena bastante tonto, por ejemplo). –

0

No soy un experto en esto, pero he leído algo sobre "Lock" s. This article podría ayudarle a cabo

Esperanza esto ayuda

1

A mi me parece que lo que quiere es una piscina de algún tipo, y en esa piscina que le gustaría que la tienen roscas n donde n == el número de procesadores en tu sistema. Luego tendrías otro hilo cuyo único trabajo era alimentar trabajos en una cola que los hilos de trabajo podrían recoger y procesar a medida que se volvieran gratuitos (por lo que para una máquina de código dual, tendrías tres hilos pero el hilo principal estaría haciendo muy poco).

Como eres nuevo en Python, supongo que no conoces el GIL y sus efectos secundarios con respecto al enhebrado. Si lees el artículo que he vinculado, pronto entenderás por qué las soluciones tradicionales de subprocesamiento múltiple no siempre son las mejores en el mundo de Python. En su lugar, debe considerar usar el módulo multiprocessing (nuevo en Python 2.6, en 2.5 puede use this backport) para lograr el mismo efecto. Da un paso al frente del problema del GIL al usar procesos múltiples como si fueran hilos dentro de la misma aplicación. Existen algunas restricciones sobre cómo compartir datos (está trabajando en diferentes espacios de memoria) pero en realidad esto no es malo: solo fomentan buenas prácticas, como minimizar los puntos de contacto entre hilos (o procesos en este caso).

En su caso, probablemente esté interesada en utilizar un grupo como se especifica here.

+0

Gracias - Veré el multiproceso ... y edité mi pregunta para tener un poco más de detalle ... parece que el subproceso.Popen se rompe y hace lo suyo. – thornomad

+0

El módulo de multiprocesamiento, BTW, es una maravillosa adición a 2.6 (del módulo de terceros de pyprocessing que admite 2.4 y 2.5). Sin embargo, no es tan adecuado para ejecutar programas externos. Las principales ventajas del módulo de multiprocesamiento radican en la forma en que se modela después del soporte de subprocesamiento. Puede crear Queue() s como el principal mecanismo de comunicaciones entre (hilos/proceso) para eliminar la mayor parte de la necesidad de su propio bloqueo explícito. (Queue() s proporciona soporte coherente para múltiples productores y consumidores de objetos arbitrarios). Genial si los niños ejecutan código Python. –

1

Respuesta corta: no usar hilos.

Para un ejemplo de trabajo, se puede ver en algo que he sacudido recientemente juntos en el trabajo. Es una pequeña envoltura alrededor de ssh que ejecuta un número configurable de subprocesos Popen(). Lo publiqué en: Bitbucket: classh (Cluster Admin's ssh Wrapper).

Como se señaló, no uso hilos; Acabo de engendrar a los niños, los repito llamando a sus métodos .poll() y comprobando los tiempos de espera (también configurables) y repongo el grupo cuando reúno los resultados. He jugado con diferentes valores de sleep() y en el pasado he escrito una versión (antes del subproceso módulo se agregó a Python) que utiliza el señal módulo (SIGCHLD y SIGALRM) y el os.fork() y os.execve() funciones --- que mi tubería y descriptor de plomería, etc.).

En mi caso estoy imprimiendo incrementalmente los resultados mientras los reúno ... y recuerdo todos ellos para resumir al final (cuando todos los trabajos se han completado o se han matado por exceder el tiempo de espera).

me corrieron de que, tal como fue anunciado, en una lista de 25.000 hosts internos (muchos de los cuales están abajo, jubilados, que se encuentra a nivel internacional, no se puede acceder a mi cuenta de prueba, etc.). Completó el trabajo en poco más de dos horas y no tuvo problemas. (Hubo alrededor de 60 de ellos que fueron tiempos de espera debido a los sistemas en estados degenerados/azotes - lo que demuestra que mi manejo de tiempo de espera funciona correctamente).

así que sé este modelo funciona de forma fiable. Ejecutar 100 procesos actuales ssh con este código no parece causar ningún impacto notable. (Es una caja de FreeBSD moderadamente antigua). Solía ​​ejecutar la versión antigua (antes del subproceso) con 100 procesos simultáneos en mi vieja computadora portátil de 512MB sin problemas).

(BTW: planeo limpiar esto y agregarle funciones; no dude en contribuir o clonar su propia rama; para eso es Bitbucket.org).

+0

Gracias - Lo veré un poco más de cerca hoy. Subí rápidamente con un conjunto bastante simple de bucles while que parece funcionar solo comprobando el método 'p.communicate()'. (PD: creo que te falta un '' '' de cierre en la línea 4 de la fuente). – thornomad

30

Si desea limitar el número de hilos paralelos, utilizar un semaphore:

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads) 

class EncodeThread(threading.Thread): 

    def run(self): 
     threadLimiter.acquire() 
     try: 
      <your code here> 
     finally: 
      threadLimiter.release() 

Start todos los hilos a la vez. Todos excepto maximumNumberOfThreads esperarán en threadLimiter.acquire() y un hilo en espera solo continuará una vez que otro hilo pase por threadLimiter.release().

+1

Esto responde exactamente a la pregunta inicial. Ideal para las personas que terminan aquí buscando en Google. –

Cuestiones relacionadas