2011-01-31 14 views
43

Intentando usar TThreadedQueue (Generics.Collections) en un esquema de consumidor múltiple de un solo productor. (Delphi-XE). La idea es insertar objetos en una cola y dejar que varios hilos de trabajo agoten la cola.TThreadedQueue no es capaz de múltiples consumidores?

Aunque no funciona como se esperaba. Cuando dos o más subprocesos de trabajo están llamando a PopItem, las violaciones de acceso se producen desde TThreadedQueue.

Si la llamada a PopItem está serializada con una sección crítica, todo está bien.

Seguramente el TThreadedQueue debería ser capaz de manejar a múltiples consumidores, entonces ¿me estoy perdiendo algo o es esto un error en TThreadedQueue?

Aquí hay un ejemplo simple para producir el error.

program TestThreadedQueue; 

{$APPTYPE CONSOLE} 

uses 
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas', 
    Windows, 
    Messages, 
    Classes, 
    SysUtils, 
    SyncObjs, 
    Generics.Collections; 

type TThreadTaskMsg = 
     class(TObject) 
     private 
      threadID : integer; 
      threadMsg : string; 
     public 
      Constructor Create(ID : integer; const msg : string); 
     end; 

type TThreadReader = 
     class(TThread) 
     private 
      fPopQueue : TThreadedQueue<TObject>; 
      fSync  : TCriticalSection; 
      fMsg  : TThreadTaskMsg; 
      fException : Exception; 
      procedure DoSync; 
      procedure DoHandleException; 
     public 
      Constructor Create(popQueue : TThreadedQueue<TObject>; 
           sync  : TCriticalSection); 
      procedure Execute; override; 
     end; 

Constructor TThreadReader.Create(popQueue : TThreadedQueue<TObject>; 
            sync  : TCriticalSection); 
begin 
    fPopQueue:=   popQueue; 
    fMsg:=     nil; 
    fSync:=    sync; 
    Self.FreeOnTerminate:= FALSE; 
    fException:=   nil; 

    Inherited Create(FALSE); 
end; 

procedure TThreadReader.DoSync ; 
begin 
    WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); 
end; 

procedure TThreadReader.DoHandleException; 
begin 
    WriteLn('Exception ->' + fException.Message); 
end; 

procedure TThreadReader.Execute; 
var signal : TWaitResult; 
begin 
    NameThreadForDebugging('QueuePop worker'); 
    while not Terminated do 
    begin 
    try 
     {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } 
     Sleep(20); 
     {- Serializing calls to PopItem works } 
     if Assigned(fSync) then fSync.Enter; 
     try 
     signal:= fPopQueue.PopItem(TObject(fMsg)); 
     finally 
     if Assigned(fSync) then fSync.Release; 
     end; 
     if (signal = wrSignaled) then 
     begin 
     try 
      if Assigned(fMsg) then 
      begin 
      fMsg.threadMsg:= '<Thread id :' +IntToStr(Self.threadId) + '>'; 
      fMsg.Free; // We are just dumping the message in this test 
      //Synchronize(Self.DoSync); 
      //PostMessage(fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); 
      end; 
     except 
      on E:Exception do begin 
      end; 
     end; 
     end; 
     except 
     FException:= Exception(ExceptObject); 
     try 
     if not (FException is EAbort) then 
     begin 
      {Synchronize(} DoHandleException; //); 
     end; 
     finally 
     FException:= nil; 
     end; 
    end; 
    end; 
end; 

Constructor TThreadTaskMsg.Create(ID : Integer; Const msg : string); 
begin 
    Inherited Create; 

    threadID:= ID; 
    threadMsg:= msg; 
end; 

var 
    fSync : TCriticalSection; 
    fThreadQueue : TThreadedQueue<TObject>; 
    fReaderArr : array[1..4] of TThreadReader; 
    i : integer; 

begin 
    try 
    IsMultiThread:= TRUE; 

    fSync:=  TCriticalSection.Create; 
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); 
    try 
     {- Calling without fSync throws exceptions when two or more threads calls PopItem 
     at the same time } 
     WriteLn('Creating worker threads ...'); 
     for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,Nil); 
     {- Calling with fSync works ! } 
     //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,fSync); 
     WriteLn('Init done. Pushing items ...'); 

     for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 

     ReadLn; 

    finally 
     for i:= 1 to 4 do fReaderArr[i].Free; 
     fThreadQueue.Free; 
     fSync.Free; 
    end; 

    except 
    on E: Exception do 
     begin 
     Writeln(E.ClassName, ': ', E.Message); 
     ReadLn; 
     end; 
    end; 
end. 

actualización: El error en TMonitor que causó TThreadedQueue se bloquee, se fija en Delphi XE2.

Actualización 2: La prueba anterior destacó la cola en el estado vacío. Darian Miller descubrió que al hacer hincapié en la cola en estado completo, aún podría reproducir el error en XE2. El error una vez más está en el TMonitor. Consulte su respuesta a continuación para obtener más información. Y también un enlace al QC101114.

Actualización 3: Con la actualización de Delphi XE2-4 hubo un arreglo anunciado para TMonitor que curar los problemas en TThreadedQueue. Mis pruebas hasta el momento ya no pueden reproducir ningún error en TThreadedQueue. Hilos probados de un solo productor/consumidor múltiple cuando la cola está vacía y llena. También probado múltiples productores/consumidores múltiples. Varié los hilos del lector y los hilos del escritor del 1 al 100 sin ningún problema. Pero conociendo la historia, me atrevo a otros a romper TMonitor.

+4

Hi LU RD! Bienvenido a StackOverflow. Esta es una buena pregunta que tiene, pero podría ser más fácil probar si el código se publicó de una manera un poco diferente. Has incluido la mitad del formulario .pas sin el correspondiente DFM, y eso hace que sea más difícil para nosotros duplicar e investigar. El problema no parece estar relacionado con la interfaz de usuario, así que, ¿hay alguna forma de que puedas reducir esto a una aplicación de consola? Gracias. –

+0

Mason, aplicación de consola hecha. –

+1

Todavía hay problemas en XE2 ... –

Respuesta

19

Bueno, es difícil estar seguro sin muchas pruebas, pero ciertamente parece que se trata de un error, ya sea en TThreadedQueue o en TMonitor. De cualquier forma está en el RTL y no en tu código. Debería presentar esto como un informe de control de calidad y usar su ejemplo anterior como el código "cómo reproducir".

+0

Mason, gracias. Lo controlaré mañana, a menos que alguien más tenga una opinión diferente. Parece que el error está en el TMonitor. –

+7

QC# 91246 TThreadedQueue falla con múltiples consumidores. Vota por eso si quieres. –

+5

LInk al QCReport: [http://qc.embarcadero.com/wc/qcmain.aspx?d=91246](http://qc.embarcadero.com/wc/qcmain.aspx?d=91246) – jachguate

10

Te recomiendo que uses OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary al trabajar con hilos, paralelismo, etc. Primoz hizo un muy buen trabajo, y en el sitio encontrarás mucha documentación útil.

+1

Conozco bien la OmniThreadLibrary y también las AsyncCalls de Andreas Hausladenhttp: //andy.jgknet.de/blog/bugfixunits/asynccalls-29-asynchronous-function-calls/. –

1

No creo que TThreadedQueue soporte a múltiples consumidores. Es un FIFO, según el archivo de ayuda. Tengo la impresión de que hay un hilo que empuja y otro (¡solo uno!) Explotando.

+8

FIFO es solo una forma de decir cómo se vacía la cola. No significa que solo puede haber un hilo extrayendo trabajos de la cola. Especialmente no cuando se llama * ThreadedQueue *. –

+2

Se llama ThreadedQueue porque el empujador y el popper pueden estar en diferentes hilos. En el mundo de los hilos múltiples, nada es gratis, por lo tanto, creo que los documentos habrían mencionado soporte para múltiples productores y/o consumidores si estuviera disponible. No se menciona, así que creo que no debería funcionar. – Giel

+3

la cola está protegida por un monitor. El monitor en sí debe ser seguro en un entorno de subprocesos múltiples. Si la cola no era segura para múltiples consumidores, al menos debería lanzar una excepción que podría ser atrapada. –

3

Busqué la clase TThreadedQueue pero no parece tenerla en mi D2009. No voy a suicidarme exactamente con esto: el soporte del hilo Delphi siempre ha sido erróneo ... errm ... 'no óptimo' y sospecho que TThreadedQueue no es diferente :)

¿Por qué usar generics para PC? Objeto/Consumidor) ¿objetos? Un simple descendiente TObjectQueue hará muy bien - estado utilizando durante décadas - funciona bien con varios productores/consumidores:

unit MinimalSemaphorePCqueue; 

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore. 

The semaphore count reflects the queue count 
'push' will always succeed unless memory runs out, then you're stuft anyway. 
'pop' has a timeout parameter as well as the address of where any received 
object is to be put. 
'pop' returns immediately with 'true' if there is an object on the queue 
available for it. 
'pop' blocks the caller if the queue is empty and the timeout is not 0. 
'pop' returns false if the timeout is exceeded before an object is available 
from the queue. 
'pop' returns true if an object is available from the queue before the timeout 
is exceeded. 
If multiple threads have called 'pop' and are blocked because the queue is 
empty, a single 'push' will make only one of the waiting threads ready. 


Methods to push/pop from the queue 
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. 
When the handle is signaled, the 'peek' method will retrieve the queued object. 
} 
interface 

uses 
    Windows, Messages, SysUtils, Classes,syncObjs,contnrs; 


type 

pObject=^Tobject; 


TsemaphoreMailbox=class(TobjectQueue) 
private 
    countSema:Thandle; 
protected 
    access:TcriticalSection; 
public 
    property semaHandle:Thandle read countSema; 
    constructor create; virtual; 
    procedure push(aObject:Tobject); virtual; 
    function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; 
    function peek(pResObject:pObject):boolean; virtual; 
    destructor destroy; override; 
end; 


implementation 

{ TsemaphoreMailbox } 

constructor TsemaphoreMailbox.create; 
begin 
{$IFDEF D2009} 
    inherited Create; 
{$ELSE} 
    inherited create; 
{$ENDIF} 
    access:=TcriticalSection.create; 
    countSema:=createSemaphore(nil,0,maxInt,nil); 
end; 

destructor TsemaphoreMailbox.destroy; 
begin 
    access.free; 
    closeHandle(countSema); 
    inherited; 
end; 

function TsemaphoreMailbox.pop(pResObject: pObject; 
    timeout: DWORD): boolean; 
// dequeues an object, if one is available on the queue. If the queue is empty, 
// the caller is blocked until either an object is pushed on or the timeout 
// period expires 
begin // wait for a unit from the semaphore 
    result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); 
    if result then // if a unit was supplied before the timeout, 
    begin 
    access.acquire; 
    try 
     pResObject^:=inherited pop; // get an object from the queue 
    finally 
     access.release; 
    end; 
    end; 
end; 

procedure TsemaphoreMailbox.push(aObject: Tobject); 
// pushes an object onto the queue. If threads are waiting in a 'pop' call, 
// one of them is made ready. 
begin 
    access.acquire; 
    try 
    inherited push(aObject); // shove the object onto the queue 
    finally 
    access.release; 
    end; 
    releaseSemaphore(countSema,1,nil); // release one unit to semaphore 
end; 

function TsemaphoreMailbox.peek(pResObject: pObject): boolean; 
begin 
    access.acquire; 
    try 
    result:=(count>0); 
    if result then pResObject^:=inherited pop; // get an object from the queue 
    finally 
    access.release; 
    end; 
end; 
end. 
+0

gracias por su respuesta. Vi la clase TThreadedQueue en la documentación para XE e hice una prueba simple para una aplicación real que tenía. Esta fue mi primera oportunidad de genéricos y no resultó bien. Como puede ver en otros comentarios, el error está en la clase TMonitor, lo que tendrá implicaciones si alguien construye una aplicación paralela multiproceso. Mi implementación terminó usando una cola simple protegida con una sección crítica para empujar y estallar. –

4

Su ejemplo parece funcionar bien en XE2, pero si llenamos la cola falla con AV en una PushItem.(Probado bajo XE2 Update1)

reproducir, simplemente aumentar su creación tarea desde 100 a 1100 (su profundidad de la cola se fijó en 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 

Esta muere por mí cada vez en Windows 7. Al principio me Probé un empujón continuo para probarlo y falló en el ciclo 30 ... luego en el ciclo 16 ... luego en el 65, por lo que a diferentes intervalos, pero falló sistemáticamente en algún momento.

iLoop := 0; 
    while iLoop < 1000 do 
    begin 
    Inc(iLoop); 
    WriteLn('Loop: ' + IntToStr(iLoop)); 
    for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 
    end; 
+0

Oh no, en algún momento temí que esto también podría ser un punto de ruptura, al igual que cuando la cola estaba vacía. Incluso hice un comentario sobre esto en otra publicación en SO. Tonto de mi parte no haberlo probado. Haré algunas pruebas más para confirmarlo. –

+0

Yepp, falla consistentemente en Windows 7 de 64 bits (Actualización XE2 2), ambos en 32 y 64 bit.exe. ¿Lo controlarás o lo haré? –

+0

Informó como [QC101114] (http://qc.embarcadero.com/wc/qcmain.aspx?d=101114) –

Cuestiones relacionadas