2008-09-29 14 views
9

Filtrado del receptor de mensaje JMS por JMSCorrelationID

¿Cómo puedo crear una instancia de un escucha de cola JMS en java (JRE/JDK/J2EE 1.4) que solo recibe mensajes que coinciden con un JMSCorrelationID dado? Los mensajes que estoy buscando recoger se han publicado en una cola y no en un tema, aunque eso puede cambiar si es necesario.

Aquí está el código que estoy utilizando actualmente para poner el mensaje en la cola: configuración de la conexión

/** 
* publishResponseToQueue publishes Requests to the Queue. 
* 
* @param jmsQueueFactory    -Name of the queue-connection-factory 
* @param jmsQueue     -The queue name for the request 
* @param response      -A response object that needs to be published 
* 
* @throws ServiceLocatorException  -An exception if a request message 
*          could not be published to the Topic 
*/ 
private void publishResponseToQueue(String jmsQueueFactory, 
            String jmsQueue, 
            Response response) 
     throws ServiceLocatorException { 

    if (logger.isInfoEnabled()) { 
     logger.info("Begin publishRequestToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + "," + response); 
    } 
    logger.assertLog(jmsQueue != null && !jmsQueue.equals(""), 
         "jmsQueue cannot be null"); 
    logger.assertLog(jmsQueueFactory != null && !jmsQueueFactory.equals(""), 
         "jmsQueueFactory cannot be null"); 
    logger.assertLog(response != null, "Request cannot be null"); 

    try { 

     Queue queue = (Queue)_context.lookup(jmsQueue); 

     QueueConnectionFactory factory = (QueueConnectionFactory) 
      _context.lookup(jmsQueueFactory); 

     QueueConnection connection = factory.createQueueConnection(); 
     connection.start(); 
     QueueSession session = connection.createQueueSession(false, 
            QueueSession.AUTO_ACKNOWLEDGE); 

     ObjectMessage objectMessage = session.createObjectMessage(); 

     objectMessage.setJMSCorrelationID(response.getID()); 

     objectMessage.setObject(response); 

     session.createSender(queue).send(objectMessage); 

     session.close(); 
     connection.close(); 

    } catch (Exception e) { 
     //XC3.2 Added/Modified BEGIN 
     logger.error("ServiceLocator.publishResponseToQueue - Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     throw new ServiceLocatorException("ServiceLocator.publishResponseToQueue " + 
              "- Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     //XC3.2 Added/Modified END 
    } 

    if (logger.isInfoEnabled()) { 
     logger.info("End publishResponseToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + response); 
    } 

} // end of publishResponseToQueue method 

Respuesta

10

La cola es la misma, pero una vez que tenga la QueueSession, que ajuste el selector al crear un receptor.

QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'"); 

continuación

receiver.receive() 

o

receiver.setListener(myListener); 
+0

He estado leyendo sobre el mismo tema hace poco y tengo una pregunta de la siguiente manera: el receptor se siguen recibiendo esos mensajes que no contienen el identificador de correlación requerida y en silencio soltarlos w/o procesamiento, o serán las El proveedor JMS en sí no entrega tales mensajes al receptor, por lo que aún permanecen en la cola? Siento que este último es el enfoque correcto, pero quiero verificarlo. Gracias. – shrini1000

+0

@ shrini1000 estás en lo correcto. – Trying

5

Por cierto, mientras que no es la cuestión real que pedirá - si usted está tratando de implementar respuesta de solicitud a través de JMS se lo recomiendo como el reading this article La API JMS es bastante más compleja de lo que puedas imaginar y hacer esto de manera eficiente es mucho más difícil de lo que parece.

En particular to use JMS efficiently usted debe tratar de evitar la creación de los consumidores para un único mensaje, etc.

también porque la API JMS es tan muy difíciles de usar correctamente y eficientemente - en particular con la puesta en común, transacciones y procesamiento concurrente - Recomiendo hide the middleware from their application code gente como mediante el uso de Apache Camel's Spring Remoting implementation for JMS

+0

Me habría ahorrado una gran cantidad de reinvención de ruedas, si hubiera sabido de Camel hace un par de años. –

0
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; 
QueueReceiver receiver = session.createReceiver(queue, filter); 

Aquí el receptor recibirá los mensajes para los que JMSCorrelationID es igual a MessageID. esto es muy útil en el paradigma de solicitud/respuesta.

o se puede fijar directamente esta a cualquier valor:

QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";); 

de lo que puede hacer cualquiera receiver.receive(2000); o receiver.setMessageListener(this);

2

Esperamos que esto ayude. Usé Open MQ.

package com.MQueues; 

import java.util.UUID; 

import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.QueueConnection; 
import javax.jms.QueueReceiver; 
import javax.jms.QueueSession; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import com.sun.messaging.BasicQueue; 
import com.sun.messaging.QueueConnectionFactory; 

public class HelloProducerConsumer { 

public static String queueName = "queue0"; 
public static String correlationId; 

public static String getCorrelationId() { 
    return correlationId; 
} 

public static void setCorrelationId(String correlationId) { 
    HelloProducerConsumer.correlationId = correlationId; 
} 

public static String getQueueName() { 
    return queueName; 
} 

public static void sendMessage(String threadName) { 
    correlationId = UUID.randomUUID().toString(); 
    try { 

     // Start connection 
     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
     BasicQueue destination = (BasicQueue) session.createQueue(threadName); 
     MessageProducer producer = session.createProducer(destination); 
     connection.start(); 

     // create message to send 
     TextMessage message = session.createTextMessage(); 
     message.setJMSCorrelationID(correlationId); 
     message.setText(threadName + "(" + System.currentTimeMillis() 
       + ") " + correlationId +" from Producer"); 

     System.out.println(correlationId +" Send from Producer"); 
     producer.send(message); 

     // close everything 
     producer.close(); 
     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void receivemessage(final String correlationId) { 
    try { 

     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 

     BasicQueue destination = (BasicQueue) session.createQueue(getQueueName()); 

     connection.start(); 

     System.out.println("\n"); 
     System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 
     long now = System.currentTimeMillis(); 

     // receive our message 
     String filter = "JMSCorrelationID = '" + correlationId + "'"; 
     QueueReceiver receiver = session.createReceiver(destination, filter); 
     TextMessage m = (TextMessage) receiver.receive(); 
     System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp()); 

     System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 

     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void main(String args[]) { 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId1 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId2 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId3 = getCorrelationId(); 


    HelloProducerConsumer.receivemessage(correlationId2); 

    HelloProducerConsumer.receivemessage(correlationId1); 

    HelloProducerConsumer.receivemessage(correlationId3); 
} 
} 
Cuestiones relacionadas