blob: 6441c92cca27c97c8719a7f401caf5d8db8e2507 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.BrokerSubscriptionInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A useful base class for implementing demand forwarding bridges.
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker;
protected final Transport remoteBroker;
protected IdGenerator idGenerator = new IdGenerator();
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
protected ConnectionInfo localConnectionInfo;
protected ConnectionInfo remoteConnectionInfo;
protected SessionInfo localSessionInfo;
protected ProducerInfo producerInfo;
protected String remoteBrokerName = "Unknown";
protected String localClientId;
protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched;
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
protected final AtomicBoolean disposed = new AtomicBoolean();
protected BrokerId localBrokerId;
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
protected BrokerId remoteBrokerId;
protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
//Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
private ProducerInfo duplexInboundLocalProducerInfo;
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration;
this.localBroker = localBroker;
this.remoteBroker = remoteBroker;
}
public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
this.localBrokerInfo = localBrokerInfo;
this.remoteBrokerInfo = remoteBrokerInfo;
this.duplexInitiatingConnection = connection;
start();
serviceRemoteCommand(remoteBrokerInfo);
}
@Override
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
if (brokerService == null) {
throw new IllegalArgumentException("BrokerService is null on " + this);
}
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
if (isDuplex()) {
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
@Override
public void onException(IOException error) {
serviceLocalException(error);
}
});
duplexInboundLocalBroker.start();
}
localBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
@Override
public void onException(IOException error) {
if (!futureLocalBrokerInfo.isDone()) {
LOG.info("Error with pending local brokerInfo on: {} ({})", localBroker, error.getMessage());
LOG.debug("Peer error: ", error);
futureLocalBrokerInfo.cancel(true);
return;
}
serviceLocalException(error);
}
});
remoteBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceRemoteCommand(command);
}
@Override
public void onException(IOException error) {
if (!futureRemoteBrokerInfo.isDone()) {
LOG.info("Error with pending remote brokerInfo on: {} ({})", remoteBroker, error.getMessage());
LOG.debug("Peer error: ", error);
futureRemoteBrokerInfo.cancel(true);
return;
}
serviceRemoteException(error);
}
});
remoteBroker.start();
localBroker.start();
if (!disposed.get()) {
try {
triggerStartAsyncNetworkBridgeCreation();
} catch (IOException e) {
LOG.warn("Caught exception from remote start", e);
}
} else {
LOG.warn("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException();
}
}
}
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
if (disposed.compareAndSet(false, true)) {
LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
futureRemoteBrokerInfo.cancel(true);
futureLocalBrokerInfo.cancel(true);
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStop(this);
}
try {
// local start complete
if (startedLatch.getCount() < 2) {
LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
configuration.getBrokerName(), this, remoteBrokerName
});
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
}
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
serialExecutor.shutdown();
if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
LOG.info("pending tasks on stop {}", pendingTasks);
}
//Shutdown the syncExecutor, call countDown to make sure a thread can
//terminate if it is waiting
staticDestinationsLatch.countDown();
syncExecutor.shutdown();
if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
List<Runnable> pendingTasks = syncExecutor.shutdownNow();
LOG.info("pending tasks on stop {}", pendingTasks);
}
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
LOG.debug("Caught exception sending shutdown", e);
} finally {
sendShutdown.countDown();
}
}
}, "ActiveMQ ForwardingBridge StopTask");
if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
LOG.info("Network Could not shutdown in a timely manner");
}
} finally {
ServiceStopper ss = new ServiceStopper();
stopFailoverTransport(remoteBroker);
ss.stop(remoteBroker);
ss.stop(localBroker);
ss.stop(duplexInboundLocalBroker);
// Release the started Latch since another thread could be
// stuck waiting for it to start up.
startedLatch.countDown();
startedLatch.countDown();
localStartedLatch.countDown();
staticDestinationsLatch.countDown();
ss.throwFirstException();
}
}
LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
}
}
private void stopFailoverTransport(Transport transport) {
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
if (failoverTransport != null) {
// may be blocked on write, in which case stop will block
try {
failoverTransport.handleTransportFailure(new IOException("Bridge stopped"));
} catch (InterruptedException ignored) {}
}
}
protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
"remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
try {
// First we collect the info data from both the local and remote ends
collectBrokerInfos();
// Once we have all required broker info we can attempt to start
// the local and then remote sides of the bridge.
doStartLocalAndRemoteBridges();
} finally {
Thread.currentThread().setName(originalName);
}
}
});
}
private void collectBrokerInfos() {
int timeout = 30000;
TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class);
if (tcpTransport != null) {
timeout = tcpTransport.getConnectionTimeout();
}
// First wait for the remote to feed us its BrokerInfo, then we can check on
// the LocalBrokerInfo and decide is this is a loop.
try {
remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
if (remoteBrokerInfo == null) {
serviceLocalException(new Throwable("remoteBrokerInfo is null"));
return;
}
} catch (Exception e) {
serviceRemoteException(e);
return;
}
try {
localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
if (localBrokerInfo == null) {
serviceLocalException(new Throwable("localBrokerInfo is null"));
return;
}
// Before we try and build the bridge lets check if we are in a loop
// and if so just stop now before registering anything.
remoteBrokerId = remoteBrokerInfo.getBrokerId();
if (localBrokerId.equals(remoteBrokerId)) {
LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
});
ServiceSupport.dispose(localBroker);
ServiceSupport.dispose(remoteBroker);
// the bridge is left in a bit of limbo, but it won't get retried
// in this state.
return;
}
// Fill in the remote broker's information now.
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (configuration.isUseBrokerNamesAsIdSeed()) {
idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
}
} catch (Throwable e) {
serviceLocalException(e);
}
}
private void doStartLocalAndRemoteBridges() {
if (disposed.get()) {
return;
}
if (isCreatedByDuplex()) {
// apply remote (propagated) configuration to local duplex bridge before start
Properties props = null;
try {
props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
LOG.error("Error mapping remote configuration: {}", props, t);
}
}
try {
startLocalBridge();
} catch (Throwable e) {
serviceLocalException(e);
return;
}
try {
startRemoteBridge();
} catch (Throwable e) {
serviceRemoteException(e);
return;
}
try {
if (safeWaitUntilStarted()) {
setupStaticDestinations();
staticDestinationsLatch.countDown();
}
} catch (Throwable e) {
serviceLocalException(e);
}
}
private void startLocalBridge() throws Throwable {
if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
synchronized (this) {
LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
if (!disposed.get()) {
if (idGenerator == null) {
throw new IllegalStateException("Id Generator cannot be null");
}
localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId = configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + configuration.getBrokerName();
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword());
Transport originalTransport = remoteBroker;
while (originalTransport instanceof TransportFilter) {
originalTransport = ((TransportFilter) originalTransport).getNext();
}
if (originalTransport instanceof TcpTransport) {
X509Certificate[] peerCerts = originalTransport.getPeerCertificates();
localConnectionInfo.setTransportContext(peerCerts);
}
// sync requests that may fail
Object resp = localBroker.request(localConnectionInfo);
if (resp instanceof ExceptionResponse) {
throw ((ExceptionResponse) resp).getException();
}
localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo);
if (configuration.isDuplex()) {
// separate in-bound channel for forwards so we don't
// contend with out-bound dispatch on same connection
remoteBrokerInfo.setNetworkConnection(true);
duplexInboundLocalBroker.oneway(remoteBrokerInfo);
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
duplexLocalConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + "duplex"
+ configuration.getClientIdToken() + configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword());
if (originalTransport instanceof TcpTransport) {
X509Certificate[] peerCerts = originalTransport.getPeerCertificates();
duplexLocalConnectionInfo.setTransportContext(peerCerts);
}
// sync requests that may fail
resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
if (resp instanceof ExceptionResponse) {
throw ((ExceptionResponse) resp).getException();
}
SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
duplexInboundLocalBroker.oneway(duplexInboundSession);
duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
}
brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStart(this);
}
// Let the local broker know the remote broker's ID.
localBroker.oneway(remoteBrokerInfo);
// new peer broker (a consumer can work with remote broker also)
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
localBroker, remoteBroker, remoteBrokerName
});
LOG.trace("{} register bridge ({}) to {}", new Object[]{
configuration.getBrokerName(), this, remoteBrokerName
});
} else {
LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
}
startedLatch.countDown();
localStartedLatch.countDown();
}
}
}
protected void startRemoteBridge() throws Exception {
if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
synchronized (this) {
if (!isCreatedByDuplex()) {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(configuration.getBrokerName());
brokerInfo.setBrokerURL(configuration.getBrokerURL());
brokerInfo.setNetworkConnection(true);
brokerInfo.setDuplexConnection(configuration.isDuplex());
// set our properties
Properties props = new Properties();
IntrospectionSupport.getProperties(configuration, props, null);
String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
props.put(dynamicallyIncludedDestinationsKey,
StringToListOfActiveMQDestinationConverter.
convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
}
if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
props.put(staticallyIncludedDestinationsKey,
StringToListOfActiveMQDestinationConverter.
convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
}
props.remove("networkTTL");
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo);
if (configuration.isSyncDurableSubs() &&
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
configuration));
}
}
if (remoteConnectionInfo != null) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
remoteConnectionInfo = new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
remoteBroker.oneway(remoteSessionInfo);
producerInfo = new ProducerInfo(remoteSessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to determine demand.
if (!configuration.isStaticBridge()) {
demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
// always dispatch advisory message asynchronously so that
// we never block the producer broker if we are slow
demandConsumerInfo.setDispatchAsync(true);
String advisoryTopic = configuration.getDestinationFilter();
if (configuration.isBridgeTempDestinations()) {
advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
configureConsumerPrefetch(demandConsumerInfo);
remoteBroker.oneway(demandConsumerInfo);
}
startedLatch.countDown();
}
}
}
@Override
public void serviceRemoteException(Throwable error) {
if (!disposed.get()) {
if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString());
} else {
LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString());
}
LOG.debug("The remote Exception was: {}", error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
ServiceSupport.dispose(getControllingService());
}
});
fireBridgeFailed(error);
}
}
/**
* Checks whether or not this consumer is a direct bridge network subscription
* @param info
* @return
*/
protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
(info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
}
protected boolean isProxyBridgeSubscription(String clientId, String subName) {
if (subName != null && clientId != null) {
if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) {
return true;
}
}
return false;
}
/**
* This scenaior is primarily used for durable sync on broker restarts
*
* @param sub
* @param clientId
* @param subName
*/
protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) {
if (clientId != null && sub != null && subName != null) {
String newClientId = getProxyBridgeClientId(clientId);
final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName);
sub.getDurableRemoteSubs().add(newSubInfo);
LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
} else {
LOG.debug("Skipping addProxyNetworkSubscription");
}
}
/**
* Add a durable remote proxy subscription when we can generate via the BrokerId path
* This is the most common scenario
*
* @param sub
* @param path
* @param subName
*/
protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) {
if (sub != null && path.length > 1 && subName != null) {
String b1 = path[path.length-1].toString();
String b2 = path[path.length-2].toString();
final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
sub.getDurableRemoteSubs().add(newSubInfo);
}
}
private String getProxyBridgeClientId(String clientId) {
String newClientId = clientId;
String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
if (clientIdTokens != null && clientIdTokens.length > 2) {
newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound"
+ configuration.getClientIdToken() + clientIdTokens[clientIdTokens.length -1];
}
return newClientId;
}
protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) {
return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
}
protected boolean isProxyNSConsumerClientId(String clientId) {
return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3;
}
protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
safeWaitUntilStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command instanceof BrokerSubscriptionInfo) {
final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
//Start in a new thread so we don't block the transport waiting for staticDestinations
syncExecutor.execute(new Runnable() {
@Override
public void run() {
try {
staticDestinationsLatch.await();
//Make sure after the countDown of staticDestinationsLatch we aren't stopping
if (!disposed.get()) {
BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
brokerService.getBrokerName(), subInfo.getBrokerName());
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
&& !configuration.isDynamicOnly()) {
if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
//re-add any process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
//Also re-add network consumers that are not part of this direct
//bridge (proxy of proxy bridges)
if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}
}
//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
}
}
}
}
} catch (Exception e) {
LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
LOG.debug(e.getMessage(), e);
}
}
});
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
} else {
if (isDuplex()) {
LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command;
if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
ackAdvisory(message);
} else {
if (!isPermissableDestination(message.getDestination(), true)) {
return;
}
safeWaitUntilStarted();
// message being forwarded - we need to
// propagate the response to our local send
if (canDuplexDispatch(message)) {
message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
final int correlationId = message.getCommandId();
@Override
public void onCompletion(FutureResponse resp) {
try {
Response reply = resp.getResult();
reply.setCorrelationId(correlationId);
remoteBroker.oneway(reply);
//increment counter when messages are received in duplex mode
networkBridgeStatistics.getReceivedCount().increment();
} catch (IOException error) {
LOG.error("Exception: {} on duplex forward of: {}", error, message);
serviceRemoteException(error);
}
}
});
} else {
duplexInboundLocalBroker.oneway(message);
networkBridgeStatistics.getReceivedCount().increment();
}
serviceInboundMessage(message);
} else {
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
remoteBroker.oneway(reply);
}
}
}
} else {
switch (command.getDataStructureType()) {
case ConnectionInfo.DATA_STRUCTURE_TYPE:
if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
// end of initiating connection setup - propogate to initial connection to get mbean by clientid
duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
} else {
localBroker.oneway(command);
}
break;
case SessionInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;
case ProducerInfo.DATA_STRUCTURE_TYPE:
// using duplexInboundLocalProducerInfo
break;
case MessageAck.DATA_STRUCTURE_TYPE:
MessageAck ack = (MessageAck) command;
DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
if (localSub != null) {
ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
localBroker.oneway(ack);
} else {
LOG.warn("Matching local subscription not found for ack: {}", ack);
}
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
final ConsumerInfo consumerInfo = (ConsumerInfo) command;
if (isDuplicateSuppressionOff(consumerInfo)) {
addConsumerInfo(consumerInfo);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(consumerInfo);
}
}
} else {
// received a subscription whilst stopping
LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
}
break;
case ShutdownInfo.DATA_STRUCTURE_TYPE:
// initiator is shutting down, controlled case
// abortive close dealt with by inactivity monitor
LOG.info("Stopping network bridge on shutdown of remote broker");
serviceRemoteException(new IOException(command.toString()));
break;
default:
LOG.debug("Ignoring remote command: {}", command);
}
}
} else {
switch (command.getDataStructureType()) {
case KeepAliveInfo.DATA_STRUCTURE_TYPE:
case WireFormatInfo.DATA_STRUCTURE_TYPE:
case ShutdownInfo.DATA_STRUCTURE_TYPE:
break;
default:
LOG.warn("Unexpected remote command: {}", command);
}
}
}
} catch (Throwable e) {
LOG.debug("Exception processing remote command: {}", command, e);
serviceRemoteException(e);
}
}
}
private void ackAdvisory(Message message) throws IOException {
demandConsumerDispatched++;
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
(configuration.getAdvisoryAckPercentage() / 100f))) {
final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
ack.setConsumerId(demandConsumerInfo.getConsumerId());
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
remoteBroker.oneway(ack);
} catch (IOException e) {
LOG.warn("Failed to send advisory ack " + ack, e);
}
}
});
demandConsumerDispatched = 0;
}
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getConsumerTTL();
if (data.getClass() == ConsumerInfo.class) {
// Create a new local subscription
ConsumerInfo info = (ConsumerInfo) data;
BrokerId[] path = info.getBrokerPath();
if (info.isBrowser()) {
LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
return;
}
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, networkTTL, info
});
return;
}
if (contains(path, localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to the broker.
LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, info
});
return;
}
if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permitted or in the excluded list
LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
});
return;
}
// in a cyclic network there can be multiple bridges per broker that can propagate
// a network subscription so there is a need to synchronize on a shared entity
// if duplicate suppression is required
if (isDuplicateSuppressionOff(info)) {
addConsumerInfo(info);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(info);
}
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up information about temporary destinations
final DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
configuration.getBrokerName(), destInfo, networkTTL
});
return;
}
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
return;
}
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
// re-set connection id so comes from here
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
});
if (destInfo.isRemoveOperation()) {
// Serialize with removeSub operations such that all removeSub advisories
// are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(destInfo);
} catch (IOException e) {
LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
}
}
});
} else {
localBroker.oneway(destInfo);
}
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
if (forcedDurableRemoteId.remove(id)) {
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.removeForcedDurableConsumer(id);
if (removed) {
cleanupDurableSub(ds, i);
}
}
}
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
subscriptionInfo.getSubscriptionName());
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
//If this is a proxy bridge subscription we need to try changing the clientId
if (!removed && proxyBridgeSub){
subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
ds.getDurableRemoteSubs().remove(subscriptionInfo);
removed = true;
}
}
if (removed) {
cleanupDurableSub(ds, i);
}
}
}
}
private void cleanupDurableSub(final DemandSubscription ds,
Iterator<DemandSubscription> i) throws IOException {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
&& ds.getForcedDurableConsumersSize() == 0) {
// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
localBroker.oneway(removeInfo);
// remove subscriber
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
sending.setClientId(localClientId);
sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending);
//remove subscriber from local map
i.remove();
//need to remove the mapping from the remote map as well
subscriptionMapByRemoteId.remove(ds.getRemoteInfo().getConsumerId());
}
}
@Override
public void serviceLocalException(Throwable error) {
serviceLocalException(null, error);
}
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
if (!disposed.get()) {
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
// not a reason to terminate the bridge - temps can disappear with
// pending sends as the demand sub may outlive the remote dest
if (messageDispatch != null) {
LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
try {
MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setPoisonCause(error);
localBroker.oneway(poisonAck);
} catch (IOException ioe) {
LOG.error("Failed to posion ack message following forward failure: ", ioe);
}
fireFailedForwardAdvisory(messageDispatch, error);
} else {
LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
}
return;
}
LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
LOG.debug("The local Exception was: {}", error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
ServiceSupport.dispose(getControllingService());
}
});
fireBridgeFailed(error);
}
}
private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
if (configuration.isAdvisoryForFailedForward()) {
AdvisoryBroker advisoryBroker = null;
try {
advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (advisoryBroker != null) {
ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(brokerService.getBroker());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
advisoryMessage);
}
} catch (Exception e) {
LOG.warn("failed to fire forward failure advisory, cause: {}", (Object)e);
LOG.debug("detail", e);
}
}
}
protected Service getControllingService() {
return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
}
protected void addSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
// Serialize with remove operations such that new sub does not cause remove/purge to fail
// remain synchronous b/c duplicate suppression depends on add completion
FutureTask syncTask = new FutureTask(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(sub.getLocalInfo());
} catch (IOException e) {
LOG.warn("failed to deliver add sub command: {}, cause: {}", sub.getLocalInfo(), e);
LOG.debug("detail", e);
}
}
}, null);
try {
serialExecutor.execute(syncTask);
syncTask.get();
} catch (Exception e) {
LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
LOG.debug("detail", e);
}
}
}
protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
// ensure not available for conduit subs pending removal
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up tshis thread for outstanding responses
// Serialize with removeDestination operations so that removeSubs are serialized with
// removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
sub.waitForCompletion();
try {
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
} catch (IOException e) {
LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
}
}
});
}
}
protected Message configureMessage(MessageDispatch md) throws IOException {
Message message = md.getMessage().copy();
// Update the packet to show where it came from.
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
message.setProducerId(producerInfo.getProducerId());
message.setDestination(md.getDestination());
message.setMemoryUsage(null);
if (message.getOriginalTransactionId() == null) {
message.setOriginalTransactionId(message.getTransactionId());
}
message.setTransactionId(null);
if (configuration.isUseCompression()) {
message.compress();
}
return message;
}
protected void serviceLocalCommand(Command command) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
safeWaitUntilStarted();
networkBridgeStatistics.getEnqueues().increment();
final MessageDispatch md = (MessageDispatch) command;
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
if (suppressMessageDispatch(md, sub)) {
LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
});
// still ack as it may be durable
try {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
} finally {
sub.decrementOutstandingResponses();
}
return;
}
Message message = configureMessage(md);
LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
});
if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
try {
// never request b/c they are eventually acked async
remoteBroker.oneway(message);
} finally {
sub.decrementOutstandingResponses();
}
return;
}
if (isPermissableDestination(md.getDestination())) {
if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
// The message was not sent using async send, so we should only
// ack the local broker when we get confirmation that the remote
// broker has received the message.
remoteBroker.asyncRequest(message, new ResponseCallback() {
@Override
public void onCompletion(FutureResponse future) {
try {
Response response = future.getResult();
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
serviceLocalException(md, er.getException());
} else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
networkBridgeStatistics.getDequeues().increment();
}
} catch (IOException e) {
serviceLocalException(md, e);
} finally {
sub.decrementOutstandingResponses();
}
}
});
} else {
// If the message was originally sent using async send, we will
// preserve that QOS by bridging it using an async send (small chance
// of message loss).
try {
remoteBroker.oneway(message);
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
networkBridgeStatistics.getDequeues().increment();
} finally {
sub.decrementOutstandingResponses();
}
}
serviceOutbound(message);
}
} else {
LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
}
} else if (command.isBrokerInfo()) {
futureLocalBrokerInfo.set((BrokerInfo) command);
} else if (command.isShutdownInfo()) {
LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
stop();
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceLocalException(ce.getException());
} else {
switch (command.getDataStructureType()) {
case WireFormatInfo.DATA_STRUCTURE_TYPE:
break;
case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
break;
default:
LOG.warn("Unexpected local command: {}", command);
}
}
} catch (Throwable e) {
LOG.warn("Caught an exception processing local command", e);
serviceLocalException(e);
}
}
}
private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
boolean suppress = false;
// for durable subs, suppression via filter leaves dangling acks so we
// need to check here and allow the ack irrespective
if (sub.getLocalInfo().isDurable()) {
NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
messageEvalContext.setDestination(md.getDestination());
suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
}
return suppress;
}
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if (brokerPath != null) {
for (BrokerId id : brokerPath) {
if (brokerId.equals(id)) {
return true;
}
}
}
return false;
}
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
if (brokerPath == null || brokerPath.length == 0) {
return pathsToAppend;
}
BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
return rc;
}
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
if (brokerPath == null || brokerPath.length == 0) {
return new BrokerId[]{idToAppend};
}
BrokerId rc[] = new BrokerId[brokerPath.length + 1];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
rc[brokerPath.length] = idToAppend;
return rc;
}
protected boolean isPermissableDestination(ActiveMQDestination destination) {
return isPermissableDestination(destination, false);
}
protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
// Are we not bridging temporary destinations?
if (destination.isTemporary()) {
if (allowTemporary) {
return true;
} else {
return configuration.isBridgeTempDestinations();
}
}
ActiveMQDestination[] dests = excludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return false;
}
}
}
dests = staticallyIncludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
}
dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
return false;
}
return true;
}
/**
* Subscriptions for these destinations are always created
*/
protected void setupStaticDestinations() {
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null) {
for (ActiveMQDestination dest : dests) {
if (isPermissableDestination(dest)) {
DemandSubscription sub = createDemandSubscription(dest, null);
if (sub != null) {
sub.setStaticallyIncluded(true);
try {
addSubscription(sub);
} catch (IOException e) {
LOG.error("Failed to add static destination {}", dest, e);
}
LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
} else {
LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest);
}
} else {
LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
}
}
}
}
protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
ConsumerInfo info = consumerInfo.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub = createDemandSubscription(info);
if (sub != null) {
if (duplicateSuppressionIsRequired(sub)) {
undoMapRegistration(sub);
} else {
if (consumerInfo.isDurable()) {
//Handle the demand generated by proxy network subscriptions
//The broker path is case is normal
if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo()) &&
info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) {
final BrokerId[] path = info.getBrokerPath();
addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
//This is the durable sync case on broker restart
} else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) &&
isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName());
} else {
sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
}
}
addSubscription(sub);
LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
}
}
}
private void undoMapRegistration(DemandSubscription sub) {
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
}
/*
* check our existing subs networkConsumerIds against the list of network
* ids in this subscription A match means a duplicate which we suppress for
* topics and maybe for queues
*/
private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
boolean suppress = false;
if (isDuplicateSuppressionOff(consumerInfo)) {
return suppress;
}
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) {
if (isInActiveDurableSub(sub)) {
suppress = false;
} else {
suppress = hasLowerPriority(sub, candidate.getLocalInfo());
}
break;
}
}
}
return suppress;
}
private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
|| consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
|| consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
}
private boolean isInActiveDurableSub(Subscription sub) {
return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
}
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
boolean suppress = false;
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
});
suppress = true;
} else {
// remove the existing lower priority duplicate and allow this candidate
try {
removeDuplicateSubscription(existingSub);
LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
});
} catch (IOException e) {
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
}
}
return suppress;
}
private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
break;
}
}
}
private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
boolean found = false;
for (ConsumerId aliasConsumer : networkConsumers) {
if (candidateConsumers.contains(aliasConsumer)) {
found = true;
break;
}
}
return found;
}
protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
Region region;
Collection<Subscription> subs;
region = null;
switch (dest.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
region = region_broker.getQueueRegion();
break;
case ActiveMQDestination.TOPIC_TYPE:
region = region_broker.getTopicRegion();
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = region_broker.getTempQueueRegion();
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = region_broker.getTempTopicRegion();
break;
}
if (region instanceof AbstractRegion) {
subs = ((AbstractRegion) region).getSubscriptions().values();
} else {
subs = null;
}
return subs;
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
// add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}
protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
DemandSubscription result = new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
if (info.getDestination().isTemporary()) {
// reset the local connection Id
ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
}
if (configuration.isDecreaseNetworkConsumerPriority()) {
byte priority = (byte) configuration.getConsumerPriorityBase();
if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
// The longer the path to the consumer, the less it's consumer priority.
priority -= info.getBrokerPath().length + 1;
}
result.getLocalInfo().setPriority(priority);
LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
}
configureDemandSubscription(info, result);
return result;
}
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
ConsumerInfo info = new ConsumerInfo();
info.setNetworkSubscription(true);
info.setDestination(destination);
if (subscriptionName != null) {
info.setSubscriptionName(subscriptionName);
}
// Indicate that this subscription is being made on behalf of the remote broker.
info.setBrokerPath(new BrokerId[]{remoteBrokerId});
// the remote info held by the DemandSubscription holds the original
// consumerId, the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
DemandSubscription result = null;
try {
result = createDemandSubscription(info);
} catch (IOException e) {
LOG.error("Failed to create DemandSubscription ", e);
}
return result;
}
protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
sub.getLocalInfo().setDispatchAsync(true);
} else {
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
}
configureConsumerPrefetch(sub.getLocalInfo());
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
if (!info.isDurable()) {
// This works for now since we use a VM connection to the local broker.
// may need to change if we ever subscribe to a remote broker.
sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
} else {
sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
}
protected void removeDemandSubscription(ConsumerId id) throws IOException {
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
});
if (sub != null) {
removeSubscription(sub);
LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
});
}
}
protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
boolean removeDone = false;
DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
if (sub != null) {
try {
removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
removeDone = true;
} catch (IOException e) {
LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
}
}
return removeDone;
}
/**
* Performs a timed wait on the started latch and then checks for disposed
* before performing another wait each time the the started wait times out.
*/
protected boolean safeWaitUntilStarted() throws InterruptedException {
while (!disposed.get()) {
if (startedLatch.await(1, TimeUnit.SECONDS)) {
break;
}
}
return !disposed.get();
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
if (brokerService != null && brokerService.getDestinationPolicy() != null) {
PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
filterFactory = entry.getNetworkBridgeFilterFactory();
}
}
return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
}
protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}
@Override
public void setNetworkBridgeListener(NetworkBridgeListener listener) {
this.networkBridgeListener = listener;
}
private void fireBridgeFailed(Throwable reason) {
LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
l.bridgeFailed();
}
}
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
/**
* @param dynamicallyIncludedDestinations
* The dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
public ActiveMQDestination[] getExcludedDestinations() {
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
this.excludedDestinations = excludedDestinations;
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public ActiveMQDestination[] getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
/**
* @return Returns the durableDestinations.
*/
public ActiveMQDestination[] getDurableDestinations() {
return durableDestinations;
}
/**
* @param durableDestinations The durableDestinations to set.
*/
public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
this.durableDestinations = durableDestinations;
}
/**
* @return Returns the localBroker.
*/
public Transport getLocalBroker() {
return localBroker;
}
/**
* @return Returns the remoteBroker.
*/
public Transport getRemoteBroker() {
return remoteBroker;
}
/**
* @return the createdByDuplex
*/
public boolean isCreatedByDuplex() {
return this.createdByDuplex;
}
/**
* @param createdByDuplex the createdByDuplex to set
*/
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
}
@Override
public String getRemoteAddress() {
return remoteBroker.getRemoteAddress();
}
@Override
public String getLocalAddress() {
return localBroker.getRemoteAddress();
}
@Override
public String getRemoteBrokerName() {
return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
}
@Override
public String getRemoteBrokerId() {
return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
}
@Override
public String getLocalBrokerName() {
return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
}
@Override
public long getDequeueCounter() {
return networkBridgeStatistics.getDequeues().getCount();
}
@Override
public long getEnqueueCounter() {
return networkBridgeStatistics.getEnqueues().getCount();
}
@Override
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
return networkBridgeStatistics;
}
protected boolean isDuplex() {
return configuration.isDuplex() || createdByDuplex;
}
public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
return subscriptionMapByRemoteId;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
localBrokerPath[0] = localBrokerId;
}
@Override
public void setMbeanObjectName(ObjectName objectName) {
this.mbeanObjectName = objectName;
}
@Override
public ObjectName getMbeanObjectName() {
return mbeanObjectName;
}
@Override
public void resetStats() {
networkBridgeStatistics.reset();
}
/*
* Used to allow for async tasks to await receipt of the BrokerInfo from the local and
* remote sides of the network bridge.
*/
private static class FutureBrokerInfo implements Future<BrokerInfo> {
private final CountDownLatch slot = new CountDownLatch(1);
private final AtomicBoolean disposed;
private volatile BrokerInfo info = null;
public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
this.info = info;
this.disposed = disposed;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
slot.countDown();
return true;
}
@Override
public boolean isCancelled() {
return slot.getCount() == 0 && info == null;
}
@Override
public boolean isDone() {
return info != null;
}
@Override
public BrokerInfo get() throws InterruptedException, ExecutionException {
try {
if (info == null) {
while (!disposed.get()) {
if (slot.await(1, TimeUnit.SECONDS)) {
break;
}
}
}
return info;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Operation interrupted: {}", e, e);
throw new InterruptedException("Interrupted.");
}
}
@Override
public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
if (info == null) {
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
while (!disposed.get() || System.currentTimeMillis() - deadline < 0) {
if (slot.await(1, TimeUnit.MILLISECONDS)) {
break;
}
}
if (info == null) {
throw new TimeoutException();
}
}
return info;
} catch (InterruptedException e) {
throw new InterruptedException("Interrupted.");
}
}
public void set(BrokerInfo info) {
this.info = info;
this.slot.countDown();
}
}
protected void serviceOutbound(Message message) {
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onOutboundMessage(this, message);
}
}
protected void serviceInboundMessage(Message message) {
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onInboundMessage(this, message);
}
}
protected boolean canDuplexDispatch(Message message) {
boolean result = true;
if (configuration.isCheckDuplicateMessagesOnDuplex()){
final long producerSequenceId = message.getMessageId().getProducerSequenceId();
// messages are multiplexed on this producer so we need to query the persistenceAdapter
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
if (producerSequenceId <= lastStoredForMessageProducer) {
result = false;
LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
(LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
});
}
}
return result;
}
protected long getStoredSequenceIdForMessage(MessageId messageId) {
try {
return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
} catch (IOException ignored) {
LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
}
return -1;
}
protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
//If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
//set then use it, else default to the prefetchSize setting
if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
configuration.getAdvisoryPrefetchSize() > 0) {
consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
} else {
consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
}
}
}