2009-12-11 21 views

Respuesta

59

Sugiero crear instancias de un Queue.Queue antes de iniciar el hilo y pasarlo como una de argumentos de la rosca: antes de que acabe hilo, que .put s el resultado en la cola que recibe como argumento. El padre puede .get o .get_nowait a voluntad.

colas son generalmente la mejor manera de organizar la sincronización de hilos y comunicación en Python: son intrínsecamente seguro para subprocesos, los vehículos de paso de mensajes - la mejor manera de organizar la multitarea en general -)

+2

'antes de que el hilo termine, se obtiene el resultado en la cola que recibió como argumento' ¿quiere decir que esto será hecho automáticamente por python? si no (se entiende como consejo de diseño), ¿podría dejarlo en claro en la respuesta? – n611x007

+3

Es feo para especializar una función existente para eso; y la cola tiene una sobrecarga innecesaria para un único problema de resultado. Más clara y eficientemente subclass 'threading.Thread' y el nuevo método run() simplemente almacena el resultado como atributo como' self.ret = ... '(Mucho más cómodo sería una subclase de Thread que maneja valores de retorno/excepciones de la función de destino personalizada. De hecho, 'threading.Thread' debe ampliarse para ofrecerlo de la caja, ya que sería compatible con el antiguo comportamiento" return None ".) – kxr

1

Bueno, en el módulo de subprocesamiento de Python, hay objetos de condición que están asociados a bloqueos. Un método acquire() devolverá cualquier valor devuelto por el método subyacente. Para más información: Python Condition Objects

5

Otro enfoque! es pasar una función de devolución de llamada al hilo. Esto proporciona una forma simple, segura y flexible de devolver un valor al padre, en cualquier momento del nuevo hilo.

# A sample implementation 

import threading 
import time 

class MyThread(threading.Thread): 
    def __init__(self, cb): 
     threading.Thread.__init__(self) 
     self.callback = cb 

    def run(self): 
     for i in range(10): 
      self.callback(i) 
      time.sleep(1) 


# test 

import sys 

def count(x): 
    print x 
    sys.stdout.flush() 

t = MyThread(count) 
t.start() 
+8

El problema es que la devolución de llamada aún se ejecuta en el hilo hijo, en lugar de en el hilo original. – babbageclunk

+0

@wilberforce ¿podría explicarnos qué problemas puede causar? –

+4

Ok. Un ejemplo sería si la devolución de llamada escribe en un archivo de registro al que también se escribe el subproceso principal mientras se está ejecutando el subproceso. Como la devolución de llamada se ejecuta en el subproceso secundario, existe el riesgo de que las dos escrituras ocurran al mismo tiempo y se produzcan colisiones: podría producirse una salida confusa o intercalada, o un bloqueo si el marco de trabajo de registro realizara alguna contabilidad interna. Usar una cola segura para subprocesos y tener un hilo de rosca hace que todo lo que se escriba evite esto. Este tipo de problemas pueden ser desagradables porque no son deterministas: pueden aparecer solo en la producción y pueden ser difíciles de reproducir. – babbageclunk

12

Si estuviera llamando a unirse() para esperar a que el hilo para completar, se puede simplemente conectar el resultado de la propia instancia de rosca y luego recuperarla desde el hilo principal después de la unión retornos().

Por otro lado, no nos dice cómo piensa descubrir que el hilo está hecho y que el resultado está disponible. Si ya tiene una forma de hacerlo, probablemente le indique a usted (y a nosotros, si nos lo diga) la mejor forma de obtener los resultados.

+0

* simplemente podría adjuntar el resultado a la propia instancia de Thread * ¿Cómo pasa la instancia de Thread al objetivo que se ejecuta para que el objetivo pueda adjuntar el resultado a esta instancia? –

+1

Piotr Dobrogost, si no está subclasificando Thread para su instancia, puede usar threading.current_thread() desde el final de su destino invocable. Llamaría eso un poco feo, pero el enfoque de Alex siempre fue el más elegante. Este es simplemente más conveniente en algunos casos. –

+3

Sería bueno si 'join()' devolviera lo que sea que devuelva el método llamado ... parece tonto que en cambio devuelva 'None'. – ArtOfWarfare

10

Debe pasar una instancia de Queue como un parámetro, luego debe poner() su objeto de retorno en la cola. Puede recopilar el valor de retorno a través de queue.get() cualquier objeto que ponga.

muestra:

queue = Queue.Queue() 
thread_ = threading.Thread(
       target=target_method, 
       name="Thread1", 
       args=[params, queue], 
       ) 
thread_.start() 
thread_.join() 
queue.get() 

def target_method(self, params, queue): 
""" 
Some operations right here 
""" 
your_return = "Whatever your object is" 
queue.put(your_return) 

uso de múltiples hilos:

#Start all threads in thread pool 
    for thread in pool: 
     thread.start() 
     response = queue.get() 
     thread_results.append(response) 

#Kill all threads 
    for thread in pool: 
     thread.join() 

Yo uso esta aplicación y funciona muy bien para mí. Desearía que lo hicieras.

+1

¿No extrañas tu thread_.start() ?? – sadmicrowave

+1

Por supuesto, empiezo el hilo de rosca que solo extraño poner la línea aquí :) Gracias por avisarme. –

+0

¿Cómo sería esto si tuviera múltiples hilos? que.get() devuelve el resultado de un hilo solo para mí? – ABros

6

Uso lambda para envolver su función de subproceso de destino y pasar su valor de retorno de nuevo a la rosca de los padres usando una cola. (La función de su objetivo original se mantiene sin cambios y sin parámetro de cola adicional.) Código

muestra:

import threading 
import queue 
def dosomething(param): 
    return param * 2 
que = queue.Queue() 
thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2)) 
thr.start() 
thr.join() 
while not que.empty(): 
    print(que.get()) 

Salida:

4 
2

POC:

import random 
import threading 

class myThread(threading.Thread): 
    def __init__(self, arr): 
     threading.Thread.__init__(self) 
     self.arr = arr 
     self.ret = None 

    def run(self): 
     self.myJob(self.arr) 

    def join(self): 
     threading.Thread.join(self) 
     return self.ret 

    def myJob(self, arr): 
     self.ret = sorted(self.arr) 
     return 

#Call the main method if run from the command line. 
if __name__ == '__main__': 
    N = 100 

    arr = [ random.randint(0, 100) for x in range(N) ] 
    th = myThread(arr) 
    th.start() 
    sortedArr = th.join() 

    print "arr2: ", sortedArr 
3

Puede utilizar sincronizada queue módulo.
Considérese es necesario comprobar una base de datos de usuarios de informaciones con una identificación conocida:

def check_infos(user_id, queue): 
    result = send_data(user_id) 
    queue.put(result) 

Ahora puede obtener los datos de la siguiente manera:

import queue, threading 
queued_request = queue.Queue() 
check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request)) 
check_infos_thread.start() 
final_result = queued_request.get() 
6

me sorprende que nadie ha mencionado que sólo podía pasarlo mutable:

>>> thread_return={'success': False} 
>>> from threading import Thread 
>>> def task(thread_return): 
... thread_return['success'] = True 
... 
>>> Thread(target=task, args=(thread_return,)).start() 
>>> thread_return 
{'success': True} 

quizás esto tiene problemas importantes de los cuales no estoy al tanto.

+1

¡Esto funciona perfectamente! Realmente me gustaría escuchar alguna opinión sobre las cosas que faltan con este enfoque, si es que hay alguno. –

+0

funciona. Es feo especializar una función existente, y esas muchas cosas confusas (legibilidad), ver comentario en la primera respuesta. – kxr

+0

¿Qué tal con el hilo múltiple? – backslash112

0

La siguiente función de contenedor envolverá una función existente y devolver un objeto que apunta tanto a la rosca (por lo que se puede llamar start(), join(), etc. en ella), así como el acceso/ver su valor de retorno eventual.

def threadwrap(func,args,kwargs): 
    class res(object): result=None 
    def inner(*args,**kwargs): 
    res.result=func(*args,**kwargs) 
    import threading 
    t = threading.Thread(target=inner,args=args,kwargs=kwargs) 
    res.thread=t 
    return res 

def myFun(v,debug=False): 
    import time 
    if debug: print "Debug mode ON" 
    time.sleep(5) 
    return v*2 

x=threadwrap(myFun,[11],{"debug":True}) 
x.thread.start() 
x.thread.join() 
print x.result 

Se ve bien, y la clase threading.Thread parece extenderse fácilmente (*) con este tipo de funcionalidad, por lo que me pregunto por qué no está ya allí. ¿Hay algún defecto con el método anterior?

(*) Tenga en cuenta que la respuesta de husanu para esta pregunta hace exactamente esto, subclasificando threading.Thread dando como resultado una versión donde join() da el valor de retorno.

1

Según la sugerencia de jcomeau_ictx. El más simple que encontré. El requisito aquí era obtener el estado de salida de tres procesos diferentes que se ejecutan en el servidor y desencadenar otro script si los tres son exitosos. Esto parece estar funcionando bien

class myThread(threading.Thread): 
     def __init__(self,threadID,pipePath,resDict): 
      threading.Thread.__init__(self) 
      self.threadID=threadID 
      self.pipePath=pipePath 
      self.resDict=resDict 

     def run(self): 
      print "Starting thread %s " % (self.threadID) 
      if not os.path.exists(self.pipePath): 
      os.mkfifo(self.pipePath) 
      pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK) 
      with os.fdopen(pipe_fd) as pipe: 
       while True: 
        try: 
        message = pipe.read() 
        if message: 
         print "Received: '%s'" % message 
         self.resDict['success']=message 
         break 
        except: 
         pass 

    tResSer={'success':'0'} 
    tResWeb={'success':'0'} 
    tResUisvc={'success':'0'} 


    threads = [] 

    pipePathSer='/tmp/path1' 
    pipePathWeb='/tmp/path2' 
    pipePathUisvc='/tmp/path3' 

    th1=myThread(1,pipePathSer,tResSer) 
    th2=myThread(2,pipePathWeb,tResWeb) 
    th3=myThread(3,pipePathUisvc,tResUisvc) 

    th1.start() 
    th2.start() 
    th3.start() 

    threads.append(th1) 
    threads.append(th2) 
    threads.append(th3) 

    for t in threads: 
     print t.join() 

    print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc) 
    # The above statement prints updated values which can then be further processed 
Cuestiones relacionadas