2011-06-28 11 views
9

Estoy intentando que los tiempos de espera funcionen en python3.2 utilizando el módulo concurrent.futures. Sin embargo, cuando se agota el tiempo de espera, realmente no detiene la ejecución. Intenté con los subprocesadores y los ejecutores del grupo de procesos ninguno de ellos detuvo la tarea, y solo hasta que finalice se aumenta el tiempo de espera. Entonces, ¿alguien sabe si es posible hacer que esto funcione?Cómo usar concurrent.futures con tiempos de espera?

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

def run_loop(max_number): 
    print("Started:", datetime.datetime.now(), max_number) 
    last_number = 0; 
    for i in range(1, max_number + 1): 
     last_number = i * i 
    return last_number 

def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 

if __name__ == '__main__': 
    main() 

Respuesta

15

Por lo que yo puedo decir, TimeoutError se planteó realmente cuando era de esperar, y no después de finalizada la tarea.

Sin embargo, su programa se seguirá ejecutando hasta que se hayan completado todas las tareas en ejecución. Esto se debe a que las tareas que se ejecutan actualmente (en su caso, probablemente todas las tareas enviadas, ya que el tamaño de su grupo de servidores equivale a la cantidad de tareas), en realidad no se "cancelan".

TimeoutError aumenta, por lo que puede elegir no esperar hasta que la tarea finalice (y hacer otra cosa), pero la tarea seguirá ejecutándose hasta que se complete. Y Python no saldrá mientras haya tareas pendientes en los hilos/subprocesos de su Ejecutor.

Por lo que yo sé, no es posible simplemente "detener" la ejecución de futuros en curso, solo puede "cancelar" las tareas programadas que aún no se han iniciado. En su caso, no habrá ninguno, pero imagínese que tiene un conjunto de 5 subprocesos/procesos y desea procesar 100 elementos. En algún momento, puede haber 20 tareas completadas, 5 tareas en ejecución y 75 tareas programadas. En este caso, podría cancelar esas 76 tareas programadas, pero las 4 que se están ejecutando continuarán hasta que se completen, ya sea que espere el resultado o no.

Aunque no se puede hacer de esa manera, creo que debería haber formas de lograr el resultado final deseado. Tal vez esta versión se puede ayudar en el camino (no estoy seguro si lo hace exactamente lo que quería, pero podría ser de alguna utilidad):

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

class Task: 
    def __init__(self, max_number): 
     self.max_number = max_number 
     self.interrupt_requested = False 

    def __call__(self): 
     print("Started:", datetime.datetime.now(), self.max_number) 
     last_number = 0; 
     for i in xrange(1, self.max_number + 1): 
      if self.interrupt_requested: 
       print("Interrupted at", i) 
       break 
      last_number = i * i 
     print("Reached the end") 
     return last_number 

    def interrupt(self): 
     self.interrupt_requested = True 

def main(): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor: 
     tasks = [Task(num) for num in max_numbers] 
     for task, future in [(i, executor.submit(i)) for i in tasks]: 
      try: 
       print(future.result(timeout=1)) 
      except concurrent.futures.TimeoutError: 
       print("this took too long...") 
       task.interrupt() 


if __name__ == '__main__': 
    main() 

Al crear un objeto exigible para cada "tarea", y dando a aquellos para el ejecutor en lugar de solo una función simple, puede proporcionar una forma de "interrumpir" la tarea. Consejo: quitar la línea task.interrupt() y ver lo que sucede, puede hacer que sea más fácil de entender mi larga explicación anterior ;-)

4

Recientemente también me golpeó este tema, y ​​finalmente llegar a la siguiente solución usando ProcessPoolExecutor:


def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 
      stop_process_pool(executor) 

def stop_process_pool(executor): 
    for pid, processes in executor._processes.items(): 
     process.terminate() 
    executor.shutdown() 
+0

txmc puede que acaba de matar a uno de los procesos? ¿O tienes que matar a todos? –

Cuestiones relacionadas