| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.network; |
| |
| import java.io.IOException; |
| import java.security.GeneralSecurityException; |
| import java.security.cert.X509Certificate; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.activemq.Service; |
| import org.apache.activemq.advisory.AdvisorySupport; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.activemq.broker.BrokerServiceAware; |
| import org.apache.activemq.broker.TransportConnection; |
| import org.apache.activemq.broker.region.AbstractRegion; |
| import org.apache.activemq.broker.region.RegionBroker; |
| import org.apache.activemq.broker.region.Subscription; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQMessage; |
| import org.apache.activemq.command.ActiveMQTempDestination; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.command.BrokerId; |
| import org.apache.activemq.command.BrokerInfo; |
| import org.apache.activemq.command.Command; |
| import org.apache.activemq.command.ConnectionError; |
| import org.apache.activemq.command.ConnectionId; |
| import org.apache.activemq.command.ConnectionInfo; |
| import org.apache.activemq.command.ConsumerId; |
| import org.apache.activemq.command.ConsumerInfo; |
| import org.apache.activemq.command.DataStructure; |
| import org.apache.activemq.command.DestinationInfo; |
| import org.apache.activemq.command.ExceptionResponse; |
| import org.apache.activemq.command.KeepAliveInfo; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.MessageDispatch; |
| import org.apache.activemq.command.NetworkBridgeFilter; |
| import org.apache.activemq.command.ProducerInfo; |
| import org.apache.activemq.command.RemoveInfo; |
| import org.apache.activemq.command.Response; |
| import org.apache.activemq.command.SessionInfo; |
| import org.apache.activemq.command.ShutdownInfo; |
| import org.apache.activemq.command.WireFormatInfo; |
| import org.apache.activemq.filter.DestinationFilter; |
| import org.apache.activemq.filter.MessageEvaluationContext; |
| import org.apache.activemq.thread.DefaultThreadPools; |
| import org.apache.activemq.thread.TaskRunnerFactory; |
| import org.apache.activemq.transport.DefaultTransportListener; |
| import org.apache.activemq.transport.FutureResponse; |
| import org.apache.activemq.transport.ResponseCallback; |
| import org.apache.activemq.transport.Transport; |
| import org.apache.activemq.transport.TransportDisposedIOException; |
| import org.apache.activemq.transport.TransportFilter; |
| import org.apache.activemq.transport.TransportListener; |
| import org.apache.activemq.transport.tcp.SslTransport; |
| import org.apache.activemq.util.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.MDC; |
| |
| /** |
| * A useful base class for implementing demand forwarding bridges. |
| * |
| * |
| */ |
| public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { |
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
// Runs work off the calling thread: bridge start-up, disposal after failures,
// shutdown notification and deferred subscription removal.
private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
// NOTE(review): presumably the name prefix for durable subscriptions created by the
// bridge; its use is not visible in this part of the file.
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
// Transport to the local broker (supplied to the constructor).
protected final Transport localBroker;
// Transport to the remote broker (supplied to the constructor).
protected final Transport remoteBroker;
// Generates the connection ids used for the local and remote bridge connections.
protected final IdGenerator idGenerator = new IdGenerator();
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
// Connection registered with the local broker; created in startLocalBridge().
protected ConnectionInfo localConnectionInfo;
// Connection registered with the remote broker; created in startRemoteBridge().
protected ConnectionInfo remoteConnectionInfo;
// Session on the local connection; created in startLocalBridge().
protected SessionInfo localSessionInfo;
// Producer registered on the remote broker; its id is stamped on forwarded messages.
protected ProducerInfo producerInfo;
// Placeholder until the real name is learned (the assignment is not visible here).
protected String remoteBrokerName = "Unknown";
// Client id used on the local connection; built in startLocalBridge().
protected String localClientId;
// Consumer on the remote broker's consumer advisory topic(s), used to detect demand.
protected ConsumerInfo demandConsumerInfo;
// Advisory dispatches received since the last acknowledgement sent to the remote broker.
protected int demandConsumerDispatched;
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
// Set once by stop(); most entry points become no-ops afterwards.
protected AtomicBoolean disposed = new AtomicBoolean();
// Sent to the remote broker in the BrokerInfo built by startRemoteBridge().
protected BrokerId localBrokerId;
// The three arrays below are filled from the configuration when the remote BrokerInfo arrives.
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
// Demand subscriptions keyed by the consumer id used on the local broker.
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
// NOTE(review): presumably the same subscriptions keyed by the remote consumer id;
// the code that populates it is not visible here.
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
// Single-element broker path; element 0 is compared against incoming broker paths
// and the array is appended to the path of forwarded messages.
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
// Counted down once by the local and once by the remote bridge start; replaced with a
// fresh latch when the remote transport is interrupted.
protected CountDownLatch startedLatch = new CountDownLatch(2);
// Released when startLocalBridge() has run; awaited before duplex ConsumerInfo is processed.
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
// Awaited by startLocalBridge(); also released by stop() so waiters cannot hang.
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
// True between transportInterupted() and transportResumed() on the remote transport.
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
// Set when a BrokerInfo is received from the remote side; used to throttle reconnects.
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;

// Incremented for every message dispatch received from the local broker.
final AtomicLong enqueueCounter = new AtomicLong();
// Incremented after a forwarded message has been acknowledged to the local broker.
final AtomicLong dequeueCounter = new AtomicLong();

// Optional listener notified from startLocalBridge() and stop().
private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;

// Guards start()/stop() so each runs at most once per transition.
private final AtomicBoolean started = new AtomicBoolean();
// When non-null, this connection (rather than the bridge) is disposed on failure.
private TransportConnection duplexInitiatingConnection;
// NOTE(review): presumably injected through the BrokerServiceAware setter, which is not
// visible here; it is dereferenced without a null check.
private BrokerService brokerService = null;
| |
/**
 * Creates a bridge over the two given transports. Nothing is started here;
 * call {@link #start()} to begin bridging.
 *
 * @param configuration the network bridge settings
 * @param localBroker transport to the local broker
 * @param remoteBroker transport to the remote broker
 */
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
    this.localBroker = localBroker;
    this.remoteBroker = remoteBroker;
    this.configuration = configuration;
}
| |
| public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { |
| this.localBrokerInfo = localBrokerInfo; |
| this.remoteBrokerInfo = remoteBrokerInfo; |
| this.duplexInitiatingConnection = connection; |
| start(); |
| serviceRemoteCommand(remoteBrokerInfo); |
| } |
| |
| public void start() throws Exception { |
| if (started.compareAndSet(false, true)) { |
| localBroker.setTransportListener(new DefaultTransportListener() { |
| |
| @Override |
| public void onCommand(Object o) { |
| Command command = (Command) o; |
| serviceLocalCommand(command); |
| } |
| |
| @Override |
| public void onException(IOException error) { |
| serviceLocalException(error); |
| } |
| }); |
| remoteBroker.setTransportListener(new TransportListener() { |
| |
| public void onCommand(Object o) { |
| Command command = (Command) o; |
| serviceRemoteCommand(command); |
| } |
| |
| public void onException(IOException error) { |
| serviceRemoteException(error); |
| } |
| |
| public void transportInterupted() { |
| // clear any subscriptions - to try and prevent the bridge |
| // from stalling the broker |
| if (remoteInterupted.compareAndSet(false, true)) { |
| LOG.info("Outbound transport to " + remoteBrokerName + " interrupted."); |
| if (localBridgeStarted.get()) { |
| clearDownSubscriptions(); |
| synchronized (DemandForwardingBridgeSupport.this) { |
| try { |
| localBroker.oneway(localConnectionInfo.createRemoveCommand()); |
| } catch (TransportDisposedIOException td) { |
| LOG.debug("local broker is now disposed", td); |
| } catch (IOException e) { |
| LOG.warn("Caught exception from local start", e); |
| } |
| } |
| } |
| localBridgeStarted.set(false); |
| remoteBridgeStarted.set(false); |
| startedLatch = new CountDownLatch(2); |
| localStartedLatch = new CountDownLatch(1); |
| } |
| } |
| |
| public void transportResumed() { |
| if (remoteInterupted.compareAndSet(true, false)) { |
| // We want to slow down false connects so that we don't |
| // get in a busy loop. |
| // False connects can occurr if you using SSH tunnels. |
| if (!lastConnectSucceeded.get()) { |
| try { |
| LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop."); |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| lastConnectSucceeded.set(false); |
| try { |
| startLocalBridge(); |
| remoteBridgeStarted.set(true); |
| startedLatch.countDown(); |
| LOG.info("Outbound transport to " + remoteBrokerName + " resumed"); |
| } catch (Throwable e) { |
| LOG.error("Caught exception from local start in resume transport", e); |
| serviceLocalException(e); |
| } |
| } |
| } |
| }); |
| |
| localBroker.start(); |
| remoteBroker.start(); |
| if (!disposed.get()) { |
| try { |
| triggerRemoteStartBridge(); |
| } catch (IOException e) { |
| LOG.warn("Caught exception from remote start", e); |
| } |
| } else { |
| LOG.warn ("Bridge was disposed before the start() method was fully executed."); |
| throw new TransportDisposedIOException(); |
| } |
| } |
| } |
| |
| protected void triggerLocalStartBridge() throws IOException { |
| final Map context = MDCHelper.getCopyOfContextMap(); |
| asyncTaskRunner.execute(new Runnable() { |
| public void run() { |
| MDCHelper.setContextMap(context); |
| final String originalName = Thread.currentThread().getName(); |
| Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); |
| try { |
| startLocalBridge(); |
| } catch (Throwable e) { |
| serviceLocalException(e); |
| } finally { |
| Thread.currentThread().setName(originalName); |
| } |
| } |
| }); |
| } |
| |
| protected void triggerRemoteStartBridge() throws IOException { |
| final Map context = MDCHelper.getCopyOfContextMap(); |
| asyncTaskRunner.execute(new Runnable() { |
| public void run() { |
| MDCHelper.setContextMap(context); |
| final String originalName = Thread.currentThread().getName(); |
| Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker); |
| try { |
| startRemoteBridge(); |
| } catch (Exception e) { |
| serviceRemoteException(e); |
| } finally { |
| Thread.currentThread().setName(originalName); |
| } |
| } |
| }); |
| } |
| |
| protected void startLocalBridge() throws Throwable { |
| if (localBridgeStarted.compareAndSet(false, true)) { |
| synchronized (this) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker); |
| } |
| remoteBrokerNameKnownLatch.await(); |
| |
| if (!disposed.get()) { |
| localConnectionInfo = new ConnectionInfo(); |
| localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); |
| localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); |
| localConnectionInfo.setClientId(localClientId); |
| localConnectionInfo.setUserName(configuration.getUserName()); |
| localConnectionInfo.setPassword(configuration.getPassword()); |
| Transport originalTransport = remoteBroker; |
| while (originalTransport instanceof TransportFilter) { |
| originalTransport = ((TransportFilter) originalTransport).getNext(); |
| } |
| if (originalTransport instanceof SslTransport) { |
| X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); |
| localConnectionInfo.setTransportContext(peerCerts); |
| } |
| // sync requests that may fail |
| Object resp = localBroker.request(localConnectionInfo); |
| if (resp instanceof ExceptionResponse) { |
| throw ((ExceptionResponse)resp).getException(); |
| } |
| localSessionInfo = new SessionInfo(localConnectionInfo, 1); |
| localBroker.oneway(localSessionInfo); |
| |
| brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); |
| NetworkBridgeListener l = this.networkBridgeListener; |
| if (l != null) { |
| l.onStart(this); |
| } |
| LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); |
| |
| } else { |
| LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed."); |
| } |
| startedLatch.countDown(); |
| localStartedLatch.countDown(); |
| if (!disposed.get()) { |
| setupStaticDestinations(); |
| } else { |
| LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment."); |
| } |
| } |
| } |
| } |
| |
| protected void startRemoteBridge() throws Exception { |
| if (remoteBridgeStarted.compareAndSet(false, true)) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker); |
| } |
| synchronized (this) { |
| if (!isCreatedByDuplex()) { |
| BrokerInfo brokerInfo = new BrokerInfo(); |
| brokerInfo.setBrokerName(configuration.getBrokerName()); |
| brokerInfo.setBrokerURL(configuration.getBrokerURL()); |
| brokerInfo.setNetworkConnection(true); |
| brokerInfo.setDuplexConnection(configuration.isDuplex()); |
| // set our properties |
| Properties props = new Properties(); |
| IntrospectionSupport.getProperties(configuration, props, null); |
| String str = MarshallingSupport.propertiesToString(props); |
| brokerInfo.setNetworkProperties(str); |
| brokerInfo.setBrokerId(this.localBrokerId); |
| remoteBroker.oneway(brokerInfo); |
| } |
| if (remoteConnectionInfo != null) { |
| remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); |
| } |
| remoteConnectionInfo = new ConnectionInfo(); |
| remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); |
| remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); |
| remoteConnectionInfo.setUserName(configuration.getUserName()); |
| remoteConnectionInfo.setPassword(configuration.getPassword()); |
| remoteBroker.oneway(remoteConnectionInfo); |
| |
| SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); |
| remoteBroker.oneway(remoteSessionInfo); |
| producerInfo = new ProducerInfo(remoteSessionInfo, 1); |
| producerInfo.setResponseRequired(false); |
| remoteBroker.oneway(producerInfo); |
| // Listen to consumer advisory messages on the remote broker to |
| // determine demand. |
| demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); |
| demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync()); |
| String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter(); |
| if (configuration.isBridgeTempDestinations()) { |
| advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; |
| } |
| demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); |
| demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); |
| remoteBroker.oneway(demandConsumerInfo); |
| startedLatch.countDown(); |
| if (!disposed.get()) { |
| triggerLocalStartBridge(); |
| } |
| } |
| } |
| } |
| |
| public void stop() throws Exception { |
| if (started.compareAndSet(true, false)) { |
| if (disposed.compareAndSet(false, true)) { |
| LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName); |
| NetworkBridgeListener l = this.networkBridgeListener; |
| if (l != null) { |
| l.onStop(this); |
| } |
| try { |
| remoteBridgeStarted.set(false); |
| final CountDownLatch sendShutdown = new CountDownLatch(1); |
| final Map map = MDCHelper.getCopyOfContextMap(); |
| asyncTaskRunner.execute(new Runnable() { |
| public void run() { |
| try { |
| MDCHelper.setContextMap(map); |
| localBroker.oneway(new ShutdownInfo()); |
| sendShutdown.countDown(); |
| remoteBroker.oneway(new ShutdownInfo()); |
| } catch (Throwable e) { |
| LOG.debug("Caught exception sending shutdown", e); |
| } finally { |
| sendShutdown.countDown(); |
| } |
| |
| } |
| }); |
| if (!sendShutdown.await(10, TimeUnit.SECONDS)) { |
| LOG.info("Network Could not shutdown in a timely manner"); |
| } |
| } finally { |
| ServiceStopper ss = new ServiceStopper(); |
| ss.stop(remoteBroker); |
| ss.stop(localBroker); |
| // Release the started Latch since another thread could be |
| // stuck waiting for it to start up. |
| startedLatch.countDown(); |
| startedLatch.countDown(); |
| localStartedLatch.countDown(); |
| ss.throwFirstException(); |
| } |
| } |
| brokerService.getBroker().removeBroker(null, remoteBrokerInfo); |
| brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); |
| LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); |
| remoteBrokerNameKnownLatch.countDown(); |
| } |
| } |
| |
| public void serviceRemoteException(Throwable error) { |
| if (!disposed.get()) { |
| if (error instanceof SecurityException || error instanceof GeneralSecurityException) { |
| LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); |
| } else { |
| LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); |
| } |
| LOG.debug("The remote Exception was: " + error, error); |
| final Map map = MDCHelper.getCopyOfContextMap(); |
| asyncTaskRunner.execute(new Runnable() { |
| public void run() { |
| MDCHelper.setContextMap(map); |
| ServiceSupport.dispose(getControllingService()); |
| } |
| }); |
| fireBridgeFailed(); |
| } |
| } |
| |
/**
 * Dispatches a command received from the remote broker. Commands are ignored
 * once the bridge is disposed. Any exception raised while handling a command
 * is passed to {@link #serviceRemoteException(Throwable)}.
 *
 * @param command the command received on the remote transport
 */
protected void serviceRemoteCommand(Command command) {
    if (!disposed.get()) {
        try {
            if (command.isMessageDispatch()) {
                // A dispatch to the demand consumer: its payload is handled as a consumer advisory.
                waitStarted();
                MessageDispatch md = (MessageDispatch) command;
                serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                demandConsumerDispatched++;
                // Acknowledge in batches once more than 75% of the prefetch has been dispatched.
                if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
                    remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
                    demandConsumerDispatched = 0;
                }
            } else if (command.isBrokerInfo()) {
                // The remote broker identified itself, so this connect attempt counts as successful.
                lastConnectSucceeded.set(true);
                remoteBrokerInfo = (BrokerInfo) command;
                Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
                try {
                    // NOTE(review): the same helper call in startRemoteBridge() is used to fill
                    // props from the configuration; confirm that direction is intended here.
                    IntrospectionSupport.getProperties(configuration, props, null);
                    // Snapshot the configured destination lists into arrays.
                    if (configuration.getExcludedDestinations() != null) {
                        excludedDestinations = configuration.getExcludedDestinations().toArray(
                                new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
                    }
                    if (configuration.getStaticallyIncludedDestinations() != null) {
                        staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
                                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
                    }
                    if (configuration.getDynamicallyIncludedDestinations() != null) {
                        dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
                                .toArray(
                                        new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
                                                .size()]);
                    }
                } catch (Throwable t) {
                    // A mapping failure is logged but does not abort BrokerInfo handling.
                    LOG.error("Error mapping remote destinations", t);
                }
                serviceRemoteBrokerInfo(command);
                // Let the local broker know the remote broker's ID.
                localBroker.oneway(command);
                // new peer broker (a consumer can work with remote broker also)
                brokerService.getBroker().addBroker(null, remoteBrokerInfo);
            } else if (command.getClass() == ConnectionError.class) {
                ConnectionError ce = (ConnectionError) command;
                serviceRemoteException(ce.getException());
            } else {
                if (isDuplex()) {
                    if (command.isMessage()) {
                        ActiveMQMessage message = (ActiveMQMessage) command;
                        if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
                                || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
                            // Advisories arriving over the duplex link are processed, not forwarded.
                            serviceRemoteConsumerAdvisory(message.getDataStructure());
                        } else {
                            // Silently drop messages for destinations the bridge may not serve.
                            if (!isPermissableDestination(message.getDestination(), true)) {
                                return;
                            }
                            if (message.isResponseRequired()) {
                                // Forward to the local broker and answer the remote sender with a
                                // Response correlated to the message's command id.
                                Response reply = new Response();
                                reply.setCorrelationId(message.getCommandId());
                                localBroker.oneway(message);
                                remoteBroker.oneway(reply);
                            } else {
                                localBroker.oneway(message);
                            }
                        }
                    } else {
                        switch (command.getDataStructureType()) {
                        case ConnectionInfo.DATA_STRUCTURE_TYPE:
                        case SessionInfo.DATA_STRUCTURE_TYPE:
                        case ProducerInfo.DATA_STRUCTURE_TYPE:
                            // Passed through to the local broker unchanged.
                            localBroker.oneway(command);
                            break;
                        case ConsumerInfo.DATA_STRUCTURE_TYPE:
                            // Wait until the local side of the bridge has been set up.
                            localStartedLatch.await();
                            if (started.get()) {
                                if (!addConsumerInfo((ConsumerInfo) command)) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Ignoring ConsumerInfo: " + command);
                                    }
                                } else {
                                    if (LOG.isTraceEnabled()) {
                                        LOG.trace("Adding ConsumerInfo: " + command);
                                    }
                                }
                            } else {
                                // received a subscription whilst stopping
                                LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
                            }
                            break;
                        case ShutdownInfo.DATA_STRUCTURE_TYPE:
                            // initiator is shutting down, controlled case
                            // abortive close dealt with by inactivity monitor
                            LOG.info("Stopping network bridge on shutdown of remote broker");
                            serviceRemoteException(new IOException(command.toString()));
                            break;
                        default:
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Ignoring remote command: " + command);
                            }
                        }
                    }
                } else {
                    // Non-duplex bridge: these housekeeping commands are expected and ignored;
                    // anything else is reported.
                    switch (command.getDataStructureType()) {
                    case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
                    case ShutdownInfo.DATA_STRUCTURE_TYPE:
                        break;
                    default:
                        LOG.warn("Unexpected remote command: " + command);
                    }
                }
            }
        } catch (Throwable e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Exception processing remote command: " + command, e);
            }
            serviceRemoteException(e);
        }
    }
}
| |
| private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { |
| final int networkTTL = configuration.getNetworkTTL(); |
| if (data.getClass() == ConsumerInfo.class) { |
| // Create a new local subscription |
| ConsumerInfo info = (ConsumerInfo) data; |
| BrokerId[] path = info.getBrokerPath(); |
| |
| if (info.isBrowser()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed"); |
| } |
| return; |
| } |
| |
| if (path != null && path.length >= networkTTL) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); |
| } |
| return; |
| } |
| if (contains(path, localBrokerPath[0])) { |
| // Ignore this consumer as it's a consumer we locally sent to the broker. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info); |
| } |
| return; |
| } |
| if (!isPermissableDestination(info.getDestination())) { |
| // ignore if not in the permitted or in the excluded list |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); |
| } |
| return; |
| } |
| |
| // in a cyclic network there can be multiple bridges per broker that can propagate |
| // a network subscription so there is a need to synchronise on a shared entity |
| synchronized (brokerService.getVmConnectorURI()) { |
| if (addConsumerInfo(info)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); |
| } |
| } |
| } |
| } else if (data.getClass() == DestinationInfo.class) { |
| // It's a destination info - we want to pass up |
| // information about temporary destinations |
| DestinationInfo destInfo = (DestinationInfo) data; |
| BrokerId[] path = destInfo.getBrokerPath(); |
| if (path != null && path.length >= networkTTL) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only"); |
| } |
| return; |
| } |
| if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { |
| // Ignore this consumer as it's a consumer we locally sent to |
| // the broker. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once"); |
| } |
| return; |
| } |
| destInfo.setConnectionId(localConnectionInfo.getConnectionId()); |
| if (destInfo.getDestination() instanceof ActiveMQTempDestination) { |
| // re-set connection id so comes from here |
| ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); |
| tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); |
| } |
| destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("bridging destination control command: " + destInfo); |
| } |
| localBroker.oneway(destInfo); |
| } else if (data.getClass() == RemoveInfo.class) { |
| ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); |
| removeDemandSubscription(id); |
| } |
| } |
| |
| public void serviceLocalException(Throwable error) { |
| if (!disposed.get()) { |
| LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); |
| LOG.debug("The local Exception was:" + error, error); |
| final Map map = MDCHelper.getCopyOfContextMap(); |
| asyncTaskRunner.execute(new Runnable() { |
| public void run() { |
| MDCHelper.setContextMap(map); |
| ServiceSupport.dispose(getControllingService()); |
| } |
| }); |
| fireBridgeFailed(); |
| } |
| } |
| |
| protected Service getControllingService() { |
| return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; |
| } |
| |
| protected void addSubscription(DemandSubscription sub) throws IOException { |
| if (sub != null) { |
| localBroker.oneway(sub.getLocalInfo()); |
| } |
| } |
| |
| protected void removeSubscription(final DemandSubscription sub) throws IOException { |
| if (sub != null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); |
| } |
| subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); |
| |
| // continue removal in separate thread to free up this thread for outstanding responses |
| final Map map = MDCHelper.getCopyOfContextMap(); |
| asyncTaskRunner.execute(new Runnable() { |
| public void run() { |
| MDCHelper.setContextMap(map); |
| sub.waitForCompletion(); |
| try { |
| localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); |
| } catch (IOException e) { |
| LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e); |
| } |
| } |
| }); |
| } |
| } |
| |
| protected Message configureMessage(MessageDispatch md) { |
| Message message = md.getMessage().copy(); |
| // Update the packet to show where it came from. |
| message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); |
| message.setProducerId(producerInfo.getProducerId()); |
| message.setDestination(md.getDestination()); |
| if (message.getOriginalTransactionId() == null) { |
| message.setOriginalTransactionId(message.getTransactionId()); |
| } |
| message.setTransactionId(null); |
| return message; |
| } |
| |
/**
 * Dispatches a command received from the local broker. Message dispatches
 * for known demand subscriptions are forwarded to the remote broker and
 * acknowledged locally. Commands are ignored once the bridge is disposed,
 * and any exception is passed to {@link #serviceLocalException(Throwable)}.
 *
 * @param command the command received on the local transport
 */
protected void serviceLocalCommand(Command command) {
    if (!disposed.get()) {
        try {
            if (command.isMessageDispatch()) {
                // Counted for every dispatch, whether or not it ends up being forwarded.
                enqueueCounter.incrementAndGet();
                final MessageDispatch md = (MessageDispatch) command;
                final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                // From here on, every path must balance the increment with a
                // decrementOutstandingResponses() call.
                if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {

                    if (suppressMessageDispatch(md, sub)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
                        }
                        // still ack as it may be durable
                        try {
                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                        } finally {
                            sub.decrementOutstandingResponses();
                        }
                        return;
                    }

                    Message message = configureMessage(md);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                    }

                    if (!message.isResponseRequired()) {

                        // If the message was originally sent using async
                        // send, we will preserve that QOS
                        // by bridging it using an async send (small chance
                        // of message loss).
                        try {
                            remoteBroker.oneway(message);
                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                            dequeueCounter.incrementAndGet();
                        } finally {
                            sub.decrementOutstandingResponses();
                        }

                    } else {

                        // The message was not sent using async send, so we
                        // should only ack the local
                        // broker when we get confirmation that the remote
                        // broker has received the message.
                        ResponseCallback callback = new ResponseCallback() {
                            public void onCompletion(FutureResponse future) {
                                try {
                                    Response response = future.getResult();
                                    if (response.isException()) {
                                        // The remote broker rejected the message: fail the bridge, no local ack.
                                        ExceptionResponse er = (ExceptionResponse) response;
                                        serviceLocalException(er.getException());
                                    } else {
                                        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                        dequeueCounter.incrementAndGet();
                                    }
                                } catch (IOException e) {
                                    serviceLocalException(e);
                                } finally {
                                    sub.decrementOutstandingResponses();
                                }
                            }
                        };

                        // The outstanding-response count is released by the callback above.
                        remoteBroker.asyncRequest(message, callback);

                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
                    }
                }
            } else if (command.isBrokerInfo()) {
                localBrokerInfo = (BrokerInfo) command;
                serviceLocalBrokerInfo(command);
            } else if (command.isShutdownInfo()) {
                LOG.info(configuration.getBrokerName() + " Shutting down");
                // Don't shut down the whole connector if the remote side
                // was interrupted.
                // the local transport is just shutting down temporarily
                // until the remote side
                // is restored.
                if (!remoteInterupted.get()) {
                    stop();
                }
            } else if (command.getClass() == ConnectionError.class) {
                ConnectionError ce = (ConnectionError) command;
                serviceLocalException(ce.getException());
            } else {
                // WireFormatInfo is expected and ignored; anything else is reported.
                switch (command.getDataStructureType()) {
                case WireFormatInfo.DATA_STRUCTURE_TYPE:
                    break;
                default:
                    LOG.warn("Unexpected local command: " + command);
                }
            }
        } catch (Throwable e) {
            LOG.warn("Caught an exception processing local command", e);
            serviceLocalException(e);
        }
    }
}
| |
| private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { |
| // See if this consumer's brokerPath tells us it came from the broker at the other end |
| // of the bridge. I think we should be making this decision based on the message's |
| // broker bread crumbs and not the consumer's? However, the message's broker bread |
| // crumbs are null, which is another matter. |
| boolean suppress = false; |
| Object consumerInfo = md.getMessage().getDataStructure(); |
| if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) { |
| suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId()); |
| } |
| |
| // for durable subs, suppression via filter leaves dangling acks so we need to |
| // check here and allow the ack irrespective |
| if (!suppress && sub.getLocalInfo().isDurable()) { |
| MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); |
| messageEvalContext.setMessageReference(md.getMessage()); |
| suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext); |
| } |
| return suppress; |
| } |
| |
| /** |
| * @return Returns the dynamicallyIncludedDestinations. |
| */ |
| public ActiveMQDestination[] getDynamicallyIncludedDestinations() { |
| return dynamicallyIncludedDestinations; |
| } |
| |
| /** |
| * @param dynamicallyIncludedDestinations The |
| * dynamicallyIncludedDestinations to set. |
| */ |
| public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { |
| this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; |
| } |
| |
| /** |
| * @return Returns the excludedDestinations. |
| */ |
| public ActiveMQDestination[] getExcludedDestinations() { |
| return excludedDestinations; |
| } |
| |
| /** |
| * @param excludedDestinations The excludedDestinations to set. |
| */ |
| public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { |
| this.excludedDestinations = excludedDestinations; |
| } |
| |
| /** |
| * @return Returns the staticallyIncludedDestinations. |
| */ |
| public ActiveMQDestination[] getStaticallyIncludedDestinations() { |
| return staticallyIncludedDestinations; |
| } |
| |
| /** |
| * @param staticallyIncludedDestinations The staticallyIncludedDestinations |
| * to set. |
| */ |
| public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { |
| this.staticallyIncludedDestinations = staticallyIncludedDestinations; |
| } |
| |
| /** |
| * @return Returns the durableDestinations. |
| */ |
| public ActiveMQDestination[] getDurableDestinations() { |
| return durableDestinations; |
| } |
| |
| /** |
| * @param durableDestinations The durableDestinations to set. |
| */ |
| public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { |
| this.durableDestinations = durableDestinations; |
| } |
| |
| /** |
| * @return Returns the localBroker. |
| */ |
| public Transport getLocalBroker() { |
| return localBroker; |
| } |
| |
| /** |
| * @return Returns the remoteBroker. |
| */ |
| public Transport getRemoteBroker() { |
| return remoteBroker; |
| } |
| |
| /** |
| * @return the createdByDuplex |
| */ |
| public boolean isCreatedByDuplex() { |
| return this.createdByDuplex; |
| } |
| |
| /** |
| * @param createdByDuplex the createdByDuplex to set |
| */ |
| public void setCreatedByDuplex(boolean createdByDuplex) { |
| this.createdByDuplex = createdByDuplex; |
| } |
| |
| public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { |
| if (brokerPath != null) { |
| for (int i = 0; i < brokerPath.length; i++) { |
| if (brokerId.equals(brokerPath[i])) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { |
| if (brokerPath == null || brokerPath.length == 0) { |
| return pathsToAppend; |
| } |
| BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; |
| System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); |
| System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); |
| return rc; |
| } |
| |
| protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { |
| if (brokerPath == null || brokerPath.length == 0) { |
| return new BrokerId[] { idToAppend }; |
| } |
| BrokerId rc[] = new BrokerId[brokerPath.length + 1]; |
| System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); |
| rc[brokerPath.length] = idToAppend; |
| return rc; |
| } |
| |
| protected boolean isPermissableDestination(ActiveMQDestination destination) { |
| return isPermissableDestination(destination, false); |
| } |
| |
| protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { |
| // Are we not bridging temp destinations? |
| if (destination.isTemporary()) { |
| if (allowTemporary) { |
| return true; |
| } else { |
| return configuration.isBridgeTempDestinations(); |
| } |
| } |
| |
| ActiveMQDestination[] dests = excludedDestinations; |
| if (dests != null && dests.length > 0) { |
| for (int i = 0; i < dests.length; i++) { |
| ActiveMQDestination match = dests[i]; |
| DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match); |
| if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { |
| return false; |
| } |
| } |
| } |
| |
| dests = dynamicallyIncludedDestinations; |
| if (dests != null && dests.length > 0) { |
| for (int i = 0; i < dests.length; i++) { |
| ActiveMQDestination match = dests[i]; |
| DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); |
| if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Subscriptions for these destinations are always created |
| */ |
| protected void setupStaticDestinations() { |
| ActiveMQDestination[] dests = staticallyIncludedDestinations; |
| if (dests != null) { |
| for (int i = 0; i < dests.length; i++) { |
| ActiveMQDestination dest = dests[i]; |
| DemandSubscription sub = createDemandSubscription(dest); |
| try { |
| addSubscription(sub); |
| } catch (IOException e) { |
| LOG.error("Failed to add static destination " + dest, e); |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("bridging messages for static destination: " + dest); |
| } |
| } |
| } |
| } |
| |
| protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { |
| boolean consumerAdded = false; |
| ConsumerInfo info = consumerInfo.copy(); |
| addRemoteBrokerToBrokerPath(info); |
| DemandSubscription sub = createDemandSubscription(info); |
| if (sub != null) { |
| if (duplicateSuppressionIsRequired(sub)) { |
| undoMapRegistration(sub); |
| } else { |
| addSubscription(sub); |
| consumerAdded = true; |
| } |
| } |
| return consumerAdded; |
| } |
| |
| private void undoMapRegistration(DemandSubscription sub) { |
| subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); |
| subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); |
| } |
| |
| /* |
| * check our existing subs networkConsumerIds against the list of network ids in this subscription |
| * A match means a duplicate which we suppress for topics and maybe for queues |
| */ |
| private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { |
| final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); |
| boolean suppress = false; |
| |
| if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || |
| consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) { |
| return suppress; |
| } |
| |
| List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); |
| Collection<Subscription> currentSubs = |
| getRegionSubscriptions(consumerInfo.getDestination().isTopic()); |
| for (Subscription sub : currentSubs) { |
| List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); |
| if (!networkConsumers.isEmpty()) { |
| if (matchFound(candidateConsumers, networkConsumers)) { |
| suppress = hasLowerPriority(sub, candidate.getLocalInfo()); |
| break; |
| } |
| } |
| } |
| return suppress; |
| } |
| |
| private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { |
| boolean suppress = false; |
| |
| if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName |
| + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " |
| + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); |
| } |
| suppress = true; |
| } else { |
| // remove the existing lower priority duplicate and allow this candidate |
| try { |
| removeDuplicateSubscription(existingSub); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() |
| + " with sub from " + remoteBrokerName |
| + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " |
| + candidateInfo.getNetworkConsumerIds()); |
| } |
| } catch (IOException e) { |
| LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e); |
| } |
| } |
| return suppress; |
| } |
| |
| private void removeDuplicateSubscription(Subscription existingSub) throws IOException { |
| for (NetworkConnector connector : brokerService.getNetworkConnectors()) { |
| if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { |
| break; |
| } |
| } |
| } |
| |
| private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { |
| boolean found = false; |
| for (ConsumerId aliasConsumer : networkConsumers) { |
| if (candidateConsumers.contains(aliasConsumer)) { |
| found = true; |
| break; |
| } |
| } |
| return found; |
| } |
| |
| private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) { |
| RegionBroker region = (RegionBroker) brokerService.getRegionBroker(); |
| AbstractRegion abstractRegion = (AbstractRegion) |
| (isTopic ? region.getTopicRegion() : region.getQueueRegion()); |
| return abstractRegion.getSubscriptions().values(); |
| } |
| |
| protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { |
| //add our original id to ourselves |
| info.addNetworkConsumerId(info.getConsumerId()); |
| return doCreateDemandSubscription(info); |
| } |
| |
| protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { |
| DemandSubscription result = new DemandSubscription(info); |
| result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); |
| if (info.getDestination().isTemporary()) { |
| // reset the local connection Id |
| |
| ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); |
| dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); |
| } |
| |
| if (configuration.isDecreaseNetworkConsumerPriority()) { |
| byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; |
| if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { |
| // The longer the path to the consumer, the less it's consumer priority. |
| priority -= info.getBrokerPath().length + 1; |
| } |
| result.getLocalInfo().setPriority(priority); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info); |
| } |
| } |
| configureDemandSubscription(info, result); |
| return result; |
| } |
| |
| final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { |
| ConsumerInfo info = new ConsumerInfo(); |
| info.setDestination(destination); |
| // the remote info held by the DemandSubscription holds the original |
| // consumerId, |
| // the local info get's overwritten |
| |
| info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); |
| DemandSubscription result = null; |
| try { |
| result = createDemandSubscription(info); |
| } catch (IOException e) { |
| LOG.error("Failed to create DemandSubscription ", e); |
| } |
| if (result != null) { |
| result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); |
| } |
| return result; |
| } |
| |
| protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { |
| sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); |
| sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); |
| subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); |
| subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); |
| |
| if (!info.isDurable()) { |
| // This works for now since we use a VM connection to the local broker. |
| // may need to change if we ever subscribe to a remote broker. |
| sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); |
| } else { |
| // need to ack this message if it is ignored as it is durable so |
| // we check before we send. see: suppressMessageDispatch() |
| } |
| } |
| |
| protected void removeDemandSubscription(ConsumerId id) throws IOException { |
| DemandSubscription sub = subscriptionMapByRemoteId.remove(id); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub); |
| } |
| if (sub != null) { |
| removeSubscription(sub); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo()); |
| } |
| } |
| } |
| |
| protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { |
| boolean removeDone = false; |
| DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); |
| if (sub != null) { |
| try { |
| removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); |
| removeDone = true; |
| } catch (IOException e) { |
| LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e); |
| } |
| } |
| return removeDone; |
| } |
| |
| protected void waitStarted() throws InterruptedException { |
| startedLatch.await(); |
| localBrokerIdKnownLatch.await(); |
| } |
| |
| protected void clearDownSubscriptions() { |
| subscriptionMapByLocalId.clear(); |
| subscriptionMapByRemoteId.clear(); |
| } |
| |
| protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException; |
| |
| protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException; |
| |
| protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; |
| |
| protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; |
| |
| protected abstract BrokerId[] getRemoteBrokerPath(); |
| |
| public void setNetworkBridgeListener(NetworkBridgeListener listener) { |
| this.networkBridgeListener = listener; |
| } |
| |
| private void fireBridgeFailed() { |
| NetworkBridgeListener l = this.networkBridgeListener; |
| if (l != null) { |
| l.bridgeFailed(); |
| } |
| } |
| |
| public String getRemoteAddress() { |
| return remoteBroker.getRemoteAddress(); |
| } |
| |
| public String getLocalAddress() { |
| return localBroker.getRemoteAddress(); |
| } |
| |
| public String getRemoteBrokerName() { |
| return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); |
| } |
| |
| public String getLocalBrokerName() { |
| return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); |
| } |
| |
| public long getDequeueCounter() { |
| return dequeueCounter.get(); |
| } |
| |
| public long getEnqueueCounter() { |
| return enqueueCounter.get(); |
| } |
| |
| protected boolean isDuplex() { |
| return configuration.isDuplex() || createdByDuplex; |
| } |
| |
| public void setBrokerService(BrokerService brokerService) { |
| this.brokerService = brokerService; |
| } |
| } |