2011-08-27 12 views
12

He estado trabajando con JMS y ActiveMQ. Todo está funcionando de maravilla. No estoy utilizando la primavera, ni puedo I.Señal un retroceso de un JMS MessageListener

La interfaz javax.jms.MessageListener tiene un solo método, onMessage. Desde dentro de una implementación, existe la posibilidad de que se produzca una excepción. Si de hecho se lanza una excepción, entonces digo que el mensaje no se procesó correctamente y que se debe volver a intentar. Entonces, necesito que ActiveMQ espere un momento y luego, vuelva a intentarlo. es decir, necesito la excepción lanzada para deshacer la transacción JMS.

¿Cómo puedo lograr tal comportamiento?

tal vez hay alguna configuración en ActiveMQ que no era capaz de encontrar.

O ... tal vez podría acabar con el registro MessageListener s para los consumidores y consumir los mensajes a mí mismo, en un un bucle como:

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 

en un par de hilos, en lugar de registrarse al oyente.

O ... De alguna manera puede decorar/AOP/byte manipular los MessageListener s a hacerlo por sí mismos.

¿Qué ruta tomaría usted y por qué?

nota: No tengo control total sobre el código de MessageListener.

EDITAR Una prueba para la prueba de concepto:

@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

Muchas gracias whaley y @Ammar por las respuestas. Estoy votando ambos desde que ambos nos indicaron el camino correcto. Pero aún no ha elegido una respuesta correcta. Porque se necesitan más pruebas. –

Respuesta

10

Si desea utilizar SESSION_TRANSACTED como el modo de acuse de recibo, entonces usted necesita para configurar un RedeliveryPolicy on your Connection/ConnectionFactory. This page on ActiveMQ's website también contiene algo de información útil sobre lo que podría necesitar hacer.

Puesto que usted no está utilizando la primavera, se puede configurar un RedeliveryPolicy con algo similar al código siguiente (tomado de uno de los enlaces de arriba):

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

Editar Tomando su fragmento de código agregado a la respuesta, a continuación se muestra cómo funciona esto con las transacciones. Pruebe este código con el método Session.rollback() comentado y verá que usa SESION_TRANSACTED y Session.commit/rollback funciona como se esperaba:

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

Eso no funcionó. Pero me señaló en la dirección correcta. Dejaré DUPS_OK_ACKNOWLEDGE ya que parece ser el que funciona y tengo que trabajar menos. –

+0

Debe pegar la totalidad de su código, porque no está haciendo algo correctamente con su sesión. DUPS_OK_ACKNOWLEDGE solo parece funcionar, ya que la confirmación del cliente es floja y el intermediario seguirá reenviando mensajes hasta que el cliente finalmente lo haga. – whaley

+0

He pegado una prueba de concepto. Solo puedo hacer que funcione con DUPS_OK_ACKNOWLEDGE y el mensaje.agradecimiento no parece marcar la diferencia. –

2

Es necesario configurar el modo de confirmación a Session.CLIENT_ACKNOWLEDGE, el cliente reconoce un mensaje consumido llamando al método de reconocimiento del mensaje.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Luego, después de procesar el mensaje a necesidad de llamar al método Message.acknowledge() con el fin de eliminar ese mensaje.

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

No funciona. _onMessage_ todavía se llama una vez, incluso si _message.acknowledge() _ nunca recibe una llamada. –

+0

¿Ha configurado correctamente el modo de confirmación? ¡Debe configurarse en Session.CLIENT_ACKNOWLEDGE! – Ammar

+0

Pero funciona con (falso, Session.DUPS_OK_ACKNOWLEDGE) ... message.acknowledge() no parece ser el truco. –

0

Si se tramita su sesión, a continuación, "acknowledgeMode" se ignora anyways..So, acaba de salir de la sesión de tramitó y utilizar session.rollback y session.commit para comprometer o revertir su transacción.

+1

Creo que el (mi) problema es que la sesión no es accesible dentro del MessageListener.onMessage (Message). –

Cuestiones relacionadas