blob: 998ebf5bb707e4c0ae30dde6ca11be5e205ecf6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExecutorNettyAdapter;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);
protected final ProtonHandler handler;
private AMQPConnectionCallback connectionCallback;
private final String containerId;
private final boolean isIncomingConnection;
private final ClientSASLFactory saslClientFactory;
private final Map<Symbol, Object> connectionProperties = new HashMap<>();
private final ScheduledExecutorService scheduledPool;
private LinkCloseListener linkCloseListener;
private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>();
private final ProtonProtocolManager protocolManager;
private final boolean useCoreSubscriptionNaming;
/** Outgoing means created by the AMQP Bridge */
private final boolean bridgeConnection;
private final ScheduleOperator scheduleOp = new ScheduleOperator(new ScheduleRunnable());
private final AtomicReference<Future<?>> scheduledFutureRef = new AtomicReference(VOID_FUTURE);
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
AMQPConnectionCallback connectionSP,
String containerId,
int idleTimeout,
int maxFrameSize,
int channelMax,
boolean useCoreSubscriptionNaming,
ScheduledExecutorService scheduledPool,
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties) {
this(protocolManager, connectionSP, containerId, idleTimeout, maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool, isIncomingConnection, saslClientFactory, connectionProperties, false);
}
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
AMQPConnectionCallback connectionSP,
String containerId,
int idleTimeout,
int maxFrameSize,
int channelMax,
boolean useCoreSubscriptionNaming,
ScheduledExecutorService scheduledPool,
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties,
boolean bridgeConnection) {
this.protocolManager = protocolManager;
this.bridgeConnection = bridgeConnection;
this.connectionCallback = connectionSP;
this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
this.isIncomingConnection = isIncomingConnection;
this.saslClientFactory = saslClientFactory;
this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
if (connectionProperties != null) {
this.connectionProperties.putAll(connectionProperties);
}
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
EventLoop nettyExecutor;
if (connectionCallback.getTransportConnection() instanceof NettyConnection) {
nettyExecutor = ((NettyConnection) connectionCallback.getTransportConnection()).getNettyChannel().eventLoop();
} else {
nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor());
}
this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection && saslClientFactory == null);
handler.addEventHandler(this);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
if (idleTimeout > 0) {
transport.setIdleTimeout(idleTimeout);
}
transport.setChannelMax(channelMax);
transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
transport.setMaxFrameSize(maxFrameSize);
transport.setOutboundFrameSizeLimit(maxFrameSize);
if (saslClientFactory != null) {
handler.createClientSASL();
}
}
public LinkCloseListener getLinkCloseListener() {
return linkCloseListener;
}
public AMQPConnectionContext setLinkCloseListener(LinkCloseListener linkCloseListener) {
this.linkCloseListener = linkCloseListener;
return this;
}
public boolean isBridgeConnection() {
return bridgeConnection;
}
public void requireInHandler() {
handler.requireHandler();
}
public boolean isHandler() {
return handler.isHandler();
}
public void scheduledFlush() {
handler.scheduledFlush();
}
public boolean isIncomingConnection() {
return isIncomingConnection;
}
public ClientSASLFactory getSaslClientFactory() {
return saslClientFactory;
}
protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
AMQPSessionContext protonSession = new AMQPSessionContext(sessionSPI, this, realSession, protocolManager.getServer());
return protonSession;
}
public SecurityAuth getSecurityAuth() {
return new LocalSecurity();
}
public SASLResult getSASLResult() {
return handler.getSASLResult();
}
public void inputBuffer(ByteBuf buffer) {
if (log.isTraceEnabled()) {
ByteUtil.debugFrame(log, "Buffer Received ", buffer);
}
handler.inputBuffer(buffer);
}
public ProtonHandler getHandler() {
return handler;
}
public void destroy() {
handler.runLater(() -> connectionCallback.close());
}
public boolean isSyncOnFlush() {
return false;
}
public void instantFlush() {
handler.instantFlush();
}
public void flush() {
handler.flush();
}
public void afterFlush(Runnable runnable) {
handler.afterFlush(runnable);
}
public void close(ErrorCondition errorCondition) {
Future<?> scheduledFuture = scheduledFutureRef.getAndSet(null);
if (scheduledPool instanceof ThreadPoolExecutor && scheduledFuture != null &&
scheduledFuture != VOID_FUTURE && scheduledFuture instanceof Runnable) {
if (!((ThreadPoolExecutor) scheduledPool).remove((Runnable) scheduledFuture) &&
!scheduledFuture.isCancelled() && !scheduledFuture.isDone()) {
ActiveMQAMQPProtocolLogger.LOGGER.cantRemovingScheduledTask();
}
}
handler.close(errorCondition, this);
}
public AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
AMQPSessionContext sessionExtension = sessions.get(realSession);
if (sessionExtension == null) {
// how this is possible? Log a warn here
sessionExtension = newSessionExtension(realSession);
realSession.setContext(sessionExtension);
sessions.put(realSession, sessionExtension);
}
return sessionExtension;
}
public void runOnPool(Runnable run) {
handler.runOnPool(run);
}
public void runNow(Runnable run) {
handler.runNow(run);
}
public void runLater(Runnable run) {
handler.runLater(run);
}
protected boolean validateConnection(Connection connection) {
return connectionCallback.validateConnection(connection, handler.getSASLResult());
}
public boolean checkDataReceived() {
return handler.checkDataReceived();
}
public long getCreationTime() {
return handler.getCreationTime();
}
public String getRemoteContainer() {
return handler.getConnection().getRemoteContainer();
}
public String getPubSubPrefix() {
return null;
}
protected void initInternal() throws Exception {
}
public AMQPConnectionCallback getConnectionCallback() {
return connectionCallback;
}
protected void remoteLinkOpened(Link link) throws Exception {
AMQPSessionContext protonSession = getSessionExtension(link.getSession());
Runnable runnable = link.attachments().get(Runnable.class, Runnable.class);
if (runnable != null) {
link.attachments().set(Runnable.class, Runnable.class, null);
runnable.run();
return;
}
if (link.getLocalState() == EndpointState.ACTIVE) { // if already active it's probably from the AMQP bridge and hence we just ignore it
return;
}
link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget());
if (link instanceof Receiver) {
Receiver receiver = (Receiver) link;
if (link.getRemoteTarget() instanceof Coordinator) {
Coordinator coordinator = (Coordinator) link.getRemoteTarget();
protonSession.addTransactionHandler(coordinator, receiver);
} else {
if (isReplicaTarget(receiver)) {
try {
try {
protonSession.getSessionSPI().check(SimpleString.toSimpleString(link.getTarget().getAddress()), CheckType.SEND, getSecurityAuth());
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingProducer(e.getMessage());
}
if (!verifyDesiredCapabilities(receiver, AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingDesiredCapability(AMQPMirrorControllerSource.MIRROR_CAPABILITY.toString());
}
} catch (ActiveMQAMQPException e) {
log.warn(e.getMessage(), e);
link.setTarget(null);
link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
link.close();
return;
}
receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
protonSession.addReplicaTarget(receiver);
} else {
protonSession.addReceiver(receiver);
}
}
} else {
Sender sender = (Sender) link;
protonSession.addSender(sender);
}
}
protected boolean verifyDesiredCapabilities(Receiver reciever, Symbol s) {
if (reciever.getRemoteDesiredCapabilities() == null) {
return false;
}
boolean foundS = false;
for (Symbol b : reciever.getRemoteDesiredCapabilities()) {
if (b.equals(s)) {
foundS = true;
break;
}
}
if (!foundS) {
return false;
}
return true;
}
private boolean isReplicaTarget(Link link) {
return link != null && link.getTarget() != null && link.getTarget().getAddress() != null && link.getTarget().getAddress().startsWith(ProtonProtocolManager.MIRROR_ADDRESS);
}
public Symbol[] getConnectionCapabilitiesOffered() {
URI tc = connectionCallback.getFailoverList();
if (tc != null) {
Map<Symbol, Object> hostDetails = new HashMap<>();
hostDetails.put(NETWORK_HOST, tc.getHost());
boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true");
if (isSSL) {
hostDetails.put(SCHEME, "amqps");
} else {
hostDetails.put(SCHEME, "amqp");
}
hostDetails.put(HOSTNAME, tc.getHost());
hostDetails.put(PORT, tc.getPort());
connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
}
return ExtCapability.getCapabilities();
}
public void open() {
handler.open(containerId, connectionProperties);
}
public String getContainer() {
return containerId;
}
public void addEventHandler(EventHandler eventHandler) {
handler.addEventHandler(eventHandler);
}
public ProtonProtocolManager getProtocolManager() {
return protocolManager;
}
public int getAmqpLowCredits() {
if (protocolManager != null) {
return protocolManager.getAmqpLowCredits();
} else {
// this is for tests only...
return AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
}
}
public int getAmqpCredits() {
if (protocolManager != null) {
return protocolManager.getAmqpCredits();
} else {
// this is for tests only...
return AmqpSupport.AMQP_CREDITS_DEFAULT;
}
}
public boolean isUseCoreSubscriptionNaming() {
return useCoreSubscriptionNaming;
}
@Override
public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
if (sasl) {
// configured mech in decreasing order of preference
String[] mechanisms = connectionCallback.getSaslMechanisms();
if (mechanisms == null || mechanisms.length == 0) {
mechanisms = AnonymousServerSASL.ANONYMOUS_MECH;
}
handler.createServerSASL(mechanisms);
} else {
if (!connectionCallback.isSupportsAnonymous()) {
connectionCallback.sendSASLSupported();
connectionCallback.close();
handler.close(null, this);
}
}
}
@Override
public void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) {
handler.setChosenMechanism(connectionCallback.getServerSASL(mech));
}
@Override
public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) {
if (saslClientFactory != null) {
handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms));
}
}
@Override
public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
connectionCallback.close();
handler.close(null, this);
}
@Override
public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) {
connection.open();
}
@Override
public void onTransport(Transport transport) {
handler.flushBytes();
}
@Override
public void pushBytes(ByteBuf bytes) {
connectionCallback.onTransport(bytes, this);
}
@Override
public boolean flowControl(ReadyListener readyListener) {
return connectionCallback.isWritable(readyListener);
}
@Override
public String getRemoteAddress() {
return connectionCallback.getTransportConnection().getRemoteAddress();
}
@Override
public void onRemoteOpen(Connection connection) throws Exception {
handler.requireHandler();
try {
initInternal();
} catch (Exception e) {
log.error("Error init connection", e);
}
if (!validateConnection(connection)) {
connection.close();
} else {
connection.setContext(AMQPConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
initialize();
/*
* This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
* but its here in case we add support for outbound connections.
* */
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true);
if (nextKeepAliveTime != 0 && scheduledPool != null) {
scheduleOp.setDelay(nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
scheduledFutureRef.getAndUpdate(scheduleOp);
}
}
}
class ScheduleOperator implements UnaryOperator<Future<?>> {
private long delay;
final ScheduleRunnable scheduleRunnable;
ScheduleOperator(ScheduleRunnable scheduleRunnable) {
this.scheduleRunnable = scheduleRunnable;
}
@Override
public Future<?> apply(Future<?> future) {
return (future != null) ? scheduledPool.schedule(scheduleRunnable, delay, TimeUnit.MILLISECONDS) : null;
}
public void setDelay(long delay) {
this.delay = delay;
}
}
class TickerRunnable implements Runnable {
@Override
public void run() {
Long rescheduleAt = handler.tick(false);
if (rescheduleAt == null) {
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
scheduleOp.setDelay(10);
scheduledFutureRef.getAndUpdate(scheduleOp);
} else if (rescheduleAt != 0) {
scheduleOp.setDelay(rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
scheduledFutureRef.getAndUpdate(scheduleOp);
}
}
}
class ScheduleRunnable implements Runnable {
final TickerRunnable tickerRunnable = new TickerRunnable();
@Override
public void run() {
// The actual tick has to happen within a Netty Worker, to avoid requiring a lock
// this will also be used to flush the data directly into netty connection's executor
handler.runLater(tickerRunnable);
}
}
@Override
public void onRemoteClose(Connection connection) {
handler.requireHandler();
connection.close();
connection.free();
for (AMQPSessionContext protonSession : sessions.values()) {
protonSession.close();
}
sessions.clear();
// We must force write the channel before we actually destroy the connection
handler.flushBytes();
destroy();
}
@Override
public void onLocalOpen(Session session) throws Exception {
AMQPSessionContext sessionContext = getSessionExtension(session);
if (bridgeConnection) {
sessionContext.initialize();
}
}
@Override
public void onRemoteOpen(Session session) throws Exception {
handler.requireHandler();
getSessionExtension(session).initialize();
session.open();
}
@Override
public void onRemoteClose(Session session) throws Exception {
handler.runLater(() -> {
session.close();
session.free();
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
if (sessionContext != null) {
sessionContext.close();
sessions.remove(session);
session.setContext(null);
}
});
}
@Override
public void onRemoteOpen(Link link) throws Exception {
remoteLinkOpened(link);
}
@Override
public void onFlow(Link link) throws Exception {
if (link.getContext() != null) {
((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
}
}
@Override
public void onRemoteClose(Link link) throws Exception {
handler.requireHandler();
if (linkCloseListener != null) {
linkCloseListener.onClose(link);
}
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
try {
linkContext.close(true);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/// we have to perform the link.close after the linkContext.close is finished.
// linkeContext.close will perform a few executions on the netty loop,
// this has to come next
runLater(() -> {
link.close();
link.free();
flush();
});
}
@Override
public void onRemoteDetach(Link link) throws Exception {
handler.requireHandler();
boolean handleAsClose = link.getSource() != null && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
if (handleAsClose) {
onRemoteClose(link);
} else {
link.detach();
link.free();
}
}
@Override
public void onLocalDetach(Link link) throws Exception {
handler.requireHandler();
Object context = link.getContext();
if (context instanceof ProtonServerSenderContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
senderContext.close(false);
}
}
@Override
public void onDelivery(Delivery delivery) throws Exception {
handler.requireHandler();
ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
if (handler != null) {
handler.onMessage(delivery);
} else {
log.warn("Handler is null, can't delivery " + delivery, new Exception("tracing location"));
}
}
private class LocalSecurity implements SecurityAuth {
@Override
public String getUsername() {
String username = null;
SASLResult saslResult = getSASLResult();
if (saslResult != null) {
username = saslResult.getUser();
}
return username;
}
@Override
public String getPassword() {
String password = null;
SASLResult saslResult = getSASLResult();
if (saslResult != null) {
if (saslResult instanceof PlainSASLResult) {
password = ((PlainSASLResult) saslResult).getPassword();
}
}
return password;
}
@Override
public RemotingConnection getRemotingConnection() {
return connectionCallback.getProtonConnectionDelegate();
}
@Override
public String getSecurityDomain() {
return getProtocolManager().getSecurityDomain();
}
}
}