blob: 3d38eacd21a412a96d12f38b273677ecc436854d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.scram.SCRAMClientSASL;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.scram.SCRAM;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.jboss.logging.Logger;
public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection {
private static final Logger logger = Logger.getLogger(AMQPBrokerConnection.class);
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
private final ProtonProtocolManager protonProtocolManager;
private final ActiveMQServer server;
private final NettyConnector bridgesConnector;
private NettyConnection connection;
private Session session;
private AMQPSessionContext sessionContext;
private ActiveMQProtonRemotingConnection protonRemotingConnection;
private volatile boolean started = false;
private final AMQPBrokerConnectionManager bridgeManager;
private AMQPMirrorControllerSource mirrorControllerSource;
private int retryCounter = 0;
private int lastRetryCounter;
private boolean connecting = false;
private volatile ScheduledFuture reconnectFuture;
private final Set<Queue> senders = new HashSet<>();
private final Set<Queue> receivers = new HashSet<>();
final Executor connectExecutor;
final ScheduledExecutorService scheduledExecutorService;
/** This is just for logging.
* the actual connection will come from the amqpConnection configuration*/
String host;
/** This is just for logging.
* the actual connection will come from the amqpConnection configuration*/
int port;
public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager, AMQPBrokerConnectConfiguration brokerConnectConfiguration,
ProtonProtocolManager protonProtocolManager,
ActiveMQServer server,
NettyConnector bridgesConnector) {
this.bridgeManager = bridgeManager;
this.brokerConnectConfiguration = brokerConnectConfiguration;
this.protonProtocolManager = protonProtocolManager;
this.server = server;
this.bridgesConnector = bridgesConnector;
connectExecutor = server.getExecutorFactory().getExecutor();
scheduledExecutorService = server.getScheduledPool();
}
@Override
public String getName() {
return brokerConnectConfiguration.getName();
}
@Override
public String getProtocol() {
return "AMQP";
}
@Override
public boolean isStarted() {
return started;
}
public boolean isConnecting() {
return connecting;
}
@Override
public void stop() {
if (!started) return;
started = false;
if (protonRemotingConnection != null) {
protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
protonRemotingConnection = null;
connection = null;
}
ScheduledFuture scheduledFuture = reconnectFuture;
reconnectFuture = null;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
@Override
public void start() throws Exception {
if (started) return;
started = true;
server.getConfiguration().registerBrokerPlugin(this);
try {
if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return;
}
connectExecutor.execute(() -> doConnect());
}
public NettyConnection getConnection() {
return connection;
}
@Override
public void afterCreateQueue(Queue queue) {
connectExecutor.execute(() -> {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
validateMatching(queue, connectionElement);
}
});
}
public void validateMatching(Queue queue, AMQPBrokerConnectionElement connectionElement) {
if (connectionElement.getType() != AMQPBrokerConnectionAddressType.MIRROR) {
if (connectionElement.getQueueName() != null) {
if (queue.getName().equals(connectionElement.getQueueName())) {
createLink(queue, connectionElement);
}
} else if (connectionElement.match(queue.getAddress(), server.getConfiguration().getWildcardConfiguration())) {
createLink(queue, connectionElement);
}
}
}
public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability);
connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability);
} else {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null);
}
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
}
}
}
SimpleString getMirrorSNF(AMQPMirrorBrokerConnectionElement mirrorElement) {
SimpleString snf = mirrorElement.getMirrorSNF();
if (snf == null) {
snf = SimpleString.toSimpleString(ProtonProtocolManager.getMirrorAddress(this.brokerConnectConfiguration.getName()));
mirrorElement.setMirrorSNF(snf);
}
return snf;
}
private void linkClosed(Link link) {
if (link.getLocalState() == EndpointState.ACTIVE) {
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionRemoteLinkClosed(), lastRetryCounter);
}
}
private void doConnect() {
try {
connecting = true;
List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size());
String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
this.host = hostOnParameter;
this.port = portOnParameter;
connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
if (connection == null) {
retryConnection();
return;
}
lastRetryCounter = retryCounter;
retryCounter = 0;
reconnectFuture = null;
// before we retry the connection we need to remove any previous links
// as they will need to be recreated
senders.clear();
receivers.clear();
ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
protonRemotingConnection.getAmqpConnection().setLinkCloseListener(this::linkClosed);
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
protonRemotingConnection.getAmqpConnection().runLater(() -> {
protonRemotingConnection.getAmqpConnection().open();
session.open();
protonRemotingConnection.getAmqpConnection().flush();
});
if (brokerConnectConfiguration.getConnectionElements() != null) {
Stream<Binding> bindingStream = server.getPostOffice().getAllBindings();
bindingStream.forEach(binding -> {
if (binding instanceof QueueBinding) {
Queue queue = ((QueueBinding) binding).getQueue();
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
validateMatching(queue, connectionElement);
}
}
});
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
AMQPMirrorBrokerConnectionElement replica = (AMQPMirrorBrokerConnectionElement)connectionElement;
Queue queue = server.locateQueue(getMirrorSNF(replica));
connectSender(queue, queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), server.getNodeID().toString(),
new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}, null);
}
}
}
protonRemotingConnection.getAmqpConnection().flush();
bridgeManager.connected(connection, this);
ActiveMQAMQPProtocolLogger.LOGGER.successReconnect(brokerConnectConfiguration.getName(), host + ":" + port, lastRetryCounter);
connecting = false;
} catch (Throwable e) {
error(e);
}
}
public void retryConnection() {
lastRetryCounter = retryCounter;
if (bridgeManager.isStarted() && started) {
if (brokerConnectConfiguration.getReconnectAttempts() < 0 || retryCounter < brokerConnectConfiguration.getReconnectAttempts()) {
retryCounter++;
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(brokerConnectConfiguration.getName(), host + ":" + port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
if (logger.isDebugEnabled()) {
logger.debug("Reconnecting in " + brokerConnectConfiguration.getRetryInterval() + ", this is the " + retryCounter + " of " + brokerConnectConfiguration.getReconnectAttempts());
}
reconnectFuture = scheduledExecutorService.schedule(() -> connectExecutor.execute(() -> doConnect()), brokerConnectConfiguration.getRetryInterval(), TimeUnit.MILLISECONDS);
} else {
retryCounter = 0;
started = false;
connecting = false;
ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(brokerConnectConfiguration.getName(), host + ":" + port, lastRetryCounter);
if (logger.isDebugEnabled()) {
logger.debug("no more reconnections as the retry counter reached " + retryCounter + " out of " + brokerConnectConfiguration.getReconnectAttempts());
}
}
}
}
private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) {
// TODO implement this as part of https://issues.apache.org/jira/browse/ARTEMIS-2965
}
private Queue installMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) throws Exception {
MirrorController currentMirrorController = server.getMirrorController();
// This following block is to avoid a duplicate on mirror controller
if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerSource) {
Queue queue = checkCurrentMirror(this, (AMQPMirrorControllerSource) currentMirrorController);
// on this case we already had a mirror installed before, we won't duplicate it
if (queue != null) {
return queue;
}
} else if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerAggregation) {
AMQPMirrorControllerAggregation aggregation = (AMQPMirrorControllerAggregation) currentMirrorController;
for (AMQPMirrorControllerSource source : aggregation.getPartitions()) {
Queue queue = checkCurrentMirror(this, source);
// on this case we already had a mirror installed before, we won't duplicate it
if (queue != null) {
return queue;
}
}
}
AddressInfo addressInfo = server.getAddressInfo(getMirrorSNF(replicaConfig));
if (addressInfo == null) {
addressInfo = new AddressInfo(getMirrorSNF(replicaConfig)).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(!replicaConfig.isDurable()).setInternal(true);
server.addAddressInfo(addressInfo);
}
if (addressInfo.getRoutingType() != RoutingType.ANYCAST) {
throw new IllegalArgumentException("sourceMirrorAddress is not ANYCAST");
}
Queue mirrorControlQueue = server.locateQueue(getMirrorSNF(replicaConfig));
if (mirrorControlQueue == null) {
mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true);
}
mirrorControlQueue.setMirrorController(true);
QueueBinding snfReplicaQueueBinding = (QueueBinding)server.getPostOffice().getBinding(getMirrorSNF(replicaConfig));
if (snfReplicaQueueBinding == null) {
logger.warn("Queue does not exist even after creation! " + replicaConfig);
throw new IllegalAccessException("Cannot start replica");
}
Queue snfQueue = snfReplicaQueueBinding.getQueue();
if (!snfQueue.getAddress().equals(getMirrorSNF(replicaConfig))) {
logger.warn("Queue " + snfQueue + " belong to a different address (" + snfQueue.getAddress() + "), while we expected it to be " + addressInfo.getName());
throw new IllegalAccessException("Cannot start replica");
}
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this);
this.mirrorControllerSource = newPartition;
server.scanAddresses(newPartition);
if (currentMirrorController == null) {
server.installMirrorController(newPartition);
} else {
// Replace a standard implementation by an aggregated supporting multiple targets
if (currentMirrorController instanceof AMQPMirrorControllerSource) {
// replacing the simple mirror control for an aggregator
AMQPMirrorControllerAggregation remoteAggregation = new AMQPMirrorControllerAggregation();
remoteAggregation.addPartition((AMQPMirrorControllerSource) currentMirrorController);
currentMirrorController = remoteAggregation;
server.installMirrorController(remoteAggregation);
}
((AMQPMirrorControllerAggregation) currentMirrorController).addPartition(newPartition);
}
return snfQueue;
}
private static Queue checkCurrentMirror(AMQPBrokerConnection brokerConnection,
AMQPMirrorControllerSource currentMirrorController) {
AMQPMirrorControllerSource source = currentMirrorController;
if (source.getBrokerConnection() == brokerConnection) {
return source.getSnfQueue();
}
return null;
}
private void connectReceiver(ActiveMQProtonRemotingConnection protonRemotingConnection,
Session session,
AMQPSessionContext sessionContext,
Queue queue,
Symbol... capabilities) {
if (logger.isDebugEnabled()) {
logger.debug("Connecting inbound for " + queue);
}
if (session == null) {
logger.debug("session is null");
return;
}
protonRemotingConnection.getAmqpConnection().runLater(() -> {
if (receivers.contains(queue)) {
logger.debug("Receiver for queue " + queue + " already exists, just giving up");
return;
}
receivers.add(queue);
Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID());
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
Target target = new Target();
target.setAddress(queue.getAddress().toString());
receiver.setTarget(target);
Source source = new Source();
source.setAddress(queue.getAddress().toString());
receiver.setSource(source);
if (capabilities != null) {
source.setCapabilities(capabilities);
}
receiver.open();
protonRemotingConnection.getAmqpConnection().flush();
try {
sessionContext.addReceiver(receiver);
} catch (Exception e) {
error(e);
}
});
}
private void connectSender(Queue queue,
String targetName,
java.util.function.Consumer<Sender> senderConsumer,
java.util.function.Consumer<? super MessageReference> beforeDeliver,
String brokerID,
Symbol[] desiredCapabilities,
Symbol[] targetCapabilities) {
if (logger.isDebugEnabled()) {
logger.debug("Connecting outbound for " + queue);
}
if (session == null) {
logger.debug("Session is null");
return;
}
protonRemotingConnection.getAmqpConnection().runLater(() -> {
try {
if (senders.contains(queue)) {
logger.debug("Sender for queue " + queue + " already exists, just giving up");
return;
}
senders.add(queue);
Sender sender = session.sender(targetName + ":" + UUIDGenerator.getInstance().generateStringUUID());
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
Target target = new Target();
target.setAddress(targetName);
if (targetCapabilities != null) {
target.setCapabilities(targetCapabilities);
}
sender.setTarget(target);
Source source = new Source();
source.setAddress(queue.getAddress().toString());
sender.setSource(source);
if (brokerID != null) {
HashMap<Symbol, Object> mapProperties = new HashMap<>(1, 1); // this map is expected to have a single element, so load factor = 1
mapProperties.put(AMQPMirrorControllerSource.BROKER_ID, brokerID);
sender.setProperties(mapProperties);
}
if (desiredCapabilities != null) {
sender.setDesiredCapabilities(desiredCapabilities);
}
AMQPOutgoingController outgoingInitializer = new AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
sender.open();
final ScheduledFuture futureTimeout;
AtomicBoolean cancelled = new AtomicBoolean(false);
if (bridgesConnector.getConnectTimeoutMillis() > 0) {
futureTimeout = server.getScheduledPool().schedule(() -> {
cancelled.set(true);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
}, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
} else {
futureTimeout = null;
}
// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
sender.attachments().set(Runnable.class, Runnable.class, () -> {
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver);
try {
if (!cancelled.get()) {
if (futureTimeout != null) {
futureTimeout.cancel(false);
}
if (sender.getRemoteTarget() == null) {
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.senderLinkRefused(sender.getTarget().getAddress()), lastRetryCounter);
return;
}
if (desiredCapabilities != null) {
if (!verifyOfferedCapabilities(sender, desiredCapabilities)) {
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(Arrays.toString(desiredCapabilities)), lastRetryCounter);
return;
}
}
if (brokerID != null) {
if (sender.getRemoteProperties() == null || !sender.getRemoteProperties().containsKey(AMQPMirrorControllerSource.BROKER_ID)) {
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingBrokerID(), lastRetryCounter);
return;
}
Object remoteBrokerID = sender.getRemoteProperties().get(AMQPMirrorControllerSource.BROKER_ID);
if (remoteBrokerID.equals(brokerID)) {
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionMirrorItself(), lastRetryCounter);
return;
}
}
sessionContext.addSender(sender, senderContext);
if (senderConsumer != null) {
senderConsumer.accept(sender);
}
}
} catch (Exception e) {
error(e);
}
});
} catch (Exception e) {
error(e);
}
protonRemotingConnection.getAmqpConnection().flush();
});
}
protected boolean verifyOfferedCapabilities(Sender sender, Symbol[] capabilities) {
if (sender.getRemoteOfferedCapabilities() == null) {
return false;
}
for (Symbol s : capabilities) {
boolean foundS = false;
for (Symbol b : sender.getRemoteOfferedCapabilities()) {
if (b.equals(s)) {
foundS = true;
break;
}
}
if (!foundS) {
return false;
}
}
return true;
}
protected void error(Throwable e) {
error(e, 0);
}
// the retryCounter is passed here
// in case the error happened after the actual connection
// say the connection is invalid due to an invalid attribute or wrong password
// but the max retry should not be affected by such cases
// otherwise we would always retry from 0 and never reach a max
protected void error(Throwable e, int retryCounter) {
this.retryCounter = retryCounter;
connecting = false;
logger.warn(e.getMessage(), e);
redoConnection();
}
private class AMQPOutgoingController implements SenderController {
final Queue queue;
final Sender sender;
final AMQPSessionCallback sessionSPI;
AMQPOutgoingController(Queue queue, Sender sender, AMQPSessionCallback sessionSPI) {
this.queue = queue;
this.sessionSPI = sessionSPI;
this.sender = sender;
}
@Override
public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
SimpleString queueName = queue.getName();
return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false);
}
@Override
public void close() throws Exception {
}
}
public void disconnect() throws Exception {
redoConnection();
}
@Override
public void connectionCreated(ActiveMQComponent component, Connection connection, ClientProtocolManager protocol) {
}
@Override
public void connectionDestroyed(Object connectionID) {
server.getRemotingService().removeConnection(connectionID);
redoConnection();
}
@Override
public void connectionException(Object connectionID, ActiveMQException me) {
redoConnection();
}
private void redoConnection() {
// avoiding retro-feeding an error call from the close after anyting else that happened.
if (protonRemotingConnection != null) {
protonRemotingConnection.getAmqpConnection().setLinkCloseListener(null);
}
// we need to use the connectExecutor to initiate a redoConnection
// otherwise we would need to add synchronized blocks along this class
// to control when connecting becomes true and when it becomes false
// keeping a single executor thread to this purpose would simplify things
connectExecutor.execute(() -> {
if (connecting) {
if (logger.isDebugEnabled()) {
logger.debug("Broker connection " + this.getName() + " was already in retry mode, exception or retry not captured");
}
return;
}
connecting = true;
try {
if (protonRemotingConnection != null) {
protonRemotingConnection.fail(new ActiveMQException("Connection being recreated"));
connection = null;
protonRemotingConnection = null;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
retryConnection();
});
}
@Override
public void connectionReadyForWrites(Object connectionID, boolean ready) {
protonRemotingConnection.flush();
}
private static final String EXTERNAL = "EXTERNAL";
private static final String PLAIN = "PLAIN";
private static final String ANONYMOUS = "ANONYMOUS";
private static final byte[] EMPTY = new byte[0];
private static class PlainSASLMechanism implements ClientSASL {
private final byte[] initialResponse;
PlainSASLMechanism(String username, String password) {
byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2];
System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length);
System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length);
initialResponse = encoded;
}
@Override
public String getName() {
return PLAIN;
}
@Override
public byte[] getInitialResponse() {
return initialResponse;
}
@Override
public byte[] getResponse(byte[] challenge) {
return EMPTY;
}
public static boolean isApplicable(final String username, final String password) {
return username != null && username.length() > 0 && password != null && password.length() > 0;
}
}
private static class AnonymousSASLMechanism implements ClientSASL {
@Override
public String getName() {
return ANONYMOUS;
}
@Override
public byte[] getInitialResponse() {
return EMPTY;
}
@Override
public byte[] getResponse(byte[] challenge) {
return EMPTY;
}
}
private static class ExternalSASLMechanism implements ClientSASL {
@Override
public String getName() {
return EXTERNAL;
}
@Override
public byte[] getInitialResponse() {
return EMPTY;
}
@Override
public byte[] getResponse(byte[] challenge) {
return EMPTY;
}
public static boolean isApplicable(final NettyConnection connection) {
return CertificateUtil.getLocalPrincipalFromConnection(connection) != null;
}
}
private static final class SaslFactory implements ClientSASLFactory {
private final NettyConnection connection;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration brokerConnectConfiguration) {
this.connection = connection;
this.brokerConnectConfiguration = brokerConnectConfiguration;
}
@Override
public ClientSASL chooseMechanism(String[] offeredMechanims) {
List<String> availableMechanisms = offeredMechanims == null ? Collections.emptyList() : Arrays.asList(offeredMechanims);
if (availableMechanisms.contains(EXTERNAL) && ExternalSASLMechanism.isApplicable(connection)) {
return new ExternalSASLMechanism();
}
if (SCRAMClientSASL.isApplicable(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword())) {
for (SCRAM scram : SCRAM.values()) {
if (availableMechanisms.contains(scram.getName())) {
return new SCRAMClientSASL(scram, brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword());
}
}
}
if (availableMechanisms.contains(PLAIN) && PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword())) {
return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword());
}
if (availableMechanisms.contains(ANONYMOUS)) {
return new AnonymousSASLMechanism();
}
return null;
}
}
}