2008-09-17 22 views
13

No pude encontrar una implementación ThreadPool decente para Ruby, así que escribí la mía (basada parcialmente en el código de aquí: http://snippets.dzone.com/posts/show/3276, pero cambié a esperar/señalizar y otra implementación para el apagado de ThreadPool. corriendo (que tiene 100 hilos y gastos de alrededor de 1300 tareas), se muere con el callejón sin salida en la línea 25 -. espera a que un nuevo trabajo allí cualquier idea, por lo que podría sucederPunto muerto en ThreadPool

require 'thread' 
begin 
    require 'fastthread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(callback) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @callback = callback 
     @mutex.synchronize {@running = true} 
     @thread = Thread.new do 
     while @mutex.synchronize {@running} 
      block = get_block 
      if block 
      block.call 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
      else 
      # Wait for a new job 
      @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25? 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @mutex.synchronize {@block} 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @mutex.synchronize {@block = nil} 
    end 

    def busy? 
     @mutex.synchronize {[email protected]?} 
    end 

    def stop 
     @mutex.synchronize {@running = false} 
     # Signal the thread not to wait for a new job 
     @cv.signal 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @workers = [] 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    end 

    def size 
    @mutex.synchronize {@workers.size} 
    end 

    def busy? 
    @mutex.synchronize {@workers.any? {|w| w.busy?}} 
    end 

    def shutdown 
    @mutex.synchronize {@workers.each {|w| w.stop}} 
    end 
    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    while true 
     @mutex.synchronize do 
     worker = get_worker 
     if worker 
      return worker.set_block(block) 
     else 
      # Wait for a free worker 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 

    # Used by workers to report ready status 
    def signal 
    @cv.signal 
    end 

    private 
    def get_worker 
    free_worker || create_worker 
    end 

    def free_worker 
    @workers.each {|w| return w unless w.busy?}; nil 
    end 

    def create_worker 
    return nil if @workers.size >= @max_size 
    worker = Worker.new(self) 
    @workers << worker 
    worker 
    end 
end 

Respuesta

10

Ok, entonces el principal problema con la implementación es: ¿cómo asegurarse de que no se pierda ninguna señal y evitar bloqueos muertos?

En mi experiencia, esto es REALMENTE difícil de lograr con variables de condición y mutex, pero es fácil con semáforos. Sucede que Ruby implementa un objeto llamado Queue (o SizedQueue) que debería resolver el problema.Aquí está mi aplicación sugerida:

require 'thread' 
begin 
    require 'fasttread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(thread_queue) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @queue = thread_queue 
     @running = true 
     @thread = Thread.new do 
     @mutex.synchronize do 
      while @running 
      @cv.wait(@mutex) 
      block = get_block 
      if block 
       @mutex.unlock 
       block.call 
       @mutex.lock 
       reset_block 
      end 
      @queue << self 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @block 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @block = nil 
    end 

    def busy? 
     @mutex.synchronize { [email protected]? } 
    end 

    def stop 
     @mutex.synchronize do 
     @running = false 
     @cv.signal 
     end 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @queue = Queue.new 
    @workers = [] 
    end 

    def size 
    @workers.size 
    end 

    def busy? 
    @queue.size < @workers.size 
    end 

    def shutdown 
    @workers.each { |w| w.stop } 
    @workers = [] 
    end 

    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    worker = get_worker 
    worker.set_block(block) 
    end 

    private 

    def get_worker 
    if [email protected]? or @workers.size == @max_size 
     return @queue.pop 
    else 
     worker = Worker.new(@queue) 
     @workers << worker 
     worker 
    end 
    end 

end 

Y aquí es un código simple prueba:

tp = ThreadPool.new 500 
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } } 
tp.shutdown 
+0

1. ¿No debería sincronizarse el acceso a @workers? 2. ¿Por qué todavía hay necesidad de bloquear y desbloquear en el hilo de trabajo? – Roman

+0

El acceso al trabajador siempre se realiza desde el mismo hilo ... por lo que no es necesaria la sincronización. En cuanto al bloqueo en el hilo de trabajo, necesita que despierten el hilo de forma segura. – PierreBdR

+0

Todavía hay un problema con esto: existe la posibilidad de un punto muerto: cuando el hilo de trabajo se agrega a la cola, el ThreadPool puede sacarlo de la cola y asignarle una tarea. En ese caso, se enviará una señal. Sin embargo, si el subproceso de trabajo no está esperando en un cv, la señal se perderá. – Roman

1

estoy poco parcial aquí,? pero sugiero modelar esto en algún lenguaje de proceso y verificarlo en el modelo. Las herramientas libremente disponibles son, por ejemplo, el conjunto de herramientas mCRL2 (usando un lenguaje basado en ACP), Mobility Workbench (pi-cálculo) y Spin (PROMELA).

De lo contrario, sugeriría eliminar cada bit de código que no es esencial para el problema y encontrar un caso mínimo donde se produce el interbloqueo. Dudo que los 100 hilos y las 1300 tareas sean esenciales para lograr un punto muerto. Con un caso más pequeño, probablemente solo pueda agregar algunas impresiones de depuración que brinden suficiente información para resolver el problema.

+0

El código en cuestión solo falló al procesar 1300 tareas de 180000, no pudo reproducirlo con un conjunto más pequeño, desafortunadamente ... – Roman

1

Bien, el problema parece estar en su método de señal ThreadPool #. Lo que puede ocurrir es:

1 - Todo a su trabajador están ocupados y se intenta procesar un nuevo trabajo

2 - línea 90 obtiene un trabajador nula

3 - un trabajador obtener liberados y señales, pero la señal se pierde porque el ThreadPool no lo está esperando

4 - pasa a la línea 95, esperando a pesar de que hay un trabajador libre.

El error aquí es que puede señalar a un trabajador gratuito incluso cuando nadie está escuchando. Este método de señal ThreadPool # debe ser:

def signal 
    @mutex.synchronize { @cv.signal } 
end 

Y el problema es el mismo en el objeto Worker. Lo que podría ocurrir es:

1 - El trabajador acaba de terminar un trabajo

2 - Se comprueba (línea 17) si hay una espera de trabajo: no hay

3 - El envío del grupo de subprocesos un nuevo trabajo y señales ... pero la señal se pierde

4 - la espera trabajador por una señal, a pesar de que está marcado como ocupado

Usted debe poner su método initialize como:

def initialize(callback) 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    @callback = callback 
    @mutex.synchronize {@running = true} 
    @thread = Thread.new do 
    @mutex.synchronize do 
     while @running 
     block = get_block 
     if block 
      @mutex.unlock 
      block.call 
      @mutex.lock 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
     else 
      # Wait for a new job 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 
end 

A continuación, los métodos Worker # get_block y Worker # reset_block ya no se deben sincronizar. De esta forma, no se puede asignar un bloque a un trabajador entre la prueba de un bloque y la espera de una señal.

+0

¡Creo que tienes razón! Voy a probar esto de inmediato, ¡gracias! – Roman

+0

Hmm ... ahora hay un punto muerto cuando estoy esperando que se completen los subprocesos (por ejemplo, llamar para unirse al ThreadPool). Estoy tratando de descubrir por qué. – Roman

8

Usted puede tratar de la gema work_queue, diseñado para coordinar el trabajo entre un productor y un grupo de subprocesos de trabajo.