SMX4-1205: Add reconnect behavior to JMS appender implementation

git-svn-id: https://svn.apache.org/repos/asf/servicemix/smx4/features/trunk@1393357 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java b/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java
index 7d3446e..ccc2006 100644
--- a/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java
+++ b/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java
@@ -34,52 +34,25 @@
     private ConnectionFactory jmsConnectionFactory;
     private Connection connection;
     private Session session;
-    private MessageProducer publisher;
-    private Topic topic;
+    private MessageProducer producer;
     private String destinationName;
 
     private LoggingEventFormat format = new DefaultLoggingEventFormat();
 
-
-
-    public void init() {
-        /*
-        * Create connection. Create session from connection; false means
-        * session is not transacted.
-        */
-        try {
-            connection = jmsConnectionFactory.createConnection();
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            topic = session.createTopic(destinationName);
-            publisher = session.createProducer(topic);
-            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
-            LOG.debug("Connection created with ActiveMQ for JMS Pax Appender.");
-
-        } catch (JMSException e) {
-            LOG.error(e.getMessage());
-        }
-    }
-
     public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-                LOG.debug("Connection closed with ActiveMQ for JMS Pax Appender.");
-            } catch (JMSException e) {
-                LOG.error(e.getMessage());
-            }
-        }
+        closeJMSResources();
     }
 
     public void doAppend(PaxLoggingEvent paxLoggingEvent) {
         try {
             // Send message to the destination
-            TextMessage message = session.createTextMessage();
+            TextMessage message = getOrCreateSession().createTextMessage();
             message.setText(format.toString(paxLoggingEvent));
-            publisher.send(message);
+            getOrCreatePublisher().send(message);
         } catch (JMSException e) {
-            e.printStackTrace();
+            LOG.warn("Exception caught while sending log event - reinitializing JMS resources to recover", e);
+            closeJMSResources();
+
         }
     }
 
@@ -98,4 +71,51 @@
             format = new DefaultLoggingEventFormat();
         }
     }
+
+    protected Connection getOrCreateConnection() throws JMSException {
+        if (connection == null) {
+            connection = jmsConnectionFactory.createConnection();
+        }
+        return connection;
+    }
+
+    protected Session getOrCreateSession() throws JMSException {
+        if (session == null) {
+            session = getOrCreateConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+        return session;
+    }
+
+    protected MessageProducer getOrCreatePublisher() throws JMSException {
+        if (producer == null) {
+            Destination topic = session.createTopic(destinationName);
+            producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        }
+
+        return producer;
+    }
+
+    private void closeJMSResources() {
+        try {
+            if (producer != null) {
+                producer.close();
+                producer = null;
+            }
+            if (session != null) {
+                session.close();
+                session = null;
+            }
+            if (connection != null) {
+                connection.close();
+                connection = null;
+            }
+        } catch (JMSException e) {
+            LOG.debug("Exception caught while closing JMS resources", e);
+            // let's just set all the fields to null so stuff will be re-created
+            producer = null;
+            session = null;
+            connection = null;
+        }
+    }
 }
diff --git a/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml b/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml
index d03848f..77f10fe 100644
--- a/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml
+++ b/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml
@@ -25,7 +25,7 @@
     <!-- Need OSGI JMS Connection Factory Service exposed  -->
     <reference id="jmsConnectionFactory" interface="javax.jms.ConnectionFactory"/>
 
-    <bean id="appender" class="org.apache.servicemix.logging.jms.JMSAppender" init-method="init" destroy-method="close">
+    <bean id="appender" class="org.apache.servicemix.logging.jms.JMSAppender" destroy-method="close">
         <property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
         <property name="destinationName" value="${destinationName}" />
         <property name="format" value="${format}"/>
diff --git a/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java b/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java
index 9da6c05..18acb2e 100644
--- a/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java
+++ b/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java
@@ -17,17 +17,12 @@
 package org.apache.servicemix.logging.jms;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.camel.component.ActiveMQComponent;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Before;
-import org.junit.Test;
-import org.ops4j.pax.logging.service.internal.PaxLoggingEventImpl;
-import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+import org.junit.*;
 
 import javax.naming.Context;
 
@@ -36,17 +31,30 @@
  */
 public class JMSAppenderTest extends CamelTestSupport {
 
-    private static final String BROKER_URL = "vm://test.broker?broker.persistent=false";
     private static final String EVENTS_TOPIC = "Events";
 
     private JMSAppender appender;
+    private static BrokerService broker;
+
+    @BeforeClass
+    public static void setupBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setBrokerName("test.broker");
+        broker.start();
+    }
 
     @Before
     public void setupBrokerAndAppender() throws Exception {
         appender = new JMSAppender();
-        appender.setJmsConnectionFactory(new ActiveMQConnectionFactory(BROKER_URL));
+        appender.setJmsConnectionFactory(new ActiveMQConnectionFactory(broker.getVmConnectorURI().toString() + "?create=false"));
         appender.setDestinationName(EVENTS_TOPIC);
-        appender.init();
+    }
+
+    @AfterClass
+    public static void stopBroker() throws Exception {
+        broker.stop();
     }
 
     @Test
@@ -59,20 +67,39 @@
         assertMockEndpointsSatisfied();
     }
 
-    @Override
-    protected Context createJndiContext() throws Exception {
-        Context context = super.createJndiContext();
-        context.bind("amq", ActiveMQComponent.activeMQComponent(BROKER_URL));
-        return context;
+    @Test
+    public void testReconnectToBroker() throws Exception {
+        MockEndpoint events = getMockEndpoint("mock:events");
+        events.expectedMessageCount(2);
+
+        appender.doAppend(MockEvents.createInfoEvent());
+
+        // let's tamper with the underlying JMS connection, causing us to get an exception on the next log event
+        // afterwards, the appender should recover and start logging again automatically
+        appender.getOrCreateConnection().close();
+        appender.doAppend(MockEvents.createInfoEvent());
+
+        appender.doAppend(MockEvents.createInfoEvent());
+
+        assertMockEndpointsSatisfied();
+
+
     }
 
     @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
+    protected Context createJndiContext() throws Exception {
+        Context context = super.createJndiContext();
+        context.bind("amq", ActiveMQComponent.activeMQComponent(broker.getVmConnectorURI().toString() + "?create=false"));
+        return context;
+    }
+
+     @Override
+     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("amq:topic://" + EVENTS_TOPIC).to("mock:events");
             }
         };
-    }
+     }
 }