SMX4-1205: Add reconnect behavior to JMS appender implementation
git-svn-id: https://svn.apache.org/repos/asf/servicemix/smx4/features/trunk@1393357 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java b/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java
index 7d3446e..ccc2006 100644
--- a/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java
+++ b/osgi-logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java
@@ -34,52 +34,25 @@
private ConnectionFactory jmsConnectionFactory;
private Connection connection;
private Session session;
- private MessageProducer publisher;
- private Topic topic;
+ private MessageProducer producer;
private String destinationName;
private LoggingEventFormat format = new DefaultLoggingEventFormat();
-
-
- public void init() {
- /*
- * Create connection. Create session from connection; false means
- * session is not transacted.
- */
- try {
- connection = jmsConnectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = session.createTopic(destinationName);
- publisher = session.createProducer(topic);
- publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- LOG.debug("Connection created with ActiveMQ for JMS Pax Appender.");
-
- } catch (JMSException e) {
- LOG.error(e.getMessage());
- }
- }
-
public void close() {
- if (connection != null) {
- try {
- connection.close();
- LOG.debug("Connection closed with ActiveMQ for JMS Pax Appender.");
- } catch (JMSException e) {
- LOG.error(e.getMessage());
- }
- }
+ closeJMSResources();
}
public void doAppend(PaxLoggingEvent paxLoggingEvent) {
try {
// Send message to the destination
- TextMessage message = session.createTextMessage();
+ TextMessage message = getOrCreateSession().createTextMessage();
message.setText(format.toString(paxLoggingEvent));
- publisher.send(message);
+ getOrCreatePublisher().send(message);
} catch (JMSException e) {
- e.printStackTrace();
+ LOG.warn("Exception caught while sending log event - reinitializing JMS resources to recover", e);
+ closeJMSResources();
+
}
}
@@ -98,4 +71,51 @@
format = new DefaultLoggingEventFormat();
}
}
+
+ protected Connection getOrCreateConnection() throws JMSException {
+ if (connection == null) {
+ connection = jmsConnectionFactory.createConnection();
+ }
+ return connection;
+ }
+
+ protected Session getOrCreateSession() throws JMSException {
+ if (session == null) {
+ session = getOrCreateConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ return session;
+ }
+
+ protected MessageProducer getOrCreatePublisher() throws JMSException {
+ if (producer == null) {
+ Destination topic = session.createTopic(destinationName);
+ producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+
+ return producer;
+ }
+
+ private void closeJMSResources() {
+ try {
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ } catch (JMSException e) {
+ LOG.debug("Exception caught while closing JMS resources", e);
+ // let's just set all the fields to null so stuff will be re-created
+ producer = null;
+ session = null;
+ connection = null;
+ }
+ }
}
diff --git a/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml b/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml
index d03848f..77f10fe 100644
--- a/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml
+++ b/osgi-logging/jms-appender/src/main/resources/OSGI-INF/blueprint/config.xml
@@ -25,7 +25,7 @@
<!-- Need OSGI JMS Connection Factory Service exposed -->
<reference id="jmsConnectionFactory" interface="javax.jms.ConnectionFactory"/>
- <bean id="appender" class="org.apache.servicemix.logging.jms.JMSAppender" init-method="init" destroy-method="close">
+ <bean id="appender" class="org.apache.servicemix.logging.jms.JMSAppender" destroy-method="close">
<property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
<property name="destinationName" value="${destinationName}" />
<property name="format" value="${format}"/>
diff --git a/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java b/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java
index 9da6c05..18acb2e 100644
--- a/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java
+++ b/osgi-logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java
@@ -17,17 +17,12 @@
package org.apache.servicemix.logging.jms;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Before;
-import org.junit.Test;
-import org.ops4j.pax.logging.service.internal.PaxLoggingEventImpl;
-import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+import org.junit.*;
import javax.naming.Context;
@@ -36,17 +31,30 @@
*/
public class JMSAppenderTest extends CamelTestSupport {
- private static final String BROKER_URL = "vm://test.broker?broker.persistent=false";
private static final String EVENTS_TOPIC = "Events";
private JMSAppender appender;
+ private static BrokerService broker;
+
+ @BeforeClass
+ public static void setupBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setBrokerName("test.broker");
+ broker.start();
+ }
@Before
public void setupBrokerAndAppender() throws Exception {
appender = new JMSAppender();
- appender.setJmsConnectionFactory(new ActiveMQConnectionFactory(BROKER_URL));
+ appender.setJmsConnectionFactory(new ActiveMQConnectionFactory(broker.getVmConnectorURI().toString() + "?create=false"));
appender.setDestinationName(EVENTS_TOPIC);
- appender.init();
+ }
+
+ @AfterClass
+ public static void stopBroker() throws Exception {
+ broker.stop();
}
@Test
@@ -59,20 +67,39 @@
assertMockEndpointsSatisfied();
}
- @Override
- protected Context createJndiContext() throws Exception {
- Context context = super.createJndiContext();
- context.bind("amq", ActiveMQComponent.activeMQComponent(BROKER_URL));
- return context;
+ @Test
+ public void testReconnectToBroker() throws Exception {
+ MockEndpoint events = getMockEndpoint("mock:events");
+ events.expectedMessageCount(2);
+
+ appender.doAppend(MockEvents.createInfoEvent());
+
+ // let's tamper with the underlying JMS connection, causing us to get an exception on the next log event
+ // afterwards, the appender should recover and start logging again automatically
+ appender.getOrCreateConnection().close();
+ appender.doAppend(MockEvents.createInfoEvent());
+
+ appender.doAppend(MockEvents.createInfoEvent());
+
+ assertMockEndpointsSatisfied();
+
+
}
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
+ protected Context createJndiContext() throws Exception {
+ Context context = super.createJndiContext();
+ context.bind("amq", ActiveMQComponent.activeMQComponent(broker.getVmConnectorURI().toString() + "?create=false"));
+ return context;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("amq:topic://" + EVENTS_TOPIC).to("mock:events");
}
};
- }
+ }
}