[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener
diff --git a/rt/transports/jms/pom.xml b/rt/transports/jms/pom.xml
index e13c5a6..e6fd9a4 100644
--- a/rt/transports/jms/pom.xml
+++ b/rt/transports/jms/pom.xml
@@ -45,7 +45,6 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.1_spec</artifactId>
- <version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
@@ -64,6 +63,13 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>2.0.0</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-management</artifactId>
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 8ec23cd..22a94de 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -118,19 +118,20 @@
Session session = null;
try {
connection = JMSFactory.createConnection(jmsConfig);
- connection.setExceptionListener(new ExceptionListener() {
+ ExceptionListener exListener = new ExceptionListener() {
public void onException(JMSException exception) {
if (!shutdown) {
LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", exception);
restartConnection();
}
}
- });
+ };
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = jmsConfig.getTargetDestination(session);
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
- destination, this);
+ destination,
+ this, exListener);
container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
container.setTransactionManager(jmsConfig.getTransactionManager());
container.setMessageSelector(jmsConfig.getMessageSelector());
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index c4276eb..461a2b1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -23,6 +23,7 @@
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -36,24 +37,25 @@
public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
+ private ExceptionListener exceptionListener;
public PollingMessageListenerContainer(Connection connection, Destination destination,
- MessageListener listenerHandler) {
+ MessageListener listenerHandler, ExceptionListener exceptionListener) {
this.connection = connection;
this.destination = destination;
this.listenerHandler = listenerHandler;
+ this.exceptionListener = exceptionListener;
}
- private class Poller extends AbstractPoller implements Runnable {
+ private class Poller implements Runnable {
@Override
public void run() {
Session session = null;
- init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
- // Create session early to optimize performance
// Create session early to optimize performance
session = closer.register(connection.createSession(transacted, acknowledgeMode));
MessageConsumer consumer = closer.register(createConsumer(session));
while (running) {
@@ -70,14 +72,12 @@
safeRollBack(session);
}
}
- } catch (Throwable e) {
- catchUnexpectedExceptionDuringPolling(null, e);
+ } catch (Exception e) {
+ handleException(e);
}
}
-
}
- @Override
protected void safeRollBack(Session session) {
try {
if (session != null && session.getTransacted()) {
@@ -90,11 +90,10 @@
}
- private class XAPoller extends AbstractPoller implements Runnable {
+ private class XAPoller implements Runnable {
@Override
public void run() {
- init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
@@ -121,14 +120,12 @@
safeRollBack(session);
}
} catch (Exception e) {
- catchUnexpectedExceptionDuringPolling(null, e);
+ handleException(e);
}
-
}
}
- @Override
protected void safeRollBack(Session session) {
try {
transactionManager.rollback();
@@ -139,64 +136,6 @@
}
- private abstract class AbstractPoller {
- private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
- private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
- protected int retryCounter = -1;
- protected int counter;
- protected int sleepingTime = 5000;
-
- protected void init() {
- if (jndiEnvironment != null) {
- if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
- retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
- }
- if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
- sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
- }
- }
- }
-
- protected boolean hasToCount() {
- return retryCounter > -1;
- }
-
- protected boolean hasToStop() {
- return counter > retryCounter;
- }
-
- protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) {
- LOG.log(Level.WARNING, "Unexpected exception.", e);
- if (hasToCount()) {
- counter++;
- if (hasToStop()) {
- stop(session, e);
- }
- }
- if (running) {
- try {
- String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
- log += hasToCount()
- ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter
- : "";
- LOG.log(Level.WARNING, log);
- Thread.sleep(sleepingTime);
- } catch (InterruptedException e1) {
- LOG.log(Level.WARNING, e1.getMessage());
- }
- }
- }
-
- protected void stop(Session session, Throwable e) {
- LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
- safeRollBack(session);
- running = false;
- }
-
- protected abstract void safeRollBack(Session session);
-
- }
-
private MessageConsumer createConsumer(Session session) throws JMSException {
if (durableSubscriptionName != null && destination instanceof Topic) {
return session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
@@ -205,6 +144,18 @@
return session.createConsumer(destination, messageSelector);
}
}
+
+    /**
+     * Stops the polling loops of this container and reports the failure to the
+     * {@link ExceptionListener} supplied at construction time.
+     * <p>
+     * A {@link JMSException} is handed to the listener unchanged. Any other exception is wrapped
+     * in a new {@link JMSException} whose message is {@code "Wrapped exception. " + e.getMessage()}
+     * and whose cause is the original exception.
+     *
+     * @param e the exception caught while polling
+     */
+    protected void handleException(Exception e) {
+        // The pollers loop on this flag; clear it before notifying the listener.
+        running = false;
+        JMSException wrapped;
+        if (e instanceof JMSException) {
+            wrapped = (JMSException) e;
+        } else {
+            wrapped = new JMSException("Wrapped exception. " + e.getMessage());
+            // Link the original as the cause (not as a suppressed exception) so that
+            // getCause() and printed stack traces lead back to the real failure.
+            wrapped.initCause(e);
+        }
+        if (exceptionListener == null) {
+            // Without a listener the failure must not be lost, nor turned into a
+            // NullPointerException that would kill the poller thread silently.
+            LOG.log(java.util.logging.Level.WARNING,
+                    "Exception in JMS polling and no ExceptionListener is set", wrapped);
+            return;
+        }
+        exceptionListener.onException(wrapped);
+    }
@Override
public void start() {
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index 82cc37a..228ffa7 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -20,6 +20,7 @@
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -36,14 +37,76 @@
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.pool.XaPooledConnectionFactory;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.awaitility.Awaitility;
+import org.easymock.Capture;
import org.junit.Assert;
import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
public class MessageListenerTest {
private static final String FAIL = "fail";
private static final String FAILFIRST = "failfirst";
private static final String OK = "ok";
+
+ /**
+ * Starts a container (no transaction manager set) on a connection that has already been
+ * closed and checks that the resulting {@link JMSException} is passed to the
+ * {@link ExceptionListener} given to the container, and that the container stops running.
+ */
+ @Test
+ public void testConnectionProblem() throws JMSException {
+ Connection connection = createConnection("broker");
+ Queue dest = JMSUtil.createQueue(connection, "test");
+
+ MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = createMock(ExceptionListener.class);
+
+ // Record the expected onException call and capture the exception passed to it
+ Capture<JMSException> captured = newCapture();
+ exListener.onException(capture(captured));
+ expectLastCall();
+ replay(exListener);
+
+ PollingMessageListenerContainer container = //
+ new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
+ connection.close(); // Simulate connection problem
+ container.start();
+ // Polling runs asynchronously, so wait until the container reports it is no longer running
+ Awaitility.await().until(() -> !container.isRunning());
+ verify(exListener);
+ JMSException ex = captured.getValue();
+ // NOTE(review): presumably the message ActiveMQ produces for a closed connection - confirm
+ Assert.assertEquals("The connection is already closed", ex.getMessage());
+ }
+
+ /**
+ * Starts a container with a transaction manager set on an XA connection that has already
+ * been closed and checks that the failure reaches the {@link ExceptionListener} as a wrapping
+ * {@link JMSException}, and that the container stops running.
+ */
+ @Test
+ public void testConnectionProblemXA() throws JMSException, XAException, InterruptedException {
+ TransactionManager transactionManager = new GeronimoTransactionManager();
+ Connection connection = createXAConnection("brokerJTA", transactionManager);
+ Queue dest = JMSUtil.createQueue(connection, "test");
+
+ MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = createMock(ExceptionListener.class);
+
+ // Record the expected onException call and capture the exception passed to it
+ Capture<JMSException> captured = newCapture();
+ exListener.onException(capture(captured));
+ expectLastCall();
+ replay(exListener);
+
+ PollingMessageListenerContainer container = //
+ new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
+ container.setTransacted(false);
+ container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
+ container.setTransactionManager(transactionManager);
+
+ connection.close(); // Simulate connection problem
+ container.start();
+ // Polling runs asynchronously, so wait until the container reports it is no longer running
+ Awaitility.await().until(() -> !container.isRunning());
+ verify(exListener);
+ JMSException ex = captured.getValue();
+ // Closing the pooled connection will result in a NPE when using it
+ // The NPE has a null message, hence the literal "null" after the wrapper prefix
+ Assert.assertEquals("Wrapped exception. null", ex.getMessage());
+ }
@Test
public void testWithJTA() throws JMSException, XAException, InterruptedException {
@@ -52,11 +115,16 @@
Queue dest = JMSUtil.createQueue(connection, "test");
MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ }
+ };
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest,
- listenerHandler);
+ listenerHandler, exListener);
container.setTransacted(false);
container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
-
container.setTransactionManager(transactionManager);
container.start();