| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.artemis.protocol.amqp.connect; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Stream; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQException; |
| import org.apache.activemq.artemis.api.core.QueueConfiguration; |
| import org.apache.activemq.artemis.api.core.RoutingType; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.api.core.TransportConfiguration; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; |
| import org.apache.activemq.artemis.core.postoffice.Binding; |
| import org.apache.activemq.artemis.core.postoffice.QueueBinding; |
| import org.apache.activemq.artemis.core.remoting.CertificateUtil; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; |
| import org.apache.activemq.artemis.core.server.ActiveMQComponent; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.BrokerConnection; |
| import org.apache.activemq.artemis.core.server.Consumer; |
| import org.apache.activemq.artemis.core.server.MessageReference; |
| import org.apache.activemq.artemis.core.server.Queue; |
| import org.apache.activemq.artemis.core.server.impl.AddressInfo; |
| import org.apache.activemq.artemis.core.server.mirror.MirrorController; |
| import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; |
| import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; |
| import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; |
| import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; |
| import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation; |
| import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; |
| import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; |
| import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; |
| import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; |
| import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; |
| import org.apache.activemq.artemis.protocol.amqp.proton.SenderController; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.scram.SCRAMClientSASL; |
| import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; |
| import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; |
| import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; |
| import org.apache.activemq.artemis.spi.core.remoting.Connection; |
| import org.apache.activemq.artemis.spi.core.security.scram.SCRAM; |
| import org.apache.activemq.artemis.utils.ConfigurationHelper; |
| import org.apache.activemq.artemis.utils.UUIDGenerator; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.messaging.Source; |
| import org.apache.qpid.proton.amqp.messaging.Target; |
| import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; |
| import org.apache.qpid.proton.amqp.transport.SenderSettleMode; |
| import org.apache.qpid.proton.engine.EndpointState; |
| import org.apache.qpid.proton.engine.Link; |
| import org.apache.qpid.proton.engine.Receiver; |
| import org.apache.qpid.proton.engine.Sender; |
| import org.apache.qpid.proton.engine.Session; |
| import org.jboss.logging.Logger; |
| |
| public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection { |
| |
| private static final Logger logger = Logger.getLogger(AMQPBrokerConnection.class); |
| |
| private final AMQPBrokerConnectConfiguration brokerConnectConfiguration; |
| private final ProtonProtocolManager protonProtocolManager; |
| private final ActiveMQServer server; |
| private final NettyConnector bridgesConnector; |
| private NettyConnection connection; |
| private Session session; |
| private AMQPSessionContext sessionContext; |
| private ActiveMQProtonRemotingConnection protonRemotingConnection; |
| private volatile boolean started = false; |
| private final AMQPBrokerConnectionManager bridgeManager; |
| private AMQPMirrorControllerSource mirrorControllerSource; |
| private int retryCounter = 0; |
| private int lastRetryCounter; |
| private boolean connecting = false; |
| private volatile ScheduledFuture reconnectFuture; |
| private final Set<Queue> senders = new HashSet<>(); |
| private final Set<Queue> receivers = new HashSet<>(); |
| |
| final Executor connectExecutor; |
| final ScheduledExecutorService scheduledExecutorService; |
| |
| /** This is just for logging. |
| * the actual connection will come from the amqpConnection configuration*/ |
| String host; |
| |
| /** This is just for logging. |
| * the actual connection will come from the amqpConnection configuration*/ |
| int port; |
| |
| public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager, AMQPBrokerConnectConfiguration brokerConnectConfiguration, |
| ProtonProtocolManager protonProtocolManager, |
| ActiveMQServer server, |
| NettyConnector bridgesConnector) { |
| this.bridgeManager = bridgeManager; |
| this.brokerConnectConfiguration = brokerConnectConfiguration; |
| this.protonProtocolManager = protonProtocolManager; |
| this.server = server; |
| this.bridgesConnector = bridgesConnector; |
| connectExecutor = server.getExecutorFactory().getExecutor(); |
| scheduledExecutorService = server.getScheduledPool(); |
| } |
| |
| @Override |
| public String getName() { |
| return brokerConnectConfiguration.getName(); |
| } |
| |
| @Override |
| public String getProtocol() { |
| return "AMQP"; |
| } |
| |
| @Override |
| public boolean isStarted() { |
| return started; |
| } |
| |
| public boolean isConnecting() { |
| return connecting; |
| } |
| |
| @Override |
| public void stop() { |
| if (!started) return; |
| started = false; |
| if (protonRemotingConnection != null) { |
| protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection")); |
| protonRemotingConnection = null; |
| connection = null; |
| } |
| ScheduledFuture scheduledFuture = reconnectFuture; |
| reconnectFuture = null; |
| if (scheduledFuture != null) { |
| scheduledFuture.cancel(true); |
| } |
| } |
| |
| @Override |
| public void start() throws Exception { |
| if (started) return; |
| started = true; |
| server.getConfiguration().registerBrokerPlugin(this); |
| try { |
| |
| if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) { |
| for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { |
| if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) { |
| installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server); |
| } |
| } |
| } |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| return; |
| } |
| connectExecutor.execute(() -> doConnect()); |
| } |
| |
| public NettyConnection getConnection() { |
| return connection; |
| } |
| |
| @Override |
| public void afterCreateQueue(Queue queue) { |
| connectExecutor.execute(() -> { |
| for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { |
| validateMatching(queue, connectionElement); |
| } |
| }); |
| } |
| |
| public void validateMatching(Queue queue, AMQPBrokerConnectionElement connectionElement) { |
| if (connectionElement.getType() != AMQPBrokerConnectionAddressType.MIRROR) { |
| if (connectionElement.getQueueName() != null) { |
| if (queue.getName().equals(connectionElement.getQueueName())) { |
| createLink(queue, connectionElement); |
| } |
| } else if (connectionElement.match(queue.getAddress(), server.getConfiguration().getWildcardConfiguration())) { |
| createLink(queue, connectionElement); |
| } |
| } |
| } |
| |
| public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) { |
| if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) { |
| Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY}; |
| connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability); |
| connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability); |
| } else { |
| if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { |
| connectSender(queue, queue.getAddress().toString(), null, null, null, null, null); |
| } |
| if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { |
| connectReceiver(protonRemotingConnection, session, sessionContext, queue); |
| } |
| } |
| } |
| |
| SimpleString getMirrorSNF(AMQPMirrorBrokerConnectionElement mirrorElement) { |
| SimpleString snf = mirrorElement.getMirrorSNF(); |
| if (snf == null) { |
| snf = SimpleString.toSimpleString(ProtonProtocolManager.getMirrorAddress(this.brokerConnectConfiguration.getName())); |
| mirrorElement.setMirrorSNF(snf); |
| } |
| return snf; |
| } |
| |
| private void linkClosed(Link link) { |
| if (link.getLocalState() == EndpointState.ACTIVE) { |
| error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionRemoteLinkClosed(), lastRetryCounter); |
| } |
| } |
| |
| private void doConnect() { |
| try { |
| connecting = true; |
| |
| List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations(); |
| |
| TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size()); |
| |
| String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams()); |
| int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams()); |
| this.host = hostOnParameter; |
| this.port = portOnParameter; |
| connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter); |
| |
| if (connection == null) { |
| retryConnection(); |
| return; |
| } |
| |
| lastRetryCounter = retryCounter; |
| |
| retryCounter = 0; |
| |
| reconnectFuture = null; |
| |
| // before we retry the connection we need to remove any previous links |
| // as they will need to be recreated |
| senders.clear(); |
| receivers.clear(); |
| |
| ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration); |
| |
| ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory); |
| server.getRemotingService().addConnectionEntry(connection, entry); |
| protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection; |
| protonRemotingConnection.getAmqpConnection().setLinkCloseListener(this::linkClosed); |
| |
| connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor())); |
| |
| session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session(); |
| sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session); |
| |
| protonRemotingConnection.getAmqpConnection().runLater(() -> { |
| protonRemotingConnection.getAmqpConnection().open(); |
| session.open(); |
| protonRemotingConnection.getAmqpConnection().flush(); |
| }); |
| |
| if (brokerConnectConfiguration.getConnectionElements() != null) { |
| Stream<Binding> bindingStream = server.getPostOffice().getAllBindings(); |
| |
| bindingStream.forEach(binding -> { |
| if (binding instanceof QueueBinding) { |
| Queue queue = ((QueueBinding) binding).getQueue(); |
| for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { |
| validateMatching(queue, connectionElement); |
| } |
| } |
| }); |
| |
| for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { |
| if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) { |
| AMQPMirrorBrokerConnectionElement replica = (AMQPMirrorBrokerConnectionElement)connectionElement; |
| |
| Queue queue = server.locateQueue(getMirrorSNF(replica)); |
| |
| connectSender(queue, queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), server.getNodeID().toString(), |
| new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}, null); |
| } |
| } |
| } |
| |
| protonRemotingConnection.getAmqpConnection().flush(); |
| |
| bridgeManager.connected(connection, this); |
| |
| ActiveMQAMQPProtocolLogger.LOGGER.successReconnect(brokerConnectConfiguration.getName(), host + ":" + port, lastRetryCounter); |
| |
| connecting = false; |
| } catch (Throwable e) { |
| error(e); |
| } |
| } |
| |
| public void retryConnection() { |
| lastRetryCounter = retryCounter; |
| if (bridgeManager.isStarted() && started) { |
| if (brokerConnectConfiguration.getReconnectAttempts() < 0 || retryCounter < brokerConnectConfiguration.getReconnectAttempts()) { |
| retryCounter++; |
| ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(brokerConnectConfiguration.getName(), host + ":" + port, retryCounter, brokerConnectConfiguration.getReconnectAttempts()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Reconnecting in " + brokerConnectConfiguration.getRetryInterval() + ", this is the " + retryCounter + " of " + brokerConnectConfiguration.getReconnectAttempts()); |
| } |
| reconnectFuture = scheduledExecutorService.schedule(() -> connectExecutor.execute(() -> doConnect()), brokerConnectConfiguration.getRetryInterval(), TimeUnit.MILLISECONDS); |
| } else { |
| retryCounter = 0; |
| started = false; |
| connecting = false; |
| ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(brokerConnectConfiguration.getName(), host + ":" + port, lastRetryCounter); |
| if (logger.isDebugEnabled()) { |
| logger.debug("no more reconnections as the retry counter reached " + retryCounter + " out of " + brokerConnectConfiguration.getReconnectAttempts()); |
| } |
| } |
| } |
| } |
| |
| private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) { |
| // TODO implement this as part of https://issues.apache.org/jira/browse/ARTEMIS-2965 |
| } |
| |
| private Queue installMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) throws Exception { |
| |
| MirrorController currentMirrorController = server.getMirrorController(); |
| |
| // This following block is to avoid a duplicate on mirror controller |
| if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerSource) { |
| Queue queue = checkCurrentMirror(this, (AMQPMirrorControllerSource) currentMirrorController); |
| // on this case we already had a mirror installed before, we won't duplicate it |
| if (queue != null) { |
| return queue; |
| } |
| } else if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerAggregation) { |
| AMQPMirrorControllerAggregation aggregation = (AMQPMirrorControllerAggregation) currentMirrorController; |
| |
| for (AMQPMirrorControllerSource source : aggregation.getPartitions()) { |
| Queue queue = checkCurrentMirror(this, source); |
| // on this case we already had a mirror installed before, we won't duplicate it |
| if (queue != null) { |
| return queue; |
| } |
| } |
| } |
| |
| AddressInfo addressInfo = server.getAddressInfo(getMirrorSNF(replicaConfig)); |
| if (addressInfo == null) { |
| addressInfo = new AddressInfo(getMirrorSNF(replicaConfig)).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(!replicaConfig.isDurable()).setInternal(true); |
| server.addAddressInfo(addressInfo); |
| } |
| |
| if (addressInfo.getRoutingType() != RoutingType.ANYCAST) { |
| throw new IllegalArgumentException("sourceMirrorAddress is not ANYCAST"); |
| } |
| |
| Queue mirrorControlQueue = server.locateQueue(getMirrorSNF(replicaConfig)); |
| |
| if (mirrorControlQueue == null) { |
| mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true); |
| } |
| |
| mirrorControlQueue.setMirrorController(true); |
| |
| QueueBinding snfReplicaQueueBinding = (QueueBinding)server.getPostOffice().getBinding(getMirrorSNF(replicaConfig)); |
| if (snfReplicaQueueBinding == null) { |
| logger.warn("Queue does not exist even after creation! " + replicaConfig); |
| throw new IllegalAccessException("Cannot start replica"); |
| } |
| |
| Queue snfQueue = snfReplicaQueueBinding.getQueue(); |
| |
| if (!snfQueue.getAddress().equals(getMirrorSNF(replicaConfig))) { |
| logger.warn("Queue " + snfQueue + " belong to a different address (" + snfQueue.getAddress() + "), while we expected it to be " + addressInfo.getName()); |
| throw new IllegalAccessException("Cannot start replica"); |
| } |
| |
| AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this); |
| |
| this.mirrorControllerSource = newPartition; |
| |
| server.scanAddresses(newPartition); |
| |
| |
| if (currentMirrorController == null) { |
| server.installMirrorController(newPartition); |
| } else { |
| // Replace a standard implementation by an aggregated supporting multiple targets |
| if (currentMirrorController instanceof AMQPMirrorControllerSource) { |
| // replacing the simple mirror control for an aggregator |
| AMQPMirrorControllerAggregation remoteAggregation = new AMQPMirrorControllerAggregation(); |
| remoteAggregation.addPartition((AMQPMirrorControllerSource) currentMirrorController); |
| currentMirrorController = remoteAggregation; |
| server.installMirrorController(remoteAggregation); |
| } |
| ((AMQPMirrorControllerAggregation) currentMirrorController).addPartition(newPartition); |
| } |
| |
| return snfQueue; |
| } |
| |
| private static Queue checkCurrentMirror(AMQPBrokerConnection brokerConnection, |
| AMQPMirrorControllerSource currentMirrorController) { |
| AMQPMirrorControllerSource source = currentMirrorController; |
| if (source.getBrokerConnection() == brokerConnection) { |
| return source.getSnfQueue(); |
| } |
| |
| return null; |
| } |
| |
| private void connectReceiver(ActiveMQProtonRemotingConnection protonRemotingConnection, |
| Session session, |
| AMQPSessionContext sessionContext, |
| Queue queue, |
| Symbol... capabilities) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Connecting inbound for " + queue); |
| } |
| |
| if (session == null) { |
| logger.debug("session is null"); |
| return; |
| } |
| |
| protonRemotingConnection.getAmqpConnection().runLater(() -> { |
| |
| if (receivers.contains(queue)) { |
| logger.debug("Receiver for queue " + queue + " already exists, just giving up"); |
| return; |
| } |
| receivers.add(queue); |
| Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID()); |
| receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); |
| receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); |
| Target target = new Target(); |
| target.setAddress(queue.getAddress().toString()); |
| receiver.setTarget(target); |
| |
| Source source = new Source(); |
| source.setAddress(queue.getAddress().toString()); |
| receiver.setSource(source); |
| |
| if (capabilities != null) { |
| source.setCapabilities(capabilities); |
| } |
| |
| receiver.open(); |
| protonRemotingConnection.getAmqpConnection().flush(); |
| try { |
| sessionContext.addReceiver(receiver); |
| } catch (Exception e) { |
| error(e); |
| } |
| }); |
| } |
| |
| private void connectSender(Queue queue, |
| String targetName, |
| java.util.function.Consumer<Sender> senderConsumer, |
| java.util.function.Consumer<? super MessageReference> beforeDeliver, |
| String brokerID, |
| Symbol[] desiredCapabilities, |
| Symbol[] targetCapabilities) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Connecting outbound for " + queue); |
| } |
| |
| |
| if (session == null) { |
| logger.debug("Session is null"); |
| return; |
| } |
| |
| protonRemotingConnection.getAmqpConnection().runLater(() -> { |
| try { |
| if (senders.contains(queue)) { |
| logger.debug("Sender for queue " + queue + " already exists, just giving up"); |
| return; |
| } |
| senders.add(queue); |
| Sender sender = session.sender(targetName + ":" + UUIDGenerator.getInstance().generateStringUUID()); |
| sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); |
| sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); |
| Target target = new Target(); |
| target.setAddress(targetName); |
| if (targetCapabilities != null) { |
| target.setCapabilities(targetCapabilities); |
| } |
| sender.setTarget(target); |
| |
| Source source = new Source(); |
| source.setAddress(queue.getAddress().toString()); |
| sender.setSource(source); |
| if (brokerID != null) { |
| HashMap<Symbol, Object> mapProperties = new HashMap<>(1, 1); // this map is expected to have a single element, so load factor = 1 |
| mapProperties.put(AMQPMirrorControllerSource.BROKER_ID, brokerID); |
| sender.setProperties(mapProperties); |
| } |
| |
| if (desiredCapabilities != null) { |
| sender.setDesiredCapabilities(desiredCapabilities); |
| } |
| |
| AMQPOutgoingController outgoingInitializer = new AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI()); |
| |
| sender.open(); |
| |
| final ScheduledFuture futureTimeout; |
| |
| AtomicBoolean cancelled = new AtomicBoolean(false); |
| |
| if (bridgesConnector.getConnectTimeoutMillis() > 0) { |
| futureTimeout = server.getScheduledPool().schedule(() -> { |
| cancelled.set(true); |
| error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter); |
| }, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); |
| } else { |
| futureTimeout = null; |
| } |
| |
| // Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened |
| sender.attachments().set(Runnable.class, Runnable.class, () -> { |
| ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver); |
| try { |
| if (!cancelled.get()) { |
| if (futureTimeout != null) { |
| futureTimeout.cancel(false); |
| } |
| if (sender.getRemoteTarget() == null) { |
| error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.senderLinkRefused(sender.getTarget().getAddress()), lastRetryCounter); |
| return; |
| } |
| if (desiredCapabilities != null) { |
| if (!verifyOfferedCapabilities(sender, desiredCapabilities)) { |
| error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(Arrays.toString(desiredCapabilities)), lastRetryCounter); |
| return; |
| } |
| } |
| if (brokerID != null) { |
| if (sender.getRemoteProperties() == null || !sender.getRemoteProperties().containsKey(AMQPMirrorControllerSource.BROKER_ID)) { |
| error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingBrokerID(), lastRetryCounter); |
| return; |
| } |
| |
| Object remoteBrokerID = sender.getRemoteProperties().get(AMQPMirrorControllerSource.BROKER_ID); |
| if (remoteBrokerID.equals(brokerID)) { |
| error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionMirrorItself(), lastRetryCounter); |
| return; |
| } |
| } |
| sessionContext.addSender(sender, senderContext); |
| if (senderConsumer != null) { |
| senderConsumer.accept(sender); |
| } |
| } |
| } catch (Exception e) { |
| error(e); |
| } |
| }); |
| } catch (Exception e) { |
| error(e); |
| } |
| protonRemotingConnection.getAmqpConnection().flush(); |
| }); |
| } |
| |
| protected boolean verifyOfferedCapabilities(Sender sender, Symbol[] capabilities) { |
| |
| if (sender.getRemoteOfferedCapabilities() == null) { |
| return false; |
| } |
| |
| for (Symbol s : capabilities) { |
| boolean foundS = false; |
| for (Symbol b : sender.getRemoteOfferedCapabilities()) { |
| if (b.equals(s)) { |
| foundS = true; |
| break; |
| } |
| } |
| if (!foundS) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| protected void error(Throwable e) { |
| error(e, 0); |
| } |
| |
| // the retryCounter is passed here |
| // in case the error happened after the actual connection |
| // say the connection is invalid due to an invalid attribute or wrong password |
| // but the max retry should not be affected by such cases |
| // otherwise we would always retry from 0 and never reach a max |
| protected void error(Throwable e, int retryCounter) { |
| this.retryCounter = retryCounter; |
| connecting = false; |
| logger.warn(e.getMessage(), e); |
| redoConnection(); |
| } |
| |
| private class AMQPOutgoingController implements SenderController { |
| |
| final Queue queue; |
| final Sender sender; |
| final AMQPSessionCallback sessionSPI; |
| |
| AMQPOutgoingController(Queue queue, Sender sender, AMQPSessionCallback sessionSPI) { |
| this.queue = queue; |
| this.sessionSPI = sessionSPI; |
| this.sender = sender; |
| } |
| |
| @Override |
| public Consumer init(ProtonServerSenderContext senderContext) throws Exception { |
| SimpleString queueName = queue.getName(); |
| return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false); |
| } |
| |
| @Override |
| public void close() throws Exception { |
| } |
| } |
| |
| public void disconnect() throws Exception { |
| redoConnection(); |
| } |
| |
| @Override |
| public void connectionCreated(ActiveMQComponent component, Connection connection, ClientProtocolManager protocol) { |
| } |
| |
| @Override |
| public void connectionDestroyed(Object connectionID) { |
| server.getRemotingService().removeConnection(connectionID); |
| redoConnection(); |
| } |
| |
| @Override |
| public void connectionException(Object connectionID, ActiveMQException me) { |
| redoConnection(); |
| } |
| |
| private void redoConnection() { |
| |
| // avoiding retro-feeding an error call from the close after anyting else that happened. |
| if (protonRemotingConnection != null) { |
| protonRemotingConnection.getAmqpConnection().setLinkCloseListener(null); |
| } |
| |
| // we need to use the connectExecutor to initiate a redoConnection |
| // otherwise we would need to add synchronized blocks along this class |
| // to control when connecting becomes true and when it becomes false |
| // keeping a single executor thread to this purpose would simplify things |
| connectExecutor.execute(() -> { |
| if (connecting) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Broker connection " + this.getName() + " was already in retry mode, exception or retry not captured"); |
| } |
| return; |
| } |
| connecting = true; |
| |
| try { |
| if (protonRemotingConnection != null) { |
| protonRemotingConnection.fail(new ActiveMQException("Connection being recreated")); |
| connection = null; |
| protonRemotingConnection = null; |
| } |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| } |
| |
| retryConnection(); |
| }); |
| } |
| |
| @Override |
| public void connectionReadyForWrites(Object connectionID, boolean ready) { |
| protonRemotingConnection.flush(); |
| } |
| |
| private static final String EXTERNAL = "EXTERNAL"; |
| private static final String PLAIN = "PLAIN"; |
| private static final String ANONYMOUS = "ANONYMOUS"; |
| private static final byte[] EMPTY = new byte[0]; |
| |
| private static class PlainSASLMechanism implements ClientSASL { |
| |
| private final byte[] initialResponse; |
| |
| PlainSASLMechanism(String username, String password) { |
| byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); |
| byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); |
| byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2]; |
| System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length); |
| System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length); |
| initialResponse = encoded; |
| } |
| |
| @Override |
| public String getName() { |
| return PLAIN; |
| } |
| |
| @Override |
| public byte[] getInitialResponse() { |
| return initialResponse; |
| } |
| |
| @Override |
| public byte[] getResponse(byte[] challenge) { |
| return EMPTY; |
| } |
| |
| public static boolean isApplicable(final String username, final String password) { |
| return username != null && username.length() > 0 && password != null && password.length() > 0; |
| } |
| } |
| |
| private static class AnonymousSASLMechanism implements ClientSASL { |
| |
| @Override |
| public String getName() { |
| return ANONYMOUS; |
| } |
| |
| @Override |
| public byte[] getInitialResponse() { |
| return EMPTY; |
| } |
| |
| @Override |
| public byte[] getResponse(byte[] challenge) { |
| return EMPTY; |
| } |
| } |
| |
| private static class ExternalSASLMechanism implements ClientSASL { |
| |
| @Override |
| public String getName() { |
| return EXTERNAL; |
| } |
| |
| @Override |
| public byte[] getInitialResponse() { |
| return EMPTY; |
| } |
| |
| @Override |
| public byte[] getResponse(byte[] challenge) { |
| return EMPTY; |
| } |
| |
| public static boolean isApplicable(final NettyConnection connection) { |
| return CertificateUtil.getLocalPrincipalFromConnection(connection) != null; |
| } |
| } |
| |
| private static final class SaslFactory implements ClientSASLFactory { |
| |
| private final NettyConnection connection; |
| private final AMQPBrokerConnectConfiguration brokerConnectConfiguration; |
| |
| SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration brokerConnectConfiguration) { |
| this.connection = connection; |
| this.brokerConnectConfiguration = brokerConnectConfiguration; |
| } |
| |
| @Override |
| public ClientSASL chooseMechanism(String[] offeredMechanims) { |
| List<String> availableMechanisms = offeredMechanims == null ? Collections.emptyList() : Arrays.asList(offeredMechanims); |
| |
| if (availableMechanisms.contains(EXTERNAL) && ExternalSASLMechanism.isApplicable(connection)) { |
| return new ExternalSASLMechanism(); |
| } |
| if (SCRAMClientSASL.isApplicable(brokerConnectConfiguration.getUser(), |
| brokerConnectConfiguration.getPassword())) { |
| for (SCRAM scram : SCRAM.values()) { |
| if (availableMechanisms.contains(scram.getName())) { |
| return new SCRAMClientSASL(scram, brokerConnectConfiguration.getUser(), |
| brokerConnectConfiguration.getPassword()); |
| } |
| } |
| } |
| if (availableMechanisms.contains(PLAIN) && PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword())) { |
| return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword()); |
| } |
| |
| if (availableMechanisms.contains(ANONYMOUS)) { |
| return new AnonymousSASLMechanism(); |
| } |
| |
| return null; |
| } |
| } |
| |
| } |