2011-11-04 13 views
10

He estado utilizando Rx por un tiempo ahora para eventos en mis proyectos y dedicado a la programación de Socket y la parte buena es su haciendo bien. Administrar mi código, ventaja de rendimiento y mucho mejor para ejecutar e interpretar.Uso de la extensión reactiva (Rx) para recibir mensajes MSMQ utilizando un patrón asíncrono (queue.BeginReceive, queue.EndReceive)

Últimamente tengo que modificar el flujo de procesos de mi proyecto donde necesito volcar todos los datos entrantes (desde operaciones de socket) en colas (usando la implementación MSMQ como se decidió para hacer cola).

Como MSMQ proporciona una llamada asincrónica para eliminar mensajes de la cola (pero en un patrón extraño). He estado luchando para usar Rx para este propósito ahora, pero lo habilito para hacerlo.

Pregunta: ¿Alguien puede darme un ejemplo de código limpio para implementar Rx para la recepción de mensajes de la cola utilizando el patrón Async.

necesito la ejecución del operador asíncrono para MSMQ análoga a algo como esto

var data = Observable.FromAsyncPattern<byte[]>(
         this.receiverSocket.BeginReceive, 
         this.receiverSocket.EndReceive(some parameters); 

Gracias de antemano. * aplausos * a Rx y .NET

+0

¿Me puede apuntar a la clase que tiene el 'EndReceive' con los parámetros? No puedo encontrarlo ... – Enigmativity

+0

No estoy seguro de lo que estás preguntando, pero estoy respondiendo lo que entendí. Necesito una implementación de Rx (FromAsyncPattern) para BeginReceive & EndReceive que me devuelve el objeto "System.Messaging.Message". Los métodos asíncronos son para métodos de objetos System.Messaging.MessageQueue. Espero que ahora tengas una mejor idea ahora. Estoy contento y estaba seguro de que responderá súper rápido cuando se trata de preguntas sobre Rx: D – Jsinh

+0

Ninguno de los métodos 'EndReceive' en la clase' MessageQueue' tiene parámetros que no sean 'IAsyncResult'. ¿Qué tipo es tu 'receiverSocket'? – Enigmativity

Respuesta

4

Sería tan simple como:

var queue = new System.Messaging.MessageQueue("test"); 
var fun = Observable.FromAsyncPattern((cb, obj) => queue.BeginReceive(TimeSpan.FromMinutes(10),obj,cb), a => queue.EndReceive(a)); 
var obs = fun(); 
+0

Creo que tienes razón. ¡Todos los "parámetros en EndReceive" me arrojaron por completo! – Enigmativity

+1

sí. Este código funciona perfectamente. Pero el problema es con el tiempo de espera. Cuando el tiempo: el caso aquí es de 10 minutos ha terminado y no hay ningún mensaje en la cola, arroja una MessageQueueException. El segundo problema es que var obs = fun(); subscribe llamará a la función solo una vez, ¿cómo puedo hacer que sea recursiva en sincronización con el final que recibe la solicitud anterior? – Jsinh

+1

Nota para Googlers: aunque fue una buena respuesta en ese momento, 'FromAsyncPattern' ahora está ** obsoleto **. – MickyD

Cuestiones relacionadas