2009-05-12 18 views
8

Tengo un problema con el monitor entrelazado. Espere y supervise.Pulse en un servidor TCP multiproceso. Para demostrar mis problemas, aquí está mi código de servidor:condición de carrera Monitor.Wait/Pulse en un servidor multiproceso

public class Server 
{ 
    TcpListener listener; 
    Object sync; 
    IHandler handler; 
    bool running; 

    public Server(IHandler handler, int port) 
    { 
     this.handler = handler; 
     IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; 
     listener = new TcpListener(address, port); 
     sync = new Object(); 
     running = false; 
    } 

    public void Start() 
    { 
     Thread thread = new Thread(ThreadStart); 
     thread.Start(); 
    } 

    public void Stop() 
    { 
     lock (sync) 
     { 
      listener.Stop(); 
      running = false; 
      Monitor.Pulse(sync); 
     } 
    } 

    void ThreadStart() 
    { 
     if (!running) 
     { 
      listener.Start(); 
      running = true; 
      lock (sync) 
      { 
       while (running) 
       { 
        try 
        { 
         listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener); 
         Monitor.Wait(sync); // Release lock and wait for a pulse 
        } 
        catch (Exception e) 
        { 
         Console.WriteLine(e.Message); 
        } 
       } 
      } 
     } 
    } 

    void Accept(IAsyncResult result) 
    { 
     // Let the server continue listening 
     lock (sync) 
     { 
      Monitor.Pulse(sync); 
     } 

     if (running) 
     { 
      TcpListener listener = (TcpListener)result.AsyncState; 
      using (TcpClient client = listener.EndAcceptTcpClient(result)) 
      { 
       handler.Handle(client.GetStream()); 
      } 
     } 
    } 
} 

Y aquí es mi código de cliente:

class Client 
{ 
    class EchoHandler : IHandler 
    { 
     public void Handle(Stream stream) 
     { 
      System.Console.Out.Write("Echo Handler: "); 
      StringBuilder sb = new StringBuilder(); 
      byte[] buffer = new byte[1024]; 
      int count = 0; 
      while ((count = stream.Read(buffer, 0, 1024)) > 0) 
      { 
       sb.Append(Encoding.ASCII.GetString(buffer, 0, count)); 
      } 
      System.Console.Out.WriteLine(sb.ToString()); 
      System.Console.Out.Flush(); 
     } 
    } 

    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; 

    public static int Main() 
    { 
     Server server1 = new Server(new EchoHandler(), 1000); 
     Server server2 = new Server(new EchoHandler(), 1001); 

     server1.Start(); 
     server2.Start(); 

     Console.WriteLine("Press return to test..."); 
     Console.ReadLine(); 

     // Note interleaved ports 
     SendMsg("Test1", 1000); 
     SendMsg("Test2", 1001); 
     SendMsg("Test3", 1000); 
     SendMsg("Test4", 1001); 
     SendMsg("Test5", 1000); 
     SendMsg("Test6", 1001); 
     SendMsg("Test7", 1000); 

     Console.WriteLine("Press return to terminate..."); 
     Console.ReadLine(); 

     server1.Stop(); 
     server2.Stop(); 

     return 0; 
    } 

    public static void SendMsg(String msg, int port) 
    { 
     IPEndPoint endPoint = new IPEndPoint(localhost, port); 

     byte[] buffer = Encoding.ASCII.GetBytes(msg); 
     using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) 
     { 
      s.Connect(endPoint); 
      s.Send(buffer); 
     } 
    } 
} 

el cliente envía siete mensajes, pero el servidor sólo imprime cuatro:

 
Press return to test... 

Press return to terminate... 
Echo Handler: Test1 
Echo Handler: Test3 
Echo Handler: Test2 
Echo Handler: Test4 

Sospecho que el monitor se confunde al permitir que ocurra el Pulse (en el método Accept del servidor) antes de que ocurra el Wait (i n el método ThreadStart), aunque el ThreadStart aún debe tener el candado en el objeto sync hasta que llame al Monitor.Wait(), y luego el método Accept puede adquirir el candado y enviar su Pulse. Si comentar estas dos líneas en el método Stop() del servidor:

//listener.Stop(); 
//running = false; 

Los mensajes restantes aparecen cuando se llama al método Stop() del servidor (es decir, despertar objeto del servidor sync hace que despachar los mensajes entrantes restantes). Me parece que esto solo puede ocurrir en una condición de carrera entre los métodos ThreadStart y Accept, pero el bloqueo alrededor del objeto sync debería evitar esto.

¿Alguna idea?

Muchas gracias, Simon.

ps. Tenga en cuenta que soy consciente de que el resultado parece estar fuera de servicio, etc., específicamente estoy preguntando sobre una condición de carrera entre los bloqueos y el monitor. Saludos, SH.

Respuesta

5

El problema es que está utilizando Pulse/Wait como señal. Una señal adecuada, como un AutoResetEvent tiene un estado tal que permanece señalizado hasta que un hilo ha llamado a WaitOne(). Llamar a Pulse sin hilos esperando en él se convertirá en un noop.

Esto se combina con el hecho de que un candado puede tomarse muchas veces con el mismo hilo. Como está utilizando la programación Async, la devolución de llamada de Accept se puede invocar con el mismo hilo que hizo BeginAcceptTcpClient.

Déjame ilustrar. Comenté el segundo servidor y cambié un código en su servidor.

void ThreadStart() 
{ 
    if (!running) 
    { 
     listener.Start(); 
     running = true; 
     lock (sync) 
     { 
      while (running) 
      { 
       try 
       { 
        Console.WriteLine("BeginAccept [{0}]", 
         Thread.CurrentThread.ManagedThreadId); 
        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener); 
        Console.WriteLine("Wait [{0}]", 
         Thread.CurrentThread.ManagedThreadId); 
        Monitor.Wait(sync); // Release lock and wait for a pulse 
       } 
       catch (Exception e) 
       { 
        Console.WriteLine(e.Message); 
       } 
      } 
     } 
    } 
} 

void Accept(IAsyncResult result) 
{ 
    // Let the server continue listening 
    lock (sync) 
    { 
     Console.WriteLine("Pulse [{0}]", 
      Thread.CurrentThread.ManagedThreadId); 
     Monitor.Pulse(sync); 
    } 
    if (running) 
    { 
     TcpListener localListener = (TcpListener)result.AsyncState; 
     using (TcpClient client = localListener.EndAcceptTcpClient(result)) 
     { 
      handler.Handle(client.GetStream()); 
     } 
    } 
} 

La salida de mi ejecución se muestra a continuación. Si ejecuta este código usted mismo, los valores serán diferentes, pero será el mismo en general.

Press return to test... 
BeginAccept [3] 
Wait [3] 

Press return to terminate... 
Pulse [5] 
BeginAccept [3] 
Pulse [3] 
Echo Handler: Test1 
Echo Handler: Test3 
Wait [3] 

Como se puede ver hay dos pulsos de llamada, uno de un hilo separado (el pulso [5]), que despierta el primero de espera. El subproceso 3 hace otro BeginAccept, pero al tener conexiones entrantes pendientes, el subproceso decide llamar a la devolución de llamada de aceptación de inmediato. Como Accept se invoca con el mismo hilo, el bloqueo (sincronización) no bloquea, pero Pulse [3] inmediatamente en una fila de hilos vacía.

Se invocan dos controladores y maneja los dos mensajes.

Todo está bien, y el ThreadStart comienza a funcionar nuevamente y va a Esperar indefinidamente.

Ahora, el problema subyacente aquí es que está tratando de usar un monitor como señal. Dado que no recuerda el estado en que se perdió el segundo pulso.

Pero hay una solución fácil para esto. Use AutoResetEvents, que es una señal adecuada y recordará su estado.

public Server(IHandler handler, int port) 
{ 
    this.handler = handler; 
    IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; 
    listener = new TcpListener(address, port); 
    running = false; 
    _event = new AutoResetEvent(false); 
} 

public void Start() 
{ 
    Thread thread = new Thread(ThreadStart); 
    thread.Start(); 
} 

public void Stop() 
{ 
    listener.Stop(); 
    running = false; 
    _event.Set(); 
} 

void ThreadStart() 
{ 
    if (!running) 
    { 
     listener.Start(); 
     running = true; 
     while (running) 
     { 
      try 
      { 
       listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener); 
       _event.WaitOne(); 
      } 
      catch (Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 
    } 
} 

void Accept(IAsyncResult result) 
{ 
    // Let the server continue listening 
    _event.Set(); 
    if (running) 
    { 
     TcpListener localListener = (TcpListener) result.AsyncState; 
     using (TcpClient client = localListener.EndAcceptTcpClient(result)) 
     { 
      handler.Handle(client.GetStream()); 
     } 
    } 
} 
+0

Thanks Mats. Supuse que BeginAcceptTcpClient siempre se ejecutaba en un subproceso separado y, por lo tanto, podría usar el objeto Sync como una sección crítica. Estuviste en el blanco y las señales son el camino a seguir. Gracias de nuevo. SH –

Cuestiones relacionadas