| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.protocol.amqp.proton; |
| |
| import java.net.URI; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.FutureTask; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.UnaryOperator; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.EventLoop; |
| import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; |
| import org.apache.activemq.artemis.core.security.CheckType; |
| import org.apache.activemq.artemis.core.security.SecurityAuth; |
| import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; |
| import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; |
| import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; |
| import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; |
| import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; |
| import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; |
| import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; |
| import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; |
| import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExecutorNettyAdapter; |
| import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; |
| import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; |
| import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; |
| import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; |
| import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; |
| import org.apache.activemq.artemis.utils.ByteUtil; |
| import org.apache.activemq.artemis.utils.VersionLoader; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.messaging.Source; |
| import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; |
| import org.apache.qpid.proton.amqp.transaction.Coordinator; |
| import org.apache.qpid.proton.amqp.transport.ErrorCondition; |
| import org.apache.qpid.proton.engine.Connection; |
| import org.apache.qpid.proton.engine.Delivery; |
| import org.apache.qpid.proton.engine.EndpointState; |
| import org.apache.qpid.proton.engine.Link; |
| import org.apache.qpid.proton.engine.Receiver; |
| import org.apache.qpid.proton.engine.Sender; |
| import org.apache.qpid.proton.engine.Session; |
| import org.apache.qpid.proton.engine.Transport; |
| import org.jboss.logging.Logger; |
| |
| import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; |
| import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; |
| import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; |
| import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; |
| import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; |
| |
| public class AMQPConnectionContext extends ProtonInitializable implements EventHandler { |
| |
| private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); |
| |
| public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); |
| public static final String AMQP_CONTAINER_ID = "amqp-container-id"; |
| private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null); |
| |
| protected final ProtonHandler handler; |
| |
| private AMQPConnectionCallback connectionCallback; |
| private final String containerId; |
| private final boolean isIncomingConnection; |
| private final ClientSASLFactory saslClientFactory; |
| private final Map<Symbol, Object> connectionProperties = new HashMap<>(); |
| private final ScheduledExecutorService scheduledPool; |
| |
| private LinkCloseListener linkCloseListener; |
| |
| private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>(); |
| |
| private final ProtonProtocolManager protocolManager; |
| |
| private final boolean useCoreSubscriptionNaming; |
| |
| /** Outgoing means created by the AMQP Bridge */ |
| private final boolean bridgeConnection; |
| |
| private final ScheduleOperator scheduleOp = new ScheduleOperator(new ScheduleRunnable()); |
| private final AtomicReference<Future<?>> scheduledFutureRef = new AtomicReference(VOID_FUTURE); |
| |
| public AMQPConnectionContext(ProtonProtocolManager protocolManager, |
| AMQPConnectionCallback connectionSP, |
| String containerId, |
| int idleTimeout, |
| int maxFrameSize, |
| int channelMax, |
| boolean useCoreSubscriptionNaming, |
| ScheduledExecutorService scheduledPool, |
| boolean isIncomingConnection, |
| ClientSASLFactory saslClientFactory, |
| Map<Symbol, Object> connectionProperties) { |
| this(protocolManager, connectionSP, containerId, idleTimeout, maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool, isIncomingConnection, saslClientFactory, connectionProperties, false); |
| } |
| |
| public AMQPConnectionContext(ProtonProtocolManager protocolManager, |
| AMQPConnectionCallback connectionSP, |
| String containerId, |
| int idleTimeout, |
| int maxFrameSize, |
| int channelMax, |
| boolean useCoreSubscriptionNaming, |
| ScheduledExecutorService scheduledPool, |
| boolean isIncomingConnection, |
| ClientSASLFactory saslClientFactory, |
| Map<Symbol, Object> connectionProperties, |
| boolean bridgeConnection) { |
| this.protocolManager = protocolManager; |
| this.bridgeConnection = bridgeConnection; |
| this.connectionCallback = connectionSP; |
| this.useCoreSubscriptionNaming = useCoreSubscriptionNaming; |
| this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); |
| this.isIncomingConnection = isIncomingConnection; |
| this.saslClientFactory = saslClientFactory; |
| |
| this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis"); |
| this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion()); |
| |
| if (connectionProperties != null) { |
| this.connectionProperties.putAll(connectionProperties); |
| } |
| |
| this.scheduledPool = scheduledPool; |
| connectionCallback.setConnection(this); |
| EventLoop nettyExecutor; |
| if (connectionCallback.getTransportConnection() instanceof NettyConnection) { |
| nettyExecutor = ((NettyConnection) connectionCallback.getTransportConnection()).getNettyChannel().eventLoop(); |
| } else { |
| nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor()); |
| } |
| this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection && saslClientFactory == null); |
| handler.addEventHandler(this); |
| Transport transport = handler.getTransport(); |
| transport.setEmitFlowEventOnSend(false); |
| if (idleTimeout > 0) { |
| transport.setIdleTimeout(idleTimeout); |
| } |
| transport.setChannelMax(channelMax); |
| transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize()); |
| transport.setMaxFrameSize(maxFrameSize); |
| transport.setOutboundFrameSizeLimit(maxFrameSize); |
| if (saslClientFactory != null) { |
| handler.createClientSASL(); |
| } |
| } |
| |
| public LinkCloseListener getLinkCloseListener() { |
| return linkCloseListener; |
| } |
| |
| public AMQPConnectionContext setLinkCloseListener(LinkCloseListener linkCloseListener) { |
| this.linkCloseListener = linkCloseListener; |
| return this; |
| } |
| |
| public boolean isBridgeConnection() { |
| return bridgeConnection; |
| } |
| |
| public void requireInHandler() { |
| handler.requireHandler(); |
| } |
| |
| public boolean isHandler() { |
| return handler.isHandler(); |
| } |
| |
| public void scheduledFlush() { |
| handler.scheduledFlush(); |
| } |
| |
| public boolean isIncomingConnection() { |
| return isIncomingConnection; |
| } |
| |
| public ClientSASLFactory getSaslClientFactory() { |
| return saslClientFactory; |
| } |
| |
| protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { |
| AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this); |
| AMQPSessionContext protonSession = new AMQPSessionContext(sessionSPI, this, realSession, protocolManager.getServer()); |
| |
| return protonSession; |
| } |
| |
| public SecurityAuth getSecurityAuth() { |
| return new LocalSecurity(); |
| } |
| |
| public SASLResult getSASLResult() { |
| return handler.getSASLResult(); |
| } |
| |
| public void inputBuffer(ByteBuf buffer) { |
| if (log.isTraceEnabled()) { |
| ByteUtil.debugFrame(log, "Buffer Received ", buffer); |
| } |
| |
| handler.inputBuffer(buffer); |
| } |
| |
| public ProtonHandler getHandler() { |
| return handler; |
| } |
| |
| public void destroy() { |
| handler.runLater(() -> connectionCallback.close()); |
| } |
| |
| public boolean isSyncOnFlush() { |
| return false; |
| } |
| |
| public void instantFlush() { |
| handler.instantFlush(); |
| } |
| public void flush() { |
| handler.flush(); |
| } |
| |
| public void afterFlush(Runnable runnable) { |
| handler.afterFlush(runnable); |
| } |
| |
| public void close(ErrorCondition errorCondition) { |
| Future<?> scheduledFuture = scheduledFutureRef.getAndSet(null); |
| |
| if (scheduledPool instanceof ThreadPoolExecutor && scheduledFuture != null && |
| scheduledFuture != VOID_FUTURE && scheduledFuture instanceof Runnable) { |
| if (!((ThreadPoolExecutor) scheduledPool).remove((Runnable) scheduledFuture) && |
| !scheduledFuture.isCancelled() && !scheduledFuture.isDone()) { |
| ActiveMQAMQPProtocolLogger.LOGGER.cantRemovingScheduledTask(); |
| } |
| } |
| |
| handler.close(errorCondition, this); |
| } |
| |
| public AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException { |
| AMQPSessionContext sessionExtension = sessions.get(realSession); |
| if (sessionExtension == null) { |
| // how this is possible? Log a warn here |
| sessionExtension = newSessionExtension(realSession); |
| realSession.setContext(sessionExtension); |
| sessions.put(realSession, sessionExtension); |
| } |
| return sessionExtension; |
| } |
| |
| public void runOnPool(Runnable run) { |
| handler.runOnPool(run); |
| } |
| |
| public void runNow(Runnable run) { |
| handler.runNow(run); |
| } |
| |
| public void runLater(Runnable run) { |
| handler.runLater(run); |
| } |
| |
| protected boolean validateConnection(Connection connection) { |
| return connectionCallback.validateConnection(connection, handler.getSASLResult()); |
| } |
| |
| public boolean checkDataReceived() { |
| return handler.checkDataReceived(); |
| } |
| |
| public long getCreationTime() { |
| return handler.getCreationTime(); |
| } |
| |
| public String getRemoteContainer() { |
| return handler.getConnection().getRemoteContainer(); |
| } |
| |
| public String getPubSubPrefix() { |
| return null; |
| } |
| |
| protected void initInternal() throws Exception { |
| } |
| |
| public AMQPConnectionCallback getConnectionCallback() { |
| return connectionCallback; |
| } |
| |
| protected void remoteLinkOpened(Link link) throws Exception { |
| |
| AMQPSessionContext protonSession = getSessionExtension(link.getSession()); |
| |
| Runnable runnable = link.attachments().get(Runnable.class, Runnable.class); |
| if (runnable != null) { |
| link.attachments().set(Runnable.class, Runnable.class, null); |
| runnable.run(); |
| return; |
| } |
| |
| if (link.getLocalState() == EndpointState.ACTIVE) { // if already active it's probably from the AMQP bridge and hence we just ignore it |
| return; |
| } |
| |
| link.setSource(link.getRemoteSource()); |
| link.setTarget(link.getRemoteTarget()); |
| if (link instanceof Receiver) { |
| Receiver receiver = (Receiver) link; |
| if (link.getRemoteTarget() instanceof Coordinator) { |
| Coordinator coordinator = (Coordinator) link.getRemoteTarget(); |
| protonSession.addTransactionHandler(coordinator, receiver); |
| } else { |
| if (isReplicaTarget(receiver)) { |
| try { |
| try { |
| protonSession.getSessionSPI().check(SimpleString.toSimpleString(link.getTarget().getAddress()), CheckType.SEND, getSecurityAuth()); |
| } catch (ActiveMQSecurityException e) { |
| throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingProducer(e.getMessage()); |
| } |
| |
| if (!verifyDesiredCapabilities(receiver, AMQPMirrorControllerSource.MIRROR_CAPABILITY)) { |
| throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingDesiredCapability(AMQPMirrorControllerSource.MIRROR_CAPABILITY.toString()); |
| } |
| } catch (ActiveMQAMQPException e) { |
| log.warn(e.getMessage(), e); |
| |
| link.setTarget(null); |
| link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); |
| link.close(); |
| |
| return; |
| } |
| |
| receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); |
| protonSession.addReplicaTarget(receiver); |
| } else { |
| protonSession.addReceiver(receiver); |
| } |
| } |
| } else { |
| Sender sender = (Sender) link; |
| protonSession.addSender(sender); |
| } |
| } |
| |
| |
| protected boolean verifyDesiredCapabilities(Receiver reciever, Symbol s) { |
| |
| if (reciever.getRemoteDesiredCapabilities() == null) { |
| return false; |
| } |
| |
| boolean foundS = false; |
| for (Symbol b : reciever.getRemoteDesiredCapabilities()) { |
| if (b.equals(s)) { |
| foundS = true; |
| break; |
| } |
| } |
| if (!foundS) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| |
| private boolean isReplicaTarget(Link link) { |
| return link != null && link.getTarget() != null && link.getTarget().getAddress() != null && link.getTarget().getAddress().startsWith(ProtonProtocolManager.MIRROR_ADDRESS); |
| } |
| |
| public Symbol[] getConnectionCapabilitiesOffered() { |
| URI tc = connectionCallback.getFailoverList(); |
| if (tc != null) { |
| Map<Symbol, Object> hostDetails = new HashMap<>(); |
| hostDetails.put(NETWORK_HOST, tc.getHost()); |
| boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true"); |
| if (isSSL) { |
| hostDetails.put(SCHEME, "amqps"); |
| } else { |
| hostDetails.put(SCHEME, "amqp"); |
| } |
| hostDetails.put(HOSTNAME, tc.getHost()); |
| hostDetails.put(PORT, tc.getPort()); |
| |
| connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails)); |
| } |
| return ExtCapability.getCapabilities(); |
| } |
| |
| public void open() { |
| handler.open(containerId, connectionProperties); |
| } |
| |
| public String getContainer() { |
| return containerId; |
| } |
| |
| public void addEventHandler(EventHandler eventHandler) { |
| handler.addEventHandler(eventHandler); |
| } |
| |
| public ProtonProtocolManager getProtocolManager() { |
| return protocolManager; |
| } |
| |
| public int getAmqpLowCredits() { |
| if (protocolManager != null) { |
| return protocolManager.getAmqpLowCredits(); |
| } else { |
| // this is for tests only... |
| return AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; |
| } |
| } |
| |
| public int getAmqpCredits() { |
| if (protocolManager != null) { |
| return protocolManager.getAmqpCredits(); |
| } else { |
| // this is for tests only... |
| return AmqpSupport.AMQP_CREDITS_DEFAULT; |
| } |
| } |
| |
| public boolean isUseCoreSubscriptionNaming() { |
| return useCoreSubscriptionNaming; |
| } |
| |
| @Override |
| public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { |
| if (sasl) { |
| // configured mech in decreasing order of preference |
| String[] mechanisms = connectionCallback.getSaslMechanisms(); |
| if (mechanisms == null || mechanisms.length == 0) { |
| mechanisms = AnonymousServerSASL.ANONYMOUS_MECH; |
| } |
| handler.createServerSASL(mechanisms); |
| } else { |
| if (!connectionCallback.isSupportsAnonymous()) { |
| connectionCallback.sendSASLSupported(); |
| connectionCallback.close(); |
| handler.close(null, this); |
| } |
| } |
| } |
| |
| @Override |
| public void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) { |
| handler.setChosenMechanism(connectionCallback.getServerSASL(mech)); |
| } |
| |
| @Override |
| public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) { |
| if (saslClientFactory != null) { |
| handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms)); |
| } |
| } |
| |
| @Override |
| public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) { |
| connectionCallback.close(); |
| handler.close(null, this); |
| } |
| |
| @Override |
| public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) { |
| connection.open(); |
| } |
| |
| @Override |
| public void onTransport(Transport transport) { |
| handler.flushBytes(); |
| } |
| |
| @Override |
| public void pushBytes(ByteBuf bytes) { |
| connectionCallback.onTransport(bytes, this); |
| } |
| |
| @Override |
| public boolean flowControl(ReadyListener readyListener) { |
| return connectionCallback.isWritable(readyListener); |
| } |
| |
| @Override |
| public String getRemoteAddress() { |
| return connectionCallback.getTransportConnection().getRemoteAddress(); |
| } |
| |
| @Override |
| public void onRemoteOpen(Connection connection) throws Exception { |
| handler.requireHandler(); |
| try { |
| initInternal(); |
| } catch (Exception e) { |
| log.error("Error init connection", e); |
| } |
| if (!validateConnection(connection)) { |
| connection.close(); |
| } else { |
| connection.setContext(AMQPConnectionContext.this); |
| connection.setContainer(containerId); |
| connection.setProperties(connectionProperties); |
| connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); |
| connection.open(); |
| } |
| initialize(); |
| |
| /* |
| * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections |
| * but its here in case we add support for outbound connections. |
| * */ |
| if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { |
| long nextKeepAliveTime = handler.tick(true); |
| |
| if (nextKeepAliveTime != 0 && scheduledPool != null) { |
| scheduleOp.setDelay(nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); |
| |
| scheduledFutureRef.getAndUpdate(scheduleOp); |
| } |
| } |
| } |
| |
| class ScheduleOperator implements UnaryOperator<Future<?>> { |
| |
| private long delay; |
| final ScheduleRunnable scheduleRunnable; |
| |
| ScheduleOperator(ScheduleRunnable scheduleRunnable) { |
| this.scheduleRunnable = scheduleRunnable; |
| } |
| |
| @Override |
| public Future<?> apply(Future<?> future) { |
| return (future != null) ? scheduledPool.schedule(scheduleRunnable, delay, TimeUnit.MILLISECONDS) : null; |
| } |
| |
| public void setDelay(long delay) { |
| this.delay = delay; |
| } |
| } |
| |
| |
| class TickerRunnable implements Runnable { |
| |
| @Override |
| public void run() { |
| Long rescheduleAt = handler.tick(false); |
| |
| if (rescheduleAt == null) { |
| // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. |
| scheduleOp.setDelay(10); |
| |
| scheduledFutureRef.getAndUpdate(scheduleOp); |
| } else if (rescheduleAt != 0) { |
| scheduleOp.setDelay(rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); |
| |
| scheduledFutureRef.getAndUpdate(scheduleOp); |
| } |
| } |
| } |
| |
| class ScheduleRunnable implements Runnable { |
| |
| final TickerRunnable tickerRunnable = new TickerRunnable(); |
| |
| @Override |
| public void run() { |
| |
| // The actual tick has to happen within a Netty Worker, to avoid requiring a lock |
| // this will also be used to flush the data directly into netty connection's executor |
| handler.runLater(tickerRunnable); |
| } |
| } |
| |
| @Override |
| public void onRemoteClose(Connection connection) { |
| handler.requireHandler(); |
| connection.close(); |
| connection.free(); |
| |
| for (AMQPSessionContext protonSession : sessions.values()) { |
| protonSession.close(); |
| } |
| sessions.clear(); |
| |
| // We must force write the channel before we actually destroy the connection |
| handler.flushBytes(); |
| destroy(); |
| } |
| |
| @Override |
| public void onLocalOpen(Session session) throws Exception { |
| AMQPSessionContext sessionContext = getSessionExtension(session); |
| |
| if (bridgeConnection) { |
| sessionContext.initialize(); |
| } |
| } |
| |
| @Override |
| public void onRemoteOpen(Session session) throws Exception { |
| handler.requireHandler(); |
| getSessionExtension(session).initialize(); |
| session.open(); |
| } |
| |
| @Override |
| public void onRemoteClose(Session session) throws Exception { |
| handler.runLater(() -> { |
| session.close(); |
| session.free(); |
| |
| AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); |
| if (sessionContext != null) { |
| sessionContext.close(); |
| sessions.remove(session); |
| session.setContext(null); |
| } |
| }); |
| } |
| |
| @Override |
| public void onRemoteOpen(Link link) throws Exception { |
| remoteLinkOpened(link); |
| } |
| |
| @Override |
| public void onFlow(Link link) throws Exception { |
| if (link.getContext() != null) { |
| ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain()); |
| } |
| } |
| |
| @Override |
| public void onRemoteClose(Link link) throws Exception { |
| handler.requireHandler(); |
| |
| if (linkCloseListener != null) { |
| linkCloseListener.onClose(link); |
| } |
| |
| ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); |
| if (linkContext != null) { |
| try { |
| linkContext.close(true); |
| } catch (Exception e) { |
| log.error(e.getMessage(), e); |
| } |
| } |
| |
| /// we have to perform the link.close after the linkContext.close is finished. |
| // linkeContext.close will perform a few executions on the netty loop, |
| // this has to come next |
| runLater(() -> { |
| link.close(); |
| link.free(); |
| flush(); |
| }); |
| } |
| |
| @Override |
| public void onRemoteDetach(Link link) throws Exception { |
| handler.requireHandler(); |
| boolean handleAsClose = link.getSource() != null && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH; |
| |
| if (handleAsClose) { |
| onRemoteClose(link); |
| } else { |
| link.detach(); |
| link.free(); |
| } |
| } |
| |
| @Override |
| public void onLocalDetach(Link link) throws Exception { |
| handler.requireHandler(); |
| Object context = link.getContext(); |
| if (context instanceof ProtonServerSenderContext) { |
| ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; |
| senderContext.close(false); |
| } |
| } |
| |
| @Override |
| public void onDelivery(Delivery delivery) throws Exception { |
| handler.requireHandler(); |
| ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); |
| if (handler != null) { |
| handler.onMessage(delivery); |
| } else { |
| log.warn("Handler is null, can't delivery " + delivery, new Exception("tracing location")); |
| } |
| } |
| |
| |
| private class LocalSecurity implements SecurityAuth { |
| @Override |
| public String getUsername() { |
| String username = null; |
| SASLResult saslResult = getSASLResult(); |
| if (saslResult != null) { |
| username = saslResult.getUser(); |
| } |
| |
| return username; |
| } |
| |
| @Override |
| public String getPassword() { |
| String password = null; |
| SASLResult saslResult = getSASLResult(); |
| if (saslResult != null) { |
| if (saslResult instanceof PlainSASLResult) { |
| password = ((PlainSASLResult) saslResult).getPassword(); |
| } |
| } |
| |
| return password; |
| } |
| |
| @Override |
| public RemotingConnection getRemotingConnection() { |
| return connectionCallback.getProtonConnectionDelegate(); |
| } |
| |
| @Override |
| public String getSecurityDomain() { |
| return getProtocolManager().getSecurityDomain(); |
| } |
| } |
| |
| } |