blob: 22626ce804c7c853d6797f7dfd0a48ccc27e16c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.jboss.logging.Logger;
public class AMQPSessionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPSessionContext.class);
protected final AMQPConnectionContext connection;
protected final AMQPSessionCallback sessionSPI;
protected final Session session;
protected Map<Receiver, ProtonAbstractReceiver> receivers = new ConcurrentHashMap<>();
protected Map<Sender, ProtonServerSenderContext> senders = new ConcurrentHashMap<>();
protected boolean closed = false;
protected final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
protected final ActiveMQServer server;
public AMQPSessionContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, Session session, ActiveMQServer server) {
this.connection = connection;
this.sessionSPI = sessionSPI;
this.session = session;
this.server = server;
}
protected Map<Object, ProtonServerSenderContext> serverSenders = new ConcurrentHashMap<>();
public AMQPSessionCallback getSessionSPI() {
return sessionSPI;
}
@Override
public void initialize() throws Exception {
if (!isInitialized()) {
super.initialize();
if (sessionSPI != null) {
try {
sessionSPI.init(this, connection.getSASLResult());
} catch (ActiveMQSecurityException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
}
}
}
/**
* @param consumer
* @param queueName
*/
public void disconnect(Object consumer, String queueName) {
ProtonServerSenderContext protonConsumer = senders.remove(consumer);
if (protonConsumer != null) {
serverSenders.remove(protonConsumer.getBrokerConsumer());
try {
protonConsumer.close(false);
} catch (ActiveMQAMQPException e) {
protonConsumer.getSender().setTarget(null);
protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
}
}
}
public byte[] getTag() {
return tagCache.getNextTag();
}
public void replaceTag(byte[] tag) {
tagCache.returnTag(tag);
}
public void close() {
if (closed) {
return;
}
// Making a copy to avoid ConcurrentModificationException during the iteration
Set<ProtonAbstractReceiver> receiversCopy = new HashSet<>();
receiversCopy.addAll(receivers.values());
for (ProtonAbstractReceiver protonProducer : receiversCopy) {
try {
protonProducer.close(false);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
}
receivers.clear();
Set<ProtonServerSenderContext> protonSendersClone = new HashSet<>();
protonSendersClone.addAll(senders.values());
for (ProtonServerSenderContext protonConsumer : protonSendersClone) {
try {
protonConsumer.close(false);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
}
senders.clear();
serverSenders.clear();
try {
if (sessionSPI != null) {
sessionSPI.close();
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
closed = true;
}
public void removeReceiver(Receiver receiver) {
sessionSPI.removeProducer(receiver.getName());
receivers.remove(receiver);
}
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection);
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
receiver.setContext(transactionHandler);
connection.runNow(() -> {
receiver.open();
receiver.flow(connection.getAmqpCredits());
connection.flush();
});
}
public void addSender(Sender sender) throws Exception {
addSender(sender, (SenderController)null);
}
public void addSender(Sender sender, SenderController senderController) throws Exception {
// TODO: Remove this check when we have support for global link names
boolean outgoing = (sender.getContext() != null && sender.getContext().equals(true));
ProtonServerSenderContext protonSender = outgoing ? new ProtonClientSenderContext(connection, sender, this, sessionSPI) : new ProtonServerSenderContext(connection, sender, this, sessionSPI, senderController);
addSender(sender, protonSender);
}
public void addSender(Sender sender, ProtonServerSenderContext protonSender) throws Exception {
try {
protonSender.initialize();
senders.put(sender, protonSender);
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
sender.setContext(protonSender);
if (sender.getLocalState() != EndpointState.ACTIVE) {
connection.runNow(() -> {
sender.open();
connection.flush();
});
}
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
if (protonSender.getBrokerConsumer() != null) {
serverSenders.remove(protonSender.getBrokerConsumer());
}
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
connection.runNow(() -> {
sender.close();
connection.flush();
});
}
}
public void removeSender(Sender sender) throws ActiveMQAMQPException {
ProtonServerSenderContext senderRemoved = senders.remove(sender);
if (senderRemoved != null) {
serverSenders.remove(senderRemoved.getBrokerConsumer());
}
}
public void addReplicaTarget(Receiver receiver) throws Exception {
try {
AMQPMirrorControllerTarget protonReceiver = new AMQPMirrorControllerTarget(sessionSPI, connection, this, receiver, server);
protonReceiver.initialize();
receivers.put(receiver, protonReceiver);
ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
sessionSPI.addProducer(serverProducer);
receiver.setContext(protonReceiver);
HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, server.getNodeID().toString());
receiver.setProperties(brokerIDProperties);
connection.runNow(() -> {
receiver.open();
connection.flush();
});
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
connection.runNow(() -> {
receiver.close();
connection.flush();
});
}
}
public void addReceiver(Receiver receiver) throws Exception {
try {
ProtonServerReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
protonReceiver.initialize();
receivers.put(receiver, protonReceiver);
ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
sessionSPI.addProducer(serverProducer);
receiver.setContext(protonReceiver);
connection.runNow(() -> {
receiver.open();
connection.flush();
});
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
connection.runNow(() -> {
receiver.close();
connection.flush();
});
}
}
public int getReceiverCount() {
return receivers.size();
}
public int getSenderCount() {
return senders.size();
}
}