Intentando usar TThreadedQueue (Generics.Collections) en un esquema de consumidor múltiple de un solo productor. (Delphi-XE). La idea es insertar objetos en una cola y dejar que varios hilos de trabajo agoten la cola.TThreadedQueue no es capaz de múltiples consumidores?
Aunque no funciona como se esperaba. Cuando dos o más subprocesos de trabajo están llamando a PopItem, las violaciones de acceso se producen desde TThreadedQueue.
Si la llamada a PopItem está serializada con una sección crítica, todo está bien.
Seguramente el TThreadedQueue debería ser capaz de manejar a múltiples consumidores, entonces ¿me estoy perdiendo algo o es esto un error en TThreadedQueue?
Aquí hay un ejemplo simple para producir el error.
program TestThreadedQueue;
{$APPTYPE CONSOLE}
uses
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
Windows,
Messages,
Classes,
SysUtils,
SyncObjs,
Generics.Collections;
type TThreadTaskMsg =
class(TObject)
private
threadID : integer;
threadMsg : string;
public
Constructor Create(ID : integer; const msg : string);
end;
type TThreadReader =
class(TThread)
private
fPopQueue : TThreadedQueue<TObject>;
fSync : TCriticalSection;
fMsg : TThreadTaskMsg;
fException : Exception;
procedure DoSync;
procedure DoHandleException;
public
Constructor Create(popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
procedure Execute; override;
end;
Constructor TThreadReader.Create(popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
begin
fPopQueue:= popQueue;
fMsg:= nil;
fSync:= sync;
Self.FreeOnTerminate:= FALSE;
fException:= nil;
Inherited Create(FALSE);
end;
procedure TThreadReader.DoSync ;
begin
WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;
procedure TThreadReader.DoHandleException;
begin
WriteLn('Exception ->' + fException.Message);
end;
procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
NameThreadForDebugging('QueuePop worker');
while not Terminated do
begin
try
{- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
Sleep(20);
{- Serializing calls to PopItem works }
if Assigned(fSync) then fSync.Enter;
try
signal:= fPopQueue.PopItem(TObject(fMsg));
finally
if Assigned(fSync) then fSync.Release;
end;
if (signal = wrSignaled) then
begin
try
if Assigned(fMsg) then
begin
fMsg.threadMsg:= '<Thread id :' +IntToStr(Self.threadId) + '>';
fMsg.Free; // We are just dumping the message in this test
//Synchronize(Self.DoSync);
//PostMessage(fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
end;
except
on E:Exception do begin
end;
end;
end;
except
FException:= Exception(ExceptObject);
try
if not (FException is EAbort) then
begin
{Synchronize(} DoHandleException; //);
end;
finally
FException:= nil;
end;
end;
end;
end;
Constructor TThreadTaskMsg.Create(ID : Integer; Const msg : string);
begin
Inherited Create;
threadID:= ID;
threadMsg:= msg;
end;
var
fSync : TCriticalSection;
fThreadQueue : TThreadedQueue<TObject>;
fReaderArr : array[1..4] of TThreadReader;
i : integer;
begin
try
IsMultiThread:= TRUE;
fSync:= TCriticalSection.Create;
fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
try
{- Calling without fSync throws exceptions when two or more threads calls PopItem
at the same time }
WriteLn('Creating worker threads ...');
for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,Nil);
{- Calling with fSync works ! }
//for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,fSync);
WriteLn('Init done. Pushing items ...');
for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,''));
ReadLn;
finally
for i:= 1 to 4 do fReaderArr[i].Free;
fThreadQueue.Free;
fSync.Free;
end;
except
on E: Exception do
begin
Writeln(E.ClassName, ': ', E.Message);
ReadLn;
end;
end;
end.
actualización: El error en TMonitor que causó TThreadedQueue se bloquee, se fija en Delphi XE2.
Actualización 2: La prueba anterior destacó la cola en el estado vacío. Darian Miller descubrió que al hacer hincapié en la cola en estado completo, aún podría reproducir el error en XE2. El error una vez más está en el TMonitor. Consulte su respuesta a continuación para obtener más información. Y también un enlace al QC101114.
Actualización 3: Con la actualización de Delphi XE2-4 hubo un arreglo anunciado para TMonitor
que curar los problemas en TThreadedQueue
. Mis pruebas hasta el momento ya no pueden reproducir ningún error en TThreadedQueue
. Hilos probados de un solo productor/consumidor múltiple cuando la cola está vacía y llena. También probado múltiples productores/consumidores múltiples. Varié los hilos del lector y los hilos del escritor del 1 al 100 sin ningún problema. Pero conociendo la historia, me atrevo a otros a romper TMonitor
.
Hi LU RD! Bienvenido a StackOverflow. Esta es una buena pregunta que tiene, pero podría ser más fácil probar si el código se publicó de una manera un poco diferente. Has incluido la mitad del formulario .pas sin el correspondiente DFM, y eso hace que sea más difícil para nosotros duplicar e investigar. El problema no parece estar relacionado con la interfaz de usuario, así que, ¿hay alguna forma de que puedas reducir esto a una aplicación de consola? Gracias. –
Mason, aplicación de consola hecha. –
Todavía hay problemas en XE2 ... –