[CXF-6454] Handle InvalidClientIDException and allow setting retryInterval
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index 464fc7a..972c4a0 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -129,6 +129,8 @@
String targetService = endpoint.getTargetService();
jmsConfig.setTargetService(targetService);
+ int retryInterval = endpoint.getRetryInterval();
+ jmsConfig.setRetryInterval(retryInterval);
return jmsConfig;
}
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 4ab0c89..04b67ef 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -95,6 +95,7 @@
// For jms spec. Do not configure manually
private String targetService;
private String requestURI;
+ // Delay in milliseconds between attempts to re-create the listener. Defaults to
+ // 5000 (the same default JMSEndpoint uses) so that a configuration that was not
+ // populated through JMSConfigFactory does not retry with a zero delay.
+ private int retryInterval = 5000;
@@ -481,4 +482,12 @@
this.transactionManager = transactionManager;
}
+ /**
+  * Returns the delay, in milliseconds, that is slept between attempts to
+  * re-create the destination listener after a failed connect.
+  *
+  * @return the retry interval in milliseconds
+  */
+ public int getRetryInterval() {
+     return retryInterval;
+ }
+
+ /**
+  * Sets the delay between attempts to re-create the destination listener.
+  * The value is handed to {@code Thread.sleep} by the reconnect loop, so it is
+  * interpreted as milliseconds; no range check is performed here.
+  *
+  * @param retryInterval the retry interval in milliseconds
+  */
+ public void setRetryInterval(int retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+
}
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 2b5d8cd..24b6ec1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -27,6 +27,7 @@
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -105,6 +106,9 @@
try {
this.jmsListener = createTargetDestinationListener();
} catch (Exception e) {
+ if (e.getCause() != null && InvalidClientIDException.class.isInstance(e.getCause())) {
+ throw e;
+ }
// If first connect fails we will try to establish the connection in the background
new Thread(new Runnable() {
@@ -150,6 +154,8 @@
connection.start();
return container;
} catch (JMSException e) {
+ ResourceCloser.close(connection);
+ this.connection = null;
throw JMSUtil.convertJmsException(e);
} finally {
ResourceCloser.close(session);
@@ -173,9 +179,9 @@
LOG.log(Level.WARNING, message);
}
try {
- Thread.sleep(5000);
+ Thread.sleep(jmsConfig.getRetryInterval());
} catch (InterruptedException e2) {
- // Ignore
+ shutdown = true;
}
}
} while (jmsListener == null && !shutdown);
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index f5e9d03..17a5705 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -85,6 +85,7 @@
private boolean useConduitIdSelector = true;
private String username;
private int concurrentConsumers = 1;
+ private int retryInterval = 5000;
/**
* @param uri
@@ -475,5 +476,15 @@
throw new IllegalArgumentException(v);
}
}
+
+ /**
+  * Returns the retry interval configured for this endpoint. The field is
+  * initialised to 5000 when no value has been set.
+  *
+  * @return the retry interval
+  */
+ public int getRetryInterval() {
+     return this.retryInterval;
+ }
+ /**
+  * Sets the retry interval for this endpoint. JMSConfigFactory copies this
+  * value into the JMSConfiguration it builds.
+  *
+  * @param retryInterval the retry interval; stored as given, without validation
+  */
+ public void setRetryInterval(int retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+ /**
+  * Sets the retry interval from its decimal string form.
+  *
+  * @param retryInterval the retry interval as a base-10 integer string
+  * @throws NumberFormatException if the string is not a parsable integer
+  */
+ public void setRetryInterval(String retryInterval) {
+     final int parsed = Integer.parseInt(retryInterval);
+     this.retryInterval = parsed;
+ }
}
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 9f8fcb2..830f2ba 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -70,6 +70,11 @@
}
} catch (Exception e) {
LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ // Ignore
+ }
}
}
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index b34f53d..3030f9e 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -32,7 +32,6 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
@@ -57,6 +56,7 @@
protected static final int MAX_RECEIVE_TIME = 10;
protected static final String MESSAGE_CONTENT = "HelloWorld";
protected static Bus bus;
+ protected static ActiveMQConnectionFactory cf1;
protected static ConnectionFactory cf;
protected static BrokerService broker;
@@ -78,8 +78,8 @@
broker.addConnector(brokerUri);
broker.start();
bus = BusFactory.getDefaultBus();
- ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(brokerUri);
- cf = new PooledConnectionFactory(cf1);
+ cf1 = new ActiveMQConnectionFactory(brokerUri);
+ cf = cf1;
}
@AfterClass
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 690c721..a4e3980 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -25,8 +25,10 @@
import java.io.Reader;
import java.io.StringReader;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
@@ -41,6 +43,7 @@
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.MultiplexDestination;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.junit.Ignore;
import org.junit.Test;
@@ -76,6 +79,29 @@
conduit.close();
destination.shutdown();
}
+
+ /**
+  * Verifies that a durable-subscription destination fails with
+  * {@link InvalidClientIDException} when its client id is already held by
+  * another open connection, instead of retrying in the background.
+  *
+  * @throws Throwable the cause unwrapped from the RuntimeException raised
+  *         while the destination is activated
+  */
+ @Test(expected = InvalidClientIDException.class)
+ public void testDurableInvalidClientId() throws Throwable {
+     // Occupy the client id first so the destination's own connection is rejected.
+     Connection con = cf1.createConnection();
+     JMSDestination destination = null;
+     try {
+         con.setClientID("testClient");
+         con.start();
+         destMessage = null;
+         EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+         JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+         jmsConfig.setDurableSubscriptionClientId("testClient");
+         jmsConfig.setDurableSubscriptionName("testsub");
+         jmsConfig.setConnectionFactory(cf);
+         destination = new JMSDestination(bus, ei, jmsConfig);
+         destination.setMessageObserver(createMessageObserver());
+     } catch (RuntimeException e) {
+         // Surface the wrapped JMS exception; if there is no cause, rethrow the
+         // original so the failure is reported as-is rather than as an NPE.
+         Throwable cause = e.getCause();
+         if (cause == null) {
+             throw e;
+         }
+         throw cause;
+     } finally {
+         ResourceCloser.close(con);
+         // destination stays null when setup failed before construction; an NPE
+         // here would replace the exception that is being propagated.
+         if (destination != null) {
+             destination.shutdown();
+         }
+     }
+ }
@Test
public void testOneWayDestination() throws Exception {