// blob: 7c87d2a5a2526d9c9a747c9dc7ded3b7baed02da [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Bridge to other JMS Topic providers
*
* @org.apache.xbean.XBean
*
*
*/
public class JmsTopicConnector extends JmsConnector {
    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);

    /** JNDI name used to look up the foreign connection factory when none was injected. */
    private String outboundTopicConnectionFactoryName;
    /** JNDI name used to look up the local connection factory when none was injected. */
    private String localConnectionFactoryName;
    private TopicConnectionFactory outboundTopicConnectionFactory;
    private TopicConnectionFactory localTopicConnectionFactory;
    private TopicConnection outboundTopicConnection;
    private TopicConnection localTopicConnection;
    private InboundTopicBridge[] inboundTopicBridges;
    private OutboundTopicBridge[] outboundTopicBridges;

    /**
     * Runs the superclass initialization and, if that reports {@code true},
     * sets up both topic connections, hands them to the message convertors
     * and registers the configured inbound and outbound bridges.
     * <p>
     * Any exception raised during that set-up is logged and not rethrown.
     * NOTE(review): the value returned is the result of {@code super.init()}
     * even when the set-up failed - confirm whether callers rely on this
     * before changing it.
     *
     * @return the value returned by {@code super.init()}
     */
    public boolean init() {
        boolean result = super.init();
        if (result) {
            try {
                initializeForeignTopicConnection();
                initializeLocalTopicConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundTopicBridges();
                initializeOutboundTopicBridges();
            } catch (Exception e) {
                LOG.error("Failed to initialize the JMSConnector", e);
            }
        }
        return result;
    }

    /**
     * @return Returns the inboundTopicBridges.
     */
    public InboundTopicBridge[] getInboundTopicBridges() {
        return inboundTopicBridges;
    }

    /**
     * @param inboundTopicBridges The inboundTopicBridges to set.
     */
    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
        this.inboundTopicBridges = inboundTopicBridges;
    }

    /**
     * @return Returns the outboundTopicBridges.
     */
    public OutboundTopicBridge[] getOutboundTopicBridges() {
        return outboundTopicBridges;
    }

    /**
     * @param outboundTopicBridges The outboundTopicBridges to set.
     */
    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
        this.outboundTopicBridges = outboundTopicBridges;
    }

    /**
     * @return Returns the localTopicConnectionFactory.
     */
    public TopicConnectionFactory getLocalTopicConnectionFactory() {
        return localTopicConnectionFactory;
    }

    /**
     * @param localConnectionFactory The localTopicConnectionFactory to set.
     */
    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
        this.localTopicConnectionFactory = localConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactory.
     */
    public TopicConnectionFactory getOutboundTopicConnectionFactory() {
        return outboundTopicConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactoryName.
     */
    public String getOutboundTopicConnectionFactoryName() {
        return outboundTopicConnectionFactoryName;
    }

    /**
     * @param foreignTopicConnectionFactoryName The
     *                outboundTopicConnectionFactoryName to set.
     */
    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
        this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName() {
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
        this.localConnectionFactoryName = localConnectionFactoryName;
    }

    /**
     * @return Returns the localTopicConnection.
     */
    public TopicConnection getLocalTopicConnection() {
        return localTopicConnection;
    }

    /**
     * @param localTopicConnection The localTopicConnection to set.
     */
    public void setLocalTopicConnection(TopicConnection localTopicConnection) {
        this.localTopicConnection = localTopicConnection;
    }

    /**
     * @return Returns the outboundTopicConnection.
     */
    public TopicConnection getOutboundTopicConnection() {
        return outboundTopicConnection;
    }

    /**
     * @param foreignTopicConnection The outboundTopicConnection to set.
     */
    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
        this.outboundTopicConnection = foreignTopicConnection;
    }

    /**
     * @param foreignTopicConnectionFactory The outboundTopicConnectionFactory
     *                to set.
     */
    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
        this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
    }

    /**
     * Drops the reference to the current foreign connection and creates and
     * starts a new one. The previous connection object is not closed here.
     *
     * @throws NamingException if the JNDI look-up of the foreign factory fails
     * @throws JMSException if the connection cannot be created or started
     */
    public void restartProducerConnection() throws NamingException, JMSException {
        outboundTopicConnection = null;
        initializeForeignTopicConnection();
    }

    /**
     * Ensures a foreign (outbound) topic connection exists, then applies the
     * outbound client ID (when one is configured) and starts the connection.
     * <p>
     * When no connection has been supplied, one is created from the injected
     * factory or, failing that, from the factory looked up in JNDI under
     * {@code outboundTopicConnectionFactoryName}. The outbound
     * username/password are used when a username is configured.
     *
     * @throws NamingException if the JNDI look-up of the factory fails
     * @throws JMSException if neither a factory nor a factory name is
     *                 available, or the provider rejects an operation
     */
    protected void initializeForeignTopicConnection() throws NamingException, JMSException {
        if (outboundTopicConnection == null) {
            if (outboundTopicConnectionFactory == null) {
                if (outboundTopicConnectionFactoryName == null) {
                    throw new JMSException("Cannot create foreignConnection - no information");
                }
                // look it up from JNDI
                outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
                    .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
            }
            outboundTopicConnection = openConnection(outboundTopicConnectionFactory,
                                                     outboundUsername, outboundPassword);
        }
        // The decision must be based on the outbound client ID: it is the value
        // that is applied to the outbound connection.
        String outboundId = getOutboundClientId();
        if (outboundId != null && outboundId.length() > 0) {
            outboundTopicConnection.setClientID(outboundId);
        }
        outboundTopicConnection.start();
    }

    /**
     * Ensures a local topic connection exists, then applies the local client
     * ID (when one is configured) and starts the connection.
     * <p>
     * When no connection has been supplied, the source is chosen in this
     * order: the injected {@code localTopicConnectionFactory}, the
     * {@code embeddedConnectionFactory}, and finally a factory looked up in
     * JNDI under {@code localConnectionFactoryName}. The local
     * username/password are used when a username is configured, except for
     * the embedded factory which is always used without credentials.
     *
     * @throws NamingException if the JNDI look-up of the factory fails
     * @throws JMSException if no factory source is available, or the provider
     *                 rejects an operation
     */
    protected void initializeLocalTopicConnection() throws NamingException, JMSException {
        if (localTopicConnection == null) {
            if (localTopicConnectionFactory == null && embeddedConnectionFactory != null) {
                localTopicConnection = embeddedConnectionFactory.createTopicConnection();
            } else {
                if (localTopicConnectionFactory == null) {
                    if (localConnectionFactoryName == null) {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                    // look it up from JNDI
                    localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
                        .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
                }
                localTopicConnection = openConnection(localTopicConnectionFactory,
                                                      localUsername, localPassword);
            }
        }
        if (localClientId != null && localClientId.length() > 0) {
            localTopicConnection.setClientID(getLocalClientId());
        }
        localTopicConnection.start();
    }

    /**
     * Hands the local topic connection to the inbound message convertor.
     */
    protected void initializeInboundJmsMessageConvertor() {
        inboundMessageConvertor.setConnection(localTopicConnection);
    }

    /**
     * Hands the foreign topic connection to the outbound message convertor.
     */
    protected void initializeOutboundJmsMessageConvertor() {
        outboundMessageConvertor.setConnection(outboundTopicConnection);
    }

    /**
     * Configures every inbound bridge (consuming from the foreign topic and
     * producing to the local topic) and registers it through
     * {@code addInboundBridge}. Does nothing when no inbound bridges are set.
     * <p>
     * Two short-lived sessions are used only to resolve the topics; they are
     * closed even when configuration fails part-way.
     *
     * @throws JMSException if a session or topic cannot be created
     */
    protected void initializeInboundTopicBridges() throws JMSException {
        if (inboundTopicBridges == null) {
            return;
        }
        TopicSession outboundSession = outboundTopicConnection
            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            TopicSession localSession = localTopicConnection
                .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                for (int i = 0; i < inboundTopicBridges.length; i++) {
                    InboundTopicBridge bridge = inboundTopicBridges[i];
                    Topic activemqTopic = createActiveMQTopic(localSession, bridge.getLocalTopicName());
                    Topic foreignTopic = createForeignTopic(outboundSession, bridge.getInboundTopicName());
                    bridge.setConsumerTopic(foreignTopic);
                    bridge.setProducerTopic(activemqTopic);
                    bridge.setProducerConnection(localTopicConnection);
                    bridge.setConsumerConnection(outboundTopicConnection);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    addInboundBridge(bridge);
                }
            } finally {
                localSession.close();
            }
        } finally {
            outboundSession.close();
        }
    }

    /**
     * Configures every outbound bridge (consuming from the local topic and
     * producing to the foreign topic) and registers it through
     * {@code addOutboundBridge}. Does nothing when no outbound bridges are
     * set.
     * <p>
     * Two short-lived sessions are used only to resolve the topics; they are
     * closed even when configuration fails part-way.
     *
     * @throws JMSException if a session or topic cannot be created
     */
    protected void initializeOutboundTopicBridges() throws JMSException {
        if (outboundTopicBridges == null) {
            return;
        }
        TopicSession outboundSession = outboundTopicConnection
            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            TopicSession localSession = localTopicConnection
                .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                for (int i = 0; i < outboundTopicBridges.length; i++) {
                    OutboundTopicBridge bridge = outboundTopicBridges[i];
                    Topic activemqTopic = createActiveMQTopic(localSession, bridge.getLocalTopicName());
                    Topic foreignTopic = createForeignTopic(outboundSession, bridge.getOutboundTopicName());
                    bridge.setConsumerTopic(activemqTopic);
                    bridge.setProducerTopic(foreignTopic);
                    bridge.setProducerConnection(outboundTopicConnection);
                    bridge.setConsumerConnection(localTopicConnection);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    addOutboundBridge(bridge);
                }
            } finally {
                localSession.close();
            }
        } finally {
            outboundSession.close();
        }
    }

    /**
     * Returns the topic on the consumer side that stands in for the given
     * reply-to topic, creating and starting a dedicated bridge on first use.
     * <p>
     * Bridges are kept in {@code replyToBridges}, keyed by the producer-side
     * topic. A bridge is treated as inbound when the producer connection
     * equals the local topic connection, otherwise as outbound. The consumer
     * side of a new bridge is a temporary topic created on the consumer
     * connection. The bridges created here override
     * {@code processReplyToDestination} to return {@code null} and have
     * reply-to handling switched off.
     *
     * @param destination the reply-to destination; must be a {@link Topic}
     * @param replyToProducerConnection connection the bridge produces on
     * @param replyToConsumerConnection connection the bridge consumes from
     * @return the consumer topic of the bridge, or {@code null} if the bridge
     *         could not be created (the failure is logged)
     */
    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
                                              Connection replyToConsumerConnection) {
        Topic replyToProducerTopic = (Topic)destination;
        boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
        if (isInbound) {
            InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
            if (bridge == null) {
                bridge = new InboundTopicBridge() {
                    protected Destination processReplyToDestination(Destination destination) {
                        return null;
                    }
                };
                try {
                    bridge.setConsumerTopic(createReplyToConsumerTopic(replyToConsumerConnection));
                    bridge.setProducerTopic(replyToProducerTopic);
                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    LOG.info("Created replyTo bridge for " + replyToProducerTopic);
                } catch (Exception e) {
                    LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
                    return null;
                }
                // Only cache bridges that were started successfully.
                replyToBridges.put(replyToProducerTopic, bridge);
            }
            return bridge.getConsumerTopic();
        } else {
            OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
            if (bridge == null) {
                bridge = new OutboundTopicBridge() {
                    protected Destination processReplyToDestination(Destination destination) {
                        return null;
                    }
                };
                try {
                    bridge.setConsumerTopic(createReplyToConsumerTopic(replyToConsumerConnection));
                    bridge.setProducerTopic(replyToProducerTopic);
                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    LOG.info("Created replyTo bridge for " + replyToProducerTopic);
                } catch (Exception e) {
                    LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
                    return null;
                }
                // Only cache bridges that were started successfully.
                replyToBridges.put(replyToProducerTopic, bridge);
            }
            return bridge.getConsumerTopic();
        }
    }

    /**
     * Creates the topic on the local side by name.
     *
     * @param session session on the local connection
     * @param topicName name of the topic
     * @return the topic returned by {@code session.createTopic(topicName)}
     * @throws JMSException if the session fails to create the topic
     */
    protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
        return session.createTopic(topicName);
    }

    /**
     * Resolves a topic on the foreign side: first by asking the session to
     * create it and, if that fails, by looking the name up through the
     * outbound JNDI template.
     *
     * @param session session on the foreign connection
     * @param topicName name (or JNDI name) of the topic
     * @return the resolved topic
     * @throws JMSException if both the creation and the JNDI look-up fail; the
     *                 naming failure is attached as the linked exception
     */
    protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
        Topic result = null;
        try {
            result = session.createTopic(topicName);
        } catch (JMSException e) {
            // look-up the Topic
            try {
                result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
            } catch (NamingException e1) {
                String errStr = "Failed to look-up Topic for name: " + topicName;
                // The log carries the createTopic failure, the thrown exception
                // links the naming failure, so neither cause is lost.
                LOG.error(errStr, e);
                JMSException jmsEx = new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    /**
     * Opens a topic connection on the factory, with credentials only when a
     * username is given.
     */
    private static TopicConnection openConnection(TopicConnectionFactory factory, String username,
                                                  String password) throws JMSException {
        if (username != null) {
            return factory.createTopicConnection(username, password);
        }
        return factory.createTopicConnection();
    }

    /**
     * Creates a temporary topic on the given connection using a throw-away
     * session that is closed whether or not the creation succeeds.
     */
    private Topic createReplyToConsumerTopic(Connection replyToConsumerConnection) throws JMSException {
        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            return replyToConsumerSession.createTemporaryTopic();
        } finally {
            replyToConsumerSession.close();
        }
    }
}