2012-07-27 14 views
8

Soy nuevo en MassTransit, y extraño algo en mi entendimiento.Publicación de MassTransit y evento versus comando

Digamos que tengo una granja de servidores donde todos los nodos pueden hacer el mismo trabajo. El marco de aplicación es de estilo CQRS. Eso significa que tengo dos tipos de base de mensaje para publicar:

  • Comandos: debe ser manejado por exactamente uno de los servidores, cualquiera de ellos (el primero con la ranura de trabajo libre)
  • Eventos: deben ser manejados por todos los servidores

He creado un prototipo de MassTransit extremadamente simple (una aplicación de consola que envía hello cada X segundos).

En la API, puedo ver que hay un método de "publicación". ¿Cómo puedo especificar qué tipo de mensaje es (uno versus todos los servidores)?

Si miro la configuración del "manejador", puedo especificar la cola uri. Si especifico la misma cola para todos los hosts, todos los hosts recibirán el mensaje, pero no puedo limitar la ejecución a un solo servidor.

Si escucho desde una cola dedicada al host, solo un servidor manejará los mensajes, pero no sé cómo transmitir el otro tipo de mensaje.

Por favor, ayúdame a entender lo que me estoy perdiendo.

PD: si me importa, mi sistema de mensajes es rabbitmq.

Con el fin de probar, tengo crear una biblioteca de clases común con estas clases:

public static class ActualProgram 
{ 
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource(); 

    private static readonly Random g_Random = new Random(); 

    public static void ActualMain(int delay, int instanceName) 
    { 
     Thread.Sleep(delay); 
     SetupBus(instanceName); 

     Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 

     Console.WriteLine("Press enter at any time to exit"); 
     Console.ReadLine(); 
     g_Shutdown.Cancel(); 

     Bus.Shutdown(); 
    } 

    private static void PublishRandomMessage() 
    { 
     Bus.Instance.Publish(new Message 
     { 
      Id = g_Random.Next(), 
      Body = "Some message", 
      Sender = Assembly.GetEntryAssembly().GetName().Name 
     }); 

     if (!g_Shutdown.IsCancellationRequested) 
     { 
      Thread.Sleep(g_Random.Next(500, 10000)); 
      Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 
     } 
    } 

    private static void SetupBus(int instanceName) 
    { 
     Bus.Initialize(sbc => 
     { 
      sbc.UseRabbitMqRouting(); 
      sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName); 
      sbc.Subscribe(subs => 
      { 
       subs.Handler<Message>(MessageHandled); 
      }); 
     }); 
    } 

    private static void MessageHandled(Message msg) 
    { 
     ConsoleColor color = ConsoleColor.Red; 
     switch (msg.Sender) 
     { 
      case "test_app1": 
       color = ConsoleColor.Green; 
       break; 

      case "test_app2": 
       color = ConsoleColor.Blue; 
       break; 

      case "test_app3": 
       color = ConsoleColor.Yellow; 
       break; 
     } 
     Console.ForegroundColor = color; 
     Console.WriteLine(msg.ToString()); 
     Console.ResetColor(); 
    } 

    private static void MessageConsumed(Message msg) 
    { 
     Console.WriteLine(msg.ToString()); 
    } 
} 

public class Message 
{ 
    public long Id { get; set; } 

    public string Sender { get; set; } 

    public string Body { get; set; } 

    public override string ToString() 
    { 
     return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body); 
    } 
} 

tengo también aplicaciones 3 de consola que acaba de ejecutar el método ActualMain:

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     ActualProgram.ActualMain(0, 1); 
    } 
} 

Respuesta

9

lo que quieres se conoce como Compitiendo Consumidores (busque SO para que encuentre más información) El uso de RabbitMQ hace la vida más fácil, todo lo que necesita hacer es especificar el mismo nombre de cola para cada consumidor que usted comience, el mensaje será procesado por solo uno de ellos. En lugar de generar una cola única cada vez que lo hace.

private static void SetupBus(int instanceName) 
{ 
    Bus.Initialize(sbc => 
    { 
     sbc.UseRabbitMqRouting(); 
     sbc.ReceiveFrom("rabbitmq://localhost/Commands); 
     sbc.Subscribe(subs => 
     { 
      subs.Handler<Message>(MessageHandled); 
     }); 
    }); 
} 

yo sepa, usted necesita tener un proceso separado para los controladores de comandos en lugar de controladores de eventos. Todos los manejadores de comandos recibirán desde la misma cola, todos los manejadores de eventos recibirán desde su propia cola única.

La otra pieza del rompecabezas es cómo se envían los mensajes al autobús. Todavía puede usar publicar para los comandos, pero si configuró los consumidores de manera incorrecta, podría obtener múltiples ejecuciones, ya que el mensaje irá a todos los consumidores, si desea garantizar que el mensaje termine en una sola cola, puede usar Enviar en lugar de Publicar.

Bus.Instance 
    .GetEndpoint(new Uri("rabbitmq://localhost/Commands")) 
    .Send(new Message 
    { 
     Id = g_Random.Next(), 
     Body = "Some message", 
     Sender = Assembly.GetEntryAssembly().GetName().Name 
    }); 
+0

gracias. Esto me ayuda mucho a entender la diferencia. Lo único que creo que me asusta es "necesitarás tener un proceso separado para controladores de comandos en lugar de controladores de eventos". Esto tendrá un impacto en la arquitectura global, pero viviré con eso si no hay otra opción. –

+0

Para los consumidores que compiten, también necesitará instancias de bus separadas para cada consumidor.La razón por la que los comandos suelen estar en una instancia de bus separada suele ser que se procesen de forma síncrona cuando los consumidores se procesan en varios subprocesos. MT no le permite especificar la concurrencia por tipo de mensaje. Creo que es posible alojar múltiples buses en un solo proceso, pero no lo he probado. Uso topshelf (de los mismos tipos) para alojar cada uno de mis "servicios". Le permite elegir en proceso (appdomains) o separar procesos/máquinas separadas en el momento del despliegue muy fácilmente –

+0

thx. Le daré un vistazo –

Cuestiones relacionadas