[CXF-6454] Handle InvalidClientIDException and allow setting retryInterval
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index 464fc7a..972c4a0 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -129,6 +129,8 @@
         
         String targetService = endpoint.getTargetService();
         jmsConfig.setTargetService(targetService);
+        int retryInterval = endpoint.getRetryInterval();
+        jmsConfig.setRetryInterval(retryInterval);
         return jmsConfig;
     }
 
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 4ab0c89..04b67ef 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -95,6 +95,7 @@
     // For jms spec. Do not configure manually
     private String targetService;
     private String requestURI;
+    private int retryInterval = 5000; // ms slept between connect retries; a 0 default would busy-loop
 
 
 
@@ -481,4 +482,18 @@
         this.transactionManager = transactionManager;
     }
 
+    /**
+     * @return the pause, in milliseconds, that JMSDestination sleeps between connection retries
+     */
+    public int getRetryInterval() {
+        return retryInterval;
+    }
+
+    /**
+     * @param retryInterval the pause, in milliseconds, to sleep between connection retries
+     */
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+
 }
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 2b5d8cd..24b6ec1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -27,6 +27,7 @@
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -105,6 +106,9 @@
         try {
             this.jmsListener = createTargetDestinationListener();
         } catch (Exception e) {
+            if (e.getCause() != null && InvalidClientIDException.class.isInstance(e.getCause())) {
+                throw e;
+            }
             // If first connect fails we will try to establish the connection in the background 
             new Thread(new Runnable() {
                 
@@ -150,6 +154,8 @@
             connection.start();
             return container;
         } catch (JMSException e) {
+            ResourceCloser.close(connection);
+            this.connection = null;
             throw JMSUtil.convertJmsException(e);
         } finally {
             ResourceCloser.close(session);
@@ -173,9 +179,9 @@
                     LOG.log(Level.WARNING, message);
                 }
                 try {
-                    Thread.sleep(5000);
+                    Thread.sleep(jmsConfig.getRetryInterval());
                 } catch (InterruptedException e2) {
-                    // Ignore
+                    shutdown = true;
                 }
             }
         } while (jmsListener == null && !shutdown);
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index f5e9d03..17a5705 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -85,6 +85,7 @@
     private boolean useConduitIdSelector = true;
     private String username;
     private int concurrentConsumers = 1;
+    private int retryInterval = 5000; // milliseconds; copied into JMSConfiguration by JMSConfigFactory
 
     /**
      * @param uri
@@ -475,5 +476,30 @@
             throw new IllegalArgumentException(v);
         }
     }
+
+    /**
+     * @return the pause, in milliseconds, between attempts to connect to the JMS provider
+     */
+    public int getRetryInterval() {
+        return retryInterval;
+    }
+
+    /**
+     * @param retryInterval the pause, in milliseconds, between attempts to connect
+     */
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+
+    /**
+     * String overload of {@link #setRetryInterval(int)}.
+     *
+     * @param retryInterval the interval in milliseconds, written as a decimal integer
+     * @throws NumberFormatException if the value is not a parsable integer
+     */
+    public void setRetryInterval(String retryInterval) {
+        // parseInt yields the primitive directly; valueOf would box and immediately unbox
+        this.retryInterval = Integer.parseInt(retryInterval);
+    }
     
 }
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 9f8fcb2..830f2ba 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -70,6 +70,11 @@
                     }
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // Ignore
+                    }
                 }
             }
 
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index b34f53d..3030f9e 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -32,7 +32,6 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -57,6 +56,7 @@
     protected static final int MAX_RECEIVE_TIME = 10;
     protected static final String MESSAGE_CONTENT = "HelloWorld";
     protected static Bus bus;
+    protected static ActiveMQConnectionFactory cf1;
     protected static ConnectionFactory cf;
     protected static BrokerService broker;
 
@@ -78,8 +78,8 @@
         broker.addConnector(brokerUri);
         broker.start();
         bus = BusFactory.getDefaultBus();
-        ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(brokerUri);
-        cf = new PooledConnectionFactory(cf1);
+        cf1 = new ActiveMQConnectionFactory(brokerUri);
+        cf = cf1;
     }
 
     @AfterClass
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 690c721..a4e3980 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -25,8 +25,10 @@
 import java.io.Reader;
 import java.io.StringReader;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.Topic;
@@ -41,6 +43,7 @@
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.MultiplexDestination;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -76,6 +79,39 @@
         conduit.close();
         destination.shutdown();
     }
+    
+    /**
+     * Holds the client id on a separate connection, then expects setting up a destination
+     * with the same durable subscription client id to fail with InvalidClientIDException
+     * (unwrapped from the RuntimeException that carries it).
+     */
+    @Test(expected = InvalidClientIDException.class)
+    public void testDurableInvalidClientId() throws Throwable {
+        Connection con = cf1.createConnection();
+        JMSDestination destination = null;
+        try {
+            con.setClientID("testClient");
+            con.start();
+            destMessage = null;
+            EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+            JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+            jmsConfig.setDurableSubscriptionClientId("testClient");
+            jmsConfig.setDurableSubscriptionName("testsub");
+            jmsConfig.setConnectionFactory(cf);
+            destination = new JMSDestination(bus, ei, jmsConfig);
+            destination.setMessageObserver(createMessageObserver());
+        } catch (RuntimeException e) {
+            // Rethrow the wrapper itself when it has no cause; "throw null" would raise an NPE
+            throw e.getCause() != null ? e.getCause() : e;
+        } finally {
+            ResourceCloser.close(con);
+            // destination is still null if setup failed before it was created; an NPE here
+            // would replace the exception that actually failed the test
+            if (destination != null) {
+                destination.shutdown();
+            }
+        }
+    }
 
     @Test
     public void testOneWayDestination() throws Exception {