blob: 978c166ea17769f46892ab661491ed7a0b54a857 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
private static final Logger logger = Logger.getLogger(AMQPMirrorControllerTarget.class);
private static ThreadLocal<MirrorController> controllerThreadLocal = new ThreadLocal<>();
public static void setControllerInUse(MirrorController controller) {
public static MirrorController getControllerInUse() {
return controllerThreadLocal.get();
/** Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
* the operation context. Don't use transaction and OperationContext at the same time
* as that would generate duplicates on the objects cache.
class ACKMessageOperation implements IOCallback, Runnable {
Delivery delivery;
/** notice that when you use the Transaction, you need to make sure you don't use the IO*/
public TransactionOperationAbstract tx = new TransactionOperationAbstract() {
public void afterCommit(Transaction tx) {
void reset() { = null;
ACKMessageOperation setDelivery(Delivery delivery) { = delivery;
return this;
public void run() {
if (logger.isTraceEnabled()) {
logger.trace("Delivery settling for " + delivery + ", context=" + delivery.getContext());
public void done() {
private void completeOperation() {
public void onError(int errorCode, String errorMessage) {
logger.warn(errorMessage + "-" + errorMessage);
// in a regular case we should not have more than amqpCredits on the pool, that's the max we would need
private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new ACKMessageOperation());
final RoutingContextImpl routingContext = new RoutingContextImpl(null);
final BasicMirrorController<Receiver> basicController;
final ActiveMQServer server;
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
private final NodeStore<MessageReference> referenceNodeStore;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
Receiver receiver,
ActiveMQServer server) {
super(sessionSPI, connection, protonSession, receiver);
this.basicController = new BasicMirrorController(server);
this.server = server;
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
public String getRemoteMirrorId() {
return basicController.getRemoteMirrorId();
public void flow() {;
protected void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) {
if (logger.isTraceEnabled()) {
logger.trace(server + "::actualdelivery call for " + message);
ACKMessageOperation messageAckOperation = this.ackMessageMpscPool.borrow().setDelivery(delivery);
try {
/** We use message annotations, because on the same link we will receive control messages
* coming from mirror events,
* and the actual messages that need to be replicated.
* Using anything from the body would force us to parse the body on regular messages.
* The body of the message may still be used on control messages, on cases where a JSON string is sent. */
Object eventType = AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, EVENT_TYPE);
if (eventType != null) {
if (eventType.equals(ADD_ADDRESS)) {
AddressInfo addressInfo = parseAddress(message);
if (logger.isDebugEnabled()) {
logger.debug(server + " Adding Address " + addressInfo);
} else if (eventType.equals(DELETE_ADDRESS)) {
AddressInfo addressInfo = parseAddress(message);
if (logger.isDebugEnabled()) {
logger.debug(server + " Removing Address " + addressInfo);
} else if (eventType.equals(CREATE_QUEUE)) {
QueueConfiguration queueConfiguration = parseQueue(message);
if (logger.isDebugEnabled()) {
logger.debug(server + " Creating queue " + queueConfiguration);
} else if (eventType.equals(DELETE_QUEUE)) {
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
if (logger.isDebugEnabled()) {
logger.debug(server + " Deleting queue " + queueName + " on address " + address);
deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
} else if (eventType.equals(POST_ACK)) {
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
if (nodeID == null) {
nodeID = getRemoteMirrorId(); // not sending the nodeID means it's data generated on that broker
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
AmqpValue value = (AmqpValue) message.getBody();
Long messageID = (Long) value.getValue();
if (logger.isDebugEnabled()) {
logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation)) {
messageAckOperation = null;
} else {
if (logger.isDebugEnabled()) {
logger.debug(server + " Sending message " + message);
if (sendMessage(message, messageAckOperation)) {
// since the send was successful, we give up the reference here,
// so there won't be any call on afterCompleteOperations
messageAckOperation = null;
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
if (messageAckOperation != null) {
public void initialize() throws Exception {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
// Match the settlement mode of the remote instead of relying on the default of MIXED.
// We don't currently support SECOND so enforce that the answer is anlways FIRST
private QueueConfiguration parseQueue(AMQPMessage message) throws Exception {
AmqpValue bodyvalue = (AmqpValue) message.getBody();
String body = (String) bodyvalue.getValue();
QueueConfiguration queueConfiguration = QueueConfiguration.fromJSON(body);
return queueConfiguration;
private AddressInfo parseAddress(AMQPMessage message) throws Exception {
AmqpValue bodyvalue = (AmqpValue) message.getBody();
String body = (String) bodyvalue.getValue();
AddressInfo addressInfo = AddressInfo.fromJSON(body);
return addressInfo;
public void addAddress(AddressInfo addressInfo) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug(server + " Adding address " + addressInfo);
public void deleteAddress(AddressInfo addressInfo) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug(server + " delete address " + addressInfo);
try {
server.removeAddressInfo(addressInfo.getName(), null, true);
} catch (ActiveMQAddressDoesNotExistException expected) {
// it was removed from somewhere else, which is fine
logger.debug(expected.getMessage(), expected);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug(server + " Adding queue " + queueConfiguration);
try {
server.createQueue(queueConfiguration, true);
} catch (Exception ignored) {
logger.debug("Queue could not be created, already existed " + queueConfiguration, ignored);
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug(server + " destroy queue " + queueName + " on address = " + addressName + " server " + server.getIdentity());
try {
server.destroyQueue(queueName,null, false, true, false, false);
} catch (ActiveMQNonExistentQueueException expected) {
logger.debug(server + " queue " + queueName + " was previously removed", expected);
public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage) throws Exception {
final Queue targetQueue = server.locateQueue(queue);
if (targetQueue == null) {
logger.warn("Queue " + queue + " not found on mirror target, ignoring ack for queue=" + queue + ", messageID=" + messageID + ", nodeID=" + nodeID);
return false;
if (logger.isDebugEnabled()) {
// we only do the following check if debug
if (targetQueue.getConsumerCount() > 0) {
logger.debug("server " + server.getIdentity() + ", queue " + targetQueue.getName() + " has consumers while delivering ack for " + messageID);
if (logger.isTraceEnabled()) {
logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
performAck(nodeID, messageID, targetQueue, ackMessage, true);
return true;
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null && retry) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID);
targetQueue.flushOnIntermediate(() -> {
performAck(nodeID, messageID, targetQueue, ackMessageOperation, false);
if (reference != null) {
if (logger.isTraceEnabled()) {
logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID);
try {
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Post ack Server " + server + " could not find messageID = " + messageID +
" representing nodeID=" + nodeID);
* this method returning true means the sendMessage was successful, and the IOContext should no longer be used.
* as the sendMessage was successful the OperationContext of the transaction will take care of the completion.
* The caller of this method should give up any reference to messageCompletionAck when this method returns true.
* */
private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageCompletionAck) throws Exception {
if (message.getMessageID() <= 0) {
String internalMirrorID = (String)AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, BROKER_ID);
if (internalMirrorID == null) {
internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means the data was generated on the remote broker
Long internalIDLong = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
String internalAddress = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_DESTINATION);
long internalID = 0;
if (internalIDLong != null) {
internalID = internalIDLong;
if (logger.isTraceEnabled()) {
logger.trace("sendMessage on server " + server + " for message " + message +
" with internalID = " + internalIDLong + " mirror id " + internalMirrorID);
routingContext.setDuplicateDetection(false); // we do our own duplicate detection here
DuplicateIDCache duplicateIDCache;
if (lruDuplicateIDKey != null && lruDuplicateIDKey.equals(internalMirrorID)) {
duplicateIDCache = lruduplicateIDCache;
} else {
// we use the number of credits for the duplicate detection, as that means the maximum number of elements you can have pending
if (logger.isDebugEnabled()) {
logger.trace("Setting up duplicate detection cache on " + ProtonProtocolManager.MIRROR_ADDRESS + ", ServerID=" + internalMirrorID + " with " + connection.getAmqpCredits() + " elements, being the number of credits");
lruDuplicateIDKey = internalMirrorID;
lruduplicateIDCache = server.getPostOffice().getDuplicateIDCache(SimpleString.toSimpleString(ProtonProtocolManager.MIRROR_ADDRESS + "_" + internalMirrorID), connection.getAmqpCredits());
duplicateIDCache = lruduplicateIDCache;
byte[] duplicateIDBytes = ByteUtil.longToBytes(internalIDLong);
if (duplicateIDCache.contains(duplicateIDBytes)) {
return false;
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (internalAddress != null) {
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager());
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
// return true here will instruct the caller to ignore any references to messageCompletionAck
return true;
* @param ref
* @param reason
public void postAcknowledge(MessageReference ref, AckReason reason) {
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {