| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.network; |
| |
| import java.io.IOException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.activemq.Service; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.command.BrokerId; |
| import org.apache.activemq.command.BrokerInfo; |
| import org.apache.activemq.command.Command; |
| import org.apache.activemq.command.ConnectionId; |
| import org.apache.activemq.command.ConnectionInfo; |
| import org.apache.activemq.command.ConsumerInfo; |
| import org.apache.activemq.command.ExceptionResponse; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.MessageDispatch; |
| import org.apache.activemq.command.ProducerInfo; |
| import org.apache.activemq.command.Response; |
| import org.apache.activemq.command.SessionInfo; |
| import org.apache.activemq.command.ShutdownInfo; |
| import org.apache.activemq.transport.DefaultTransportListener; |
| import org.apache.activemq.transport.FutureResponse; |
| import org.apache.activemq.transport.ResponseCallback; |
| import org.apache.activemq.transport.Transport; |
| import org.apache.activemq.util.IdGenerator; |
| import org.apache.activemq.util.ServiceStopper; |
| import org.apache.activemq.util.ServiceSupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Forwards all messages from the local broker to the remote broker. |
| * |
| * @org.apache.xbean.XBean |
| * |
| * |
| */ |
| public class ForwardingBridge implements Service { |
| |
| private static final IdGenerator ID_GENERATOR = new IdGenerator(); |
| private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class); |
| |
| final AtomicLong enqueueCounter = new AtomicLong(); |
| final AtomicLong dequeueCounter = new AtomicLong(); |
| ConnectionInfo connectionInfo; |
| SessionInfo sessionInfo; |
| ProducerInfo producerInfo; |
| ConsumerInfo queueConsumerInfo; |
| ConsumerInfo topicConsumerInfo; |
| BrokerId localBrokerId; |
| BrokerId remoteBrokerId; |
| BrokerInfo localBrokerInfo; |
| BrokerInfo remoteBrokerInfo; |
| |
| private final Transport localBroker; |
| private final Transport remoteBroker; |
| private String clientId; |
| private int prefetchSize = 1000; |
| private boolean dispatchAsync; |
| private String destinationFilter = ">"; |
| private NetworkBridgeListener bridgeFailedListener; |
| |
| public ForwardingBridge(Transport localBroker, Transport remoteBroker) { |
| this.localBroker = localBroker; |
| this.remoteBroker = remoteBroker; |
| } |
| |
| public void start() throws Exception { |
| LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker |
| + " has been established."); |
| |
| localBroker.setTransportListener(new DefaultTransportListener() { |
| public void onCommand(Object o) { |
| Command command = (Command)o; |
| serviceLocalCommand(command); |
| } |
| |
| public void onException(IOException error) { |
| serviceLocalException(error); |
| } |
| }); |
| |
| remoteBroker.setTransportListener(new DefaultTransportListener() { |
| public void onCommand(Object o) { |
| Command command = (Command)o; |
| serviceRemoteCommand(command); |
| } |
| |
| public void onException(IOException error) { |
| serviceRemoteException(error); |
| } |
| }); |
| |
| localBroker.start(); |
| remoteBroker.start(); |
| } |
| |
| protected void triggerStartBridge() throws IOException { |
| Thread thead = new Thread() { |
| public void run() { |
| try { |
| startBridge(); |
| } catch (IOException e) { |
| LOG.error("Failed to start network bridge: " + e, e); |
| } |
| } |
| }; |
| thead.start(); |
| } |
| |
| /** |
| * @throws IOException |
| */ |
| final void startBridge() throws IOException { |
| connectionInfo = new ConnectionInfo(); |
| connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId())); |
| connectionInfo.setClientId(clientId); |
| localBroker.oneway(connectionInfo); |
| remoteBroker.oneway(connectionInfo); |
| |
| sessionInfo = new SessionInfo(connectionInfo, 1); |
| localBroker.oneway(sessionInfo); |
| remoteBroker.oneway(sessionInfo); |
| |
| queueConsumerInfo = new ConsumerInfo(sessionInfo, 1); |
| queueConsumerInfo.setDispatchAsync(dispatchAsync); |
| queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter)); |
| queueConsumerInfo.setPrefetchSize(prefetchSize); |
| queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); |
| localBroker.oneway(queueConsumerInfo); |
| |
| producerInfo = new ProducerInfo(sessionInfo, 1); |
| producerInfo.setResponseRequired(false); |
| remoteBroker.oneway(producerInfo); |
| |
| if (connectionInfo.getClientId() != null) { |
| topicConsumerInfo = new ConsumerInfo(sessionInfo, 2); |
| topicConsumerInfo.setDispatchAsync(dispatchAsync); |
| topicConsumerInfo.setSubscriptionName("topic-bridge"); |
| topicConsumerInfo.setRetroactive(true); |
| topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter)); |
| topicConsumerInfo.setPrefetchSize(prefetchSize); |
| topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); |
| localBroker.oneway(topicConsumerInfo); |
| } |
| LOG.info("Network connection between " + localBroker + " and " + remoteBroker |
| + " has been established."); |
| } |
| |
| public void stop() throws Exception { |
| try { |
| if (connectionInfo != null) { |
| localBroker.request(connectionInfo.createRemoveCommand()); |
| remoteBroker.request(connectionInfo.createRemoveCommand()); |
| } |
| localBroker.setTransportListener(null); |
| remoteBroker.setTransportListener(null); |
| localBroker.oneway(new ShutdownInfo()); |
| remoteBroker.oneway(new ShutdownInfo()); |
| } finally { |
| ServiceStopper ss = new ServiceStopper(); |
| ss.stop(localBroker); |
| ss.stop(remoteBroker); |
| ss.throwFirstException(); |
| } |
| } |
| |
| public void serviceRemoteException(Throwable error) { |
| LOG.info("Unexpected remote exception: " + error); |
| LOG.debug("Exception trace: ", error); |
| } |
| |
| protected void serviceRemoteCommand(Command command) { |
| try { |
| if (command.isBrokerInfo()) { |
| synchronized (this) { |
| remoteBrokerInfo = (BrokerInfo)command; |
| remoteBrokerId = remoteBrokerInfo.getBrokerId(); |
| if (localBrokerId != null) { |
| if (localBrokerId.equals(remoteBrokerId)) { |
| LOG.info("Disconnecting loop back connection."); |
| ServiceSupport.dispose(this); |
| } else { |
| triggerStartBridge(); |
| } |
| } |
| } |
| } else { |
| LOG.warn("Unexpected remote command: " + command); |
| } |
| } catch (IOException e) { |
| serviceLocalException(e); |
| } |
| } |
| |
| public void serviceLocalException(Throwable error) { |
| LOG.info("Unexpected local exception: " + error); |
| LOG.debug("Exception trace: ", error); |
| fireBridgeFailed(); |
| } |
| |
| protected void serviceLocalCommand(Command command) { |
| try { |
| if (command.isMessageDispatch()) { |
| |
| enqueueCounter.incrementAndGet(); |
| |
| final MessageDispatch md = (MessageDispatch)command; |
| Message message = md.getMessage(); |
| message.setProducerId(producerInfo.getProducerId()); |
| |
| if (message.getOriginalTransactionId() == null) { |
| message.setOriginalTransactionId(message.getTransactionId()); |
| } |
| message.setTransactionId(null); |
| |
| if (!message.isResponseRequired()) { |
| // If the message was originally sent using async send, we |
| // will preserve that QOS |
| // by bridging it using an async send (small chance of |
| // message loss). |
| remoteBroker.oneway(message); |
| dequeueCounter.incrementAndGet(); |
| localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); |
| |
| } else { |
| |
| // The message was not sent using async send, so we should |
| // only ack the local |
| // broker when we get confirmation that the remote broker |
| // has received the message. |
| ResponseCallback callback = new ResponseCallback() { |
| public void onCompletion(FutureResponse future) { |
| try { |
| Response response = future.getResult(); |
| if (response.isException()) { |
| ExceptionResponse er = (ExceptionResponse)response; |
| serviceLocalException(er.getException()); |
| } else { |
| dequeueCounter.incrementAndGet(); |
| localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); |
| } |
| } catch (IOException e) { |
| serviceLocalException(e); |
| } |
| } |
| }; |
| |
| remoteBroker.asyncRequest(message, callback); |
| } |
| |
| // Ack on every message since we don't know if the broker is |
| // blocked due to memory |
| // usage and is waiting for an Ack to un-block him. |
| |
| // Acking a range is more efficient, but also more prone to |
| // locking up a server |
| // Perhaps doing something like the following should be policy |
| // based. |
| // if( |
| // md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) |
| // ) { |
| // queueDispatched++; |
| // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) |
| // ) { |
| // localBroker.oneway(new MessageAck(md, |
| // MessageAck.STANDARD_ACK_TYPE, queueDispatched)); |
| // queueDispatched=0; |
| // } |
| // } else { |
| // topicDispatched++; |
| // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) |
| // ) { |
| // localBroker.oneway(new MessageAck(md, |
| // MessageAck.STANDARD_ACK_TYPE, topicDispatched)); |
| // topicDispatched=0; |
| // } |
| // } |
| } else if (command.isBrokerInfo()) { |
| synchronized (this) { |
| localBrokerInfo = (BrokerInfo)command; |
| localBrokerId = localBrokerInfo.getBrokerId(); |
| if (remoteBrokerId != null) { |
| if (remoteBrokerId.equals(localBrokerId)) { |
| LOG.info("Disconnecting loop back connection."); |
| ServiceSupport.dispose(this); |
| } else { |
| triggerStartBridge(); |
| } |
| } |
| } |
| } else { |
| LOG.debug("Unexpected local command: " + command); |
| } |
| } catch (IOException e) { |
| serviceLocalException(e); |
| } |
| } |
| |
| public String getClientId() { |
| return clientId; |
| } |
| |
| public void setClientId(String clientId) { |
| this.clientId = clientId; |
| } |
| |
| public int getPrefetchSize() { |
| return prefetchSize; |
| } |
| |
| public void setPrefetchSize(int prefetchSize) { |
| this.prefetchSize = prefetchSize; |
| } |
| |
| public boolean isDispatchAsync() { |
| return dispatchAsync; |
| } |
| |
| public void setDispatchAsync(boolean dispatchAsync) { |
| this.dispatchAsync = dispatchAsync; |
| } |
| |
| public String getDestinationFilter() { |
| return destinationFilter; |
| } |
| |
| public void setDestinationFilter(String destinationFilter) { |
| this.destinationFilter = destinationFilter; |
| } |
| |
| public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) { |
| this.bridgeFailedListener = listener; |
| } |
| |
| private void fireBridgeFailed() { |
| NetworkBridgeListener l = this.bridgeFailedListener; |
| if (l != null) { |
| l.bridgeFailed(); |
| } |
| } |
| |
| public String getRemoteAddress() { |
| return remoteBroker.getRemoteAddress(); |
| } |
| |
| public String getLocalAddress() { |
| return localBroker.getRemoteAddress(); |
| } |
| |
| public String getLocalBrokerName() { |
| return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); |
| } |
| |
| public String getRemoteBrokerName() { |
| return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); |
| } |
| |
| public long getDequeueCounter() { |
| return dequeueCounter.get(); |
| } |
| |
| public long getEnqueueCounter() { |
| return enqueueCounter.get(); |
| } |
| |
| } |