No pude encontrar una implementación ThreadPool decente para Ruby, así que escribí la mía (basada parcialmente en el código de aquí: http://snippets.dzone.com/posts/show/3276, pero cambié a esperar/señalizar y otra implementación para el apagado de ThreadPool. corriendo (que tiene 100 hilos y gastos de alrededor de 1300 tareas), se muere con el callejón sin salida en la línea 25 -. espera a que un nuevo trabajo allí cualquier idea, por lo que podría sucederPunto muerto en ThreadPool
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {[email protected]?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
1. ¿No debería sincronizarse el acceso a @workers? 2. ¿Por qué todavía hay necesidad de bloquear y desbloquear en el hilo de trabajo? – Roman
El acceso al trabajador siempre se realiza desde el mismo hilo ... por lo que no es necesaria la sincronización. En cuanto al bloqueo en el hilo de trabajo, necesita que despierten el hilo de forma segura. – PierreBdR
Todavía hay un problema con esto: existe la posibilidad de un punto muerto: cuando el hilo de trabajo se agrega a la cola, el ThreadPool puede sacarlo de la cola y asignarle una tarea. En ese caso, se enviará una señal. Sin embargo, si el subproceso de trabajo no está esperando en un cv, la señal se perderá. – Roman