// blob: 99a89e05951467f96eae189759771bad7dcdbe25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.event;
import java.util.Properties;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.openjpa.lib.conf.Configurable;
import org.apache.openjpa.lib.conf.GenericConfigurable;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.lib.util.Options;
import org.apache.openjpa.util.OpenJPAException;
import org.apache.openjpa.util.UserException;
/**
* JMS-based implementation of {@link RemoteCommitProvider} that
* listens for object modifications and propagates those changes to
* other RemoteCommitProviders over a JMS topic.
*
* @author Patrick Linskey
* @since 0.2.5.0
*/
public class JMSRemoteCommitProvider
extends AbstractRemoteCommitProvider
implements Configurable, GenericConfigurable, ExceptionListener {
private static Localizer s_loc = Localizer.forPackage
(JMSRemoteCommitProvider.class);
private String _topicName = "topic/OpenJPACommitProviderTopic";
private String _tcfName = "java:/ConnectionFactory";
private Properties _ctxProps = null;
private int _reconnectAttempts = 0;
private TopicConnection _connection;
private TopicSession _session;
private TopicPublisher _publisher;
private ClassLoader _appClassLoader;
/**
* Sets the JMS Topic name. Defaults to
* <code>topic/OpenJPACommitProviderTopic</code>.
*/
public void setTopic(String name) {
_topicName = name;
}
/**
* Sets the JMS TopicConnectionFactory name. Defaults to
* <code>java:/ConnectionFactory</code>.
*/
public void setTopicConnectionFactory(String name) {
_tcfName = name;
}
/**
* The number of times to attempt to reconnect after a JMS send exception
* is detected. Defaults to 0, meaning no attempt to reconnect is made;
* the exception is logged and ignored.
*/
public void setExceptionReconnectAttempts(int attempts) {
_reconnectAttempts = attempts;
}
/**
* Set a map of properties to pass to the {@link InitialContext}
* constructor for JNDI lookups. Implementation of
* {@link GenericConfigurable}.
*/
@Override
public void setInto(Options opts) {
if (opts != null && !opts.isEmpty()) {
_ctxProps = new Properties();
_ctxProps.putAll(opts);
opts.clear();
} else
_ctxProps = null;
}
/**
* Returns a new {@link Context} object for use by this provider.
*/
protected Context newContext()
throws NamingException {
if (_ctxProps == null)
return new InitialContext();
return new InitialContext(_ctxProps);
}
// ---------- RemoteCommitProvider implementation ----------
@Override
public void broadcast(RemoteCommitEvent event) {
try {
_publisher.publish(createMessage(event));
if (log.isTraceEnabled())
log.trace(s_loc.get("jms-sent-update", _topicName));
} catch (JMSException jmse) {
if (log.isWarnEnabled())
log.warn(s_loc.get("jms-send-error", _topicName), jmse);
}
}
@Override
public void close() {
try {
if (_connection != null) {
_connection.close();
if (log.isInfoEnabled())
log.info(s_loc.get("jms-close-listener", _topicName));
}
}
catch (JMSException jmse) {
if (log.isWarnEnabled())
log.warn(s_loc.get("jms-close-error", _topicName), jmse);
} finally {
_connection = null;
}
}
// ---------- Configurable implementation ----------
/**
* Subclasses that need to perform actions in
* {@link Configurable#endConfiguration} must invoke this method.
*/
@Override
public void endConfiguration() {
super.endConfiguration();
_appClassLoader = Thread.currentThread().getContextClassLoader();
connect();
}
protected void connect() {
try {
Context ctx = newContext();
TopicConnectionFactory tcf =
(TopicConnectionFactory) ctx.lookup(_tcfName);
Topic topic = (Topic) ctx.lookup(_topicName);
ctx.close();
_connection = tcf.createTopicConnection();
// false == not transacted.
_session = _connection.createTopicSession
(false, Session.AUTO_ACKNOWLEDGE);
// create a publisher
_publisher = _session.createPublisher(topic);
// create a subscriber.
TopicSubscriber s = _session.createSubscriber(topic, null,
/* noLocal: */ true);
MessageListener l = getMessageListener();
s.setMessageListener(l);
_connection.start();
_connection.setExceptionListener(this);
if (log.isInfoEnabled())
log.info(s_loc.get("jms-start-listener", _topicName));
} catch (OpenJPAException ke) {
throw ke;
} catch (Exception e) {
throw new UserException(s_loc.get("jms-provider-config",
_topicName, _tcfName), e).setFatal(true);
}
}
/* *
* Returns a {@link javax.jms.MessageListener} capable of
* understanding and processing messages created by {@link #createMessage}.
* The listener returned by this method is responsible for
* notifying the provider that a remote event has been received.
*/
protected MessageListener getMessageListener() {
return new MessageListener() {
@Override
public void onMessage(Message m) {
if (!(m instanceof ObjectMessage)) {
if (log.isWarnEnabled())
log.warn(s_loc.get("jms-receive-error-3",
_topicName, m.getClass().getName()));
return;
}
ClassLoader saveCls = Thread.currentThread()
.getContextClassLoader();
try {
if (saveCls != _appClassLoader)
Thread.currentThread().setContextClassLoader(
_appClassLoader);
ObjectMessage om = (ObjectMessage) m;
Object o = om.getObject();
if (o instanceof RemoteCommitEvent) {
if (log.isTraceEnabled())
log.trace(s_loc.get("jms-received-update",
_topicName));
RemoteCommitEvent rce = (RemoteCommitEvent) o;
fireEvent(rce);
} else {
if (log.isWarnEnabled())
log.warn(s_loc.get("jms-receive-error-2",
o.getClass().getName(), _topicName));
}
} catch (JMSException jmse) {
if (log.isWarnEnabled())
log.warn(s_loc.get("jms-receive-error-1"), jmse);
} finally {
if (saveCls != _appClassLoader)
Thread.currentThread().setContextClassLoader(saveCls);
}
}
};
}
/**
* Returns a new {@link Message} to send to the topic. This
* implementation creates an {@link ObjectMessage}.
*/
protected Message createMessage(RemoteCommitEvent event)
throws JMSException {
return _session.createObjectMessage(event);
}
@Override
public void onException(JMSException ex) {
if (log.isWarnEnabled())
log.warn(s_loc.get("jms-listener-error", _topicName), ex);
if (_reconnectAttempts <= 0)
return;
close();
boolean connected = false;
for (int i = 0; !connected && i < _reconnectAttempts; i++) {
try {
if (log.isInfoEnabled())
log.info(s_loc.get("jms-reconnect-attempt", _topicName,
String.valueOf(i + 1)));
connect();
connected = true;
} catch (Exception e) {
if (log.isInfoEnabled())
log.info(s_loc.get("jms-reconnect-fail", _topicName), e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
break;
}
}
}
if (!connected && log.isErrorEnabled())
log.error(s_loc.get("jms-cant-reconnect", _topicName,
String.valueOf(_reconnectAttempts)));
else if (connected && log.isInfoEnabled())
log.info(s_loc.get("jms-reconnected", _topicName));
}
}