blob: cb9c74b121435258d81a4502918e4cc56994c7d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer {
// Substring looked for in the destination's string form to flag a notifications destination.
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
// Session this consumer belongs to; used to reach the connection, core session and core server.
private final AMQSession session;
// Destination as supplied by the OpenWire client.
private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
// True when openwireDestination.toString() contains AMQ_NOTIFICATIONS_DESTINATION.
private final boolean hasNotificationDestination;
// OpenWire consumer descriptor (selector, prefetch, durability, noLocal, ...).
private final ConsumerInfo info;
// Used by the pull handler to schedule the delayed empty dispatch of a timed pull.
private final ScheduledExecutorService scheduledPool;
// Core consumer backing this OpenWire consumer; assigned in init().
private ServerConsumer serverConsumer;
// Current prefetch limit; 0 puts the consumer in pull mode (see messagePullHandler).
private int prefetchSize;
// Remaining credits: starts at prefetchSize, decremented on each dispatch, replenished by acquireCredit().
private final AtomicInteger currentWindow;
// Running total of message counts reported by "delivered" acks; consulted by acknowledge() when granting credits.
private int deliveredAcks;
// Sequence counter advanced on every MessagePull that reaches the pull handler.
private long messagePullSequence = 0;
// Holds a handler only while prefetch is 0 (set in the constructor and in setPrefetchSize()); null otherwise.
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
// "Internal" means we don't expose
// its address/queue to the management service.
private boolean internalAddress = false;
// Lazily created set of rolled-back references; stays null until the first addRolledback() call.
private volatile Set<MessageReference> rolledbackMessageRefs;
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
ConsumerInfo info,
ScheduledExecutorService scheduledPool,
boolean internalAddress) {
this.session = amqSession;
this.openwireDestination = d;
this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
this.currentWindow = new AtomicInteger(prefetchSize);
this.deliveredAcks = 0;
if (prefetchSize == 0) {
messagePullHandler.set(new MessagePullHandler());
}
this.internalAddress = internalAddress;
this.rolledbackMessageRefs = null;
}
/**
 * Creates the rolled-back reference set while holding this consumer's monitor, unless
 * it already exists. The set is ordered by message ID.
 *
 * @return the (possibly just created) set, never {@code null}
 */
private synchronized Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
   if (this.rolledbackMessageRefs == null) {
      this.rolledbackMessageRefs = new ConcurrentSkipListSet<>(Comparator.comparingLong(MessageReference::getMessageID));
   }
   return this.rolledbackMessageRefs;
}
/**
 * Returns the rolled-back reference set, creating it on first use.
 * The lock is only taken when the set does not exist yet.
 *
 * @return the set, never {@code null}
 */
private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
   final Set<MessageReference> existing = this.rolledbackMessageRefs;
   return existing != null ? existing : guardedInitializationOfRolledBackMessageRefs();
}
/**
 * @return the rolled-back reference set, or {@code null} if nothing has been recorded yet
 */
private Set<MessageReference> getRolledbackMessageRefs() {
   return rolledbackMessageRefs;
}
/**
 * Creates the core {@link ServerConsumer} that backs this OpenWire consumer.
 * <p>
 * The client selector, if any, is translated with {@link SelectorTranslator}. For a noLocal
 * consumer a clause is added that excludes messages whose connection-id property equals the
 * client id (or, when there is none, the connection id). Non-advisory noLocal consumers also
 * flag the connection as noLocal; advisory-topic noLocal consumers are pre-acknowledged instead.
 * <p>
 * Topics get a subscription queue carrying the filter, so the consumer itself is created
 * without one. Queues are created on demand as ANYCAST (an already existing queue is fine)
 * and the filter is applied on the consumer. If the address settings for a queue say
 * prefetch 0 while the client asked for a non-zero prefetch, a {@link ConsumerControl}
 * with prefetch 0 is dispatched to the client.
 *
 * @param slowConsumerDetectionListener listener registered on the created core consumer
 * @param nativeId                      id handed to the core session for the new consumer
 * @throws Exception if the queue, subscription or consumer cannot be created
 */
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
   SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector()));
   boolean preAck = false;
   if (info.isNoLocal()) {
      if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
         //tell the connection to add the property
         this.session.getConnection().setNoLocal(true);
      } else {
         preAck = true;
      }
      String id = info.getClientId() != null ? info.getClientId() : this.getId().getConnectionId();
      String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + id + "'";
      if (selector == null) {
         selector = new SimpleString(noLocalSelector);
      } else {
         // Combine with the already translated selector (not the raw client text) and
         // parenthesize it: AND binds tighter than OR, so without the parentheses a
         // selector such as "a OR b" would let local messages through whenever "a" matches.
         selector = new SimpleString("(" + selector + ") AND " + noLocalSelector);
      }
   }
   SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
   if (openwireDestination.isTopic()) {
      SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
      // The filter lives on the subscription queue, hence no filter on the consumer.
      serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1);
      serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
      //only advisory topic consumers need this.
      ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
   } else {
      try {
         session.getCoreServer().createQueue(new QueueConfiguration(destinationName)
                                                .setRoutingType(RoutingType.ANYCAST));
      } catch (ActiveMQQueueExistsException ignored) {
         // The queue is already there, which is all we need.
      }
      serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.getPriority(), info.isBrowser(), false, -1);
      serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
      AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
      if (addrSettings != null) {
         //see PolicyEntry
         if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
            //sends back a ConsumerControl
            ConsumerControl cc = new ConsumerControl();
            cc.setConsumerId(info.getConsumerId());
            cc.setPrefetch(0);
            session.getConnection().dispatch(cc);
         }
      }
   }
   serverConsumer.setProtocolData(this);
}
/**
 * Ensures a subscription queue exists on {@code address} and returns its name.
 * <p>
 * Non-durable subscriptions get a fresh, randomly named, temporary non-durable queue.
 * Durable subscriptions use a name derived from client id and subscription name (with the
 * physical name appended for composite destinations). An existing durable queue is reused
 * unless its filter or address differs, in which case it is deleted and created again.
 *
 * @param isDurable        whether the subscription is durable
 * @param clientID         client id used for the durable queue name
 * @param physicalName     physical destination name, appended for composite destinations
 * @param subscriptionName subscription name used for the durable queue name
 * @param selector         filter for the subscription queue, may be {@code null}
 * @param address          address the queue is bound to
 * @return the name of the subscription queue
 * @throws IllegalStateException if the durable subscription already has a consumer
 * @throws Exception             if querying, deleting or creating the queue fails
 */
private SimpleString createTopicSubscription(boolean isDurable,
                                             String clientID,
                                             String physicalName,
                                             String subscriptionName,
                                             SimpleString selector,
                                             SimpleString address) throws Exception {
   if (!isDurable) {
      final SimpleString temporaryName = new SimpleString(UUID.randomUUID().toString());
      session.getCoreSession().createQueue(new QueueConfiguration(temporaryName).setAddress(address).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress));
      return temporaryName;
   }

   SimpleString durableName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName);
   if (info.getDestination().isComposite()) {
      durableName = durableName.concat(physicalName);
   }

   final QueueQueryResult existing = session.getCoreSession().executeQueueQuery(durableName);
   if (!existing.isExists()) {
      session.getCoreSession().createQueue(new QueueConfiguration(durableName).setAddress(address).setFilterString(selector).setInternal(internalAddress));
      return durableName;
   }

   // The subscription is already there: only one consumer at a time may use it.
   if (existing.getConsumerCount() > 0) {
      throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
   }

   final SimpleString previousFilter = existing.getFilterString();
   final boolean selectorChanged;
   if (previousFilter == null) {
      selectorChanged = selector != null;
   } else {
      selectorChanged = selector == null || !previousFilter.equals(selector);
   }
   final SimpleString previousAddress = existing.getAddress();
   final boolean topicChanged = !previousAddress.equals(address);

   if (selectorChanged || topicChanged) {
      // Replace the stale durable subscription with one matching the new definition.
      session.getCoreSession().deleteQueue(durableName);
      session.getCoreSession().createQueue(new QueueConfiguration(durableName).setAddress(address).setFilterString(selector).setInternal(internalAddress));
   }
   return durableName;
}
/**
 * @return the OpenWire consumer id taken from the consumer descriptor
 */
public ConsumerId getId() {
   return this.info.getConsumerId();
}
/**
 * Adds {@code n} credits to the delivery window. Delivery is prompted when the window was
 * below the prefetch size before the credits were added. Does nothing while a pull handler
 * is installed, because the handler controls credits in that mode.
 *
 * @param n number of credits to add
 */
public void acquireCredit(int n) {
   if (messagePullHandler.get() == null) {
      final int windowBefore = currentWindow.getAndAdd(n);
      if (windowBefore < prefetchSize) {
         serverConsumer.promptDelivery();
      }
   }
}
/**
 * Converts a core message into an OpenWire dispatch and hands it to the session.
 * <p>
 * When a pull handler is installed it sees the message first; if it rejects the message
 * (forced-delivery marker) nothing is dispatched. On success one credit is consumed.
 * Any failure is logged as a warning and reported as size 0.
 *
 * @param reference     reference being delivered; receives the OpenWire message id as protocol data
 * @param message       the core message to convert
 * @param deliveryCount not used by this implementation
 * @return the size of the dispatched OpenWire message, or 0 if nothing was dispatched
 */
public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) {
   try {
      final MessagePullHandler handler = messagePullHandler.get();
      final boolean rejectedByPullHandler = handler != null && !handler.checkForcedConsumer(message);
      if (rejectedByPullHandler) {
         return 0;
      }

      //internal session always delivers messages to noLocal advisory consumers
      //so we need to remove this property too.
      if (session.getConnection().isNoLocal() || session.isInternal()) {
         message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
      }

      //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
      final MessageDispatch outgoing = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this);
      final int dispatchedSize = outgoing.getMessage().getSize();
      reference.setProtocolData(outgoing.getMessage().getMessageId());
      session.deliverMessage(outgoing);
      currentWindow.decrementAndGet();
      return dispatchedSize;
   } catch (Throwable failure) {
      // Covers I/O errors from the conversion as well as anything unexpected.
      ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", failure);
      return 0;
   }
}
/**
 * Sends a dispatch without a message for this consumer and destination.
 * The pull handler uses it to answer a pull request for which no message arrived.
 */
public void handleDeliverNullDispatch() {
   final MessageDispatch emptyDispatch = new MessageDispatch();
   emptyDispatch.setDestination(openwireDestination);
   emptyDispatch.setConsumerId(getId());
   session.deliverMessage(emptyDispatch);
}
/**
 * Processes an OpenWire acknowledgement, which always covers an interval of messages.
 * <p>
 * The affected references are looked up through
 * {@link ServerConsumer#getDeliveringReferencesBasedOnProtocol(boolean, Object, Object)}.
 * They are only removed from the delivering list when the consumer is not browse-only,
 * the queue is not non-destructive, and the ack is not a redelivered, delivered or expired ack.
 * <p>
 * Removed references are acknowledged (individual/standard ack) or sent to the dead letter
 * address (poison ack) inside the session's current transaction. When there is no current
 * transaction a new one is started and committed here. Expired acks expire every reference
 * in the interval. Credits are granted along the way.
 *
 * @param ack the OpenWire acknowledgement
 * @throws Exception if acknowledging, dead-lettering, expiring or committing fails
 */
public void acknowledge(MessageAck ack) throws Exception {
   final MessageId firstInAck = ack.getFirstMessageId();
   final MessageId last = ack.getLastMessageId();
   // A missing first id means the interval consists of the last message only.
   final MessageId first = firstInAck != null ? firstInAck : last;

   // if it's browse only, nothing to be acked, we just remove the lists
   final boolean browseOnly = serverConsumer.isBrowseOnly();
   final boolean nonDestructive = serverConsumer.getQueue().isNonDestructive();
   final boolean nonConsumingAck = ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck();
   final boolean removeReferences = !(browseOnly || nonDestructive || nonConsumingAck);

   final List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);

   if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
      // Credits for messages already announced through delivered-acks were granted before.
      final int ackedCount = ackList.size();
      if (deliveredAcks < ackedCount) {
         acquireCredit(ackedCount - deliveredAcks);
         deliveredAcks = 0;
      } else {
         deliveredAcks -= ackedCount;
      }
   } else {
      if (ack.isDeliveredAck()) {
         this.deliveredAcks += ack.getMessageCount();
      }
      acquireCredit(ack.getMessageCount());
   }

   if (removeReferences) {
      final Transaction originalTX = session.getCoreSession().getCurrentTransaction();
      final boolean startedHere = originalTX == null;
      final Transaction transaction = startedHere ? session.getCoreSession().newTransaction() : originalTX;

      if (ack.isIndividualAck() || ack.isStandardAck()) {
         for (MessageReference acked : ackList) {
            acked.acknowledge(transaction, serverConsumer);
         }
      } else if (ack.isPoisonAck()) {
         for (MessageReference poisoned : ackList) {
            final Throwable poisonCause = ack.getPoisonCause();
            if (poisonCause != null) {
               poisoned.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
            }
            poisoned.getQueue().sendToDeadLetterAddress(transaction, poisoned);
         }
      }

      // Only commit a transaction that was opened by this call.
      if (startedHere) {
         transaction.commit(true);
      }
   }

   if (ack.isExpiredAck()) {
      for (MessageReference expired : ackList) {
         expired.getQueue().expire(expired, serverConsumer);
      }
   }
}
/**
 * Sends a dispatch carrying neither message nor destination for this consumer,
 * which signals the end of a browse.
 */
public void browseFinished() {
   final MessageDispatch endOfBrowse = new MessageDispatch();
   endOfBrowse.setConsumerId(this.info.getConsumerId());
   endOfBrowse.setDestination(null);
   endOfBrowse.setMessage(null);
   session.deliverMessage(endOfBrowse);
}
/**
 * @return the OpenWire consumer descriptor this consumer was created with
 */
public ConsumerInfo getInfo() {
   return this.info;
}
/**
 * @return {@code true} while the delivery window still holds at least one credit
 */
public boolean hasCredits() {
   final int remainingCredits = currentWindow.get();
   return remainingCredits > 0;
}
/**
 * Handles an explicit pull request: grants one credit and, when a pull handler is
 * installed, starts the next pull sequence with the timeout of the request.
 *
 * @param messagePull the OpenWire pull command
 * @throws Exception if the pull handler fails to start the sequence
 */
public void processMessagePull(MessagePull messagePull) throws Exception {
   currentWindow.incrementAndGet();
   final MessagePullHandler handler = messagePullHandler.get();
   if (handler == null) {
      return;
   }
   final long sequence = messagePullSequence++;
   handler.nextSequence(sequence, messagePull.getTimeout());
}
/**
 * Closes the backing core consumer, passing {@code false} to its {@code close} method.
 *
 * @throws Exception if closing the core consumer fails
 */
public void removeConsumer() throws Exception {
   this.serverConsumer.close(false);
}
/**
 * @return whether the destination was recognised as a notifications destination at construction time
 */
public boolean hasNotificationDestination() {
   return this.hasNotificationDestination;
}
/**
 * @return the OpenWire destination this consumer was created for
 */
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
   return this.openwireDestination;
}
/**
 * Changes the prefetch size: resets the credit window to the new value and records it
 * in the consumer descriptor. A value of 0 switches to pull mode (a pull handler is
 * installed if none exists); any other value removes the pull handler, and a positive
 * value additionally prompts delivery.
 *
 * @param prefetchSize the new prefetch size
 */
public void setPrefetchSize(int prefetchSize) {
   this.prefetchSize = prefetchSize;
   currentWindow.set(prefetchSize);
   info.setPrefetchSize(prefetchSize);

   final boolean pullMode = this.prefetchSize == 0;
   if (pullMode) {
      messagePullHandler.compareAndSet(null, new MessagePullHandler());
      return;
   }

   messagePullHandler.set(null);
   if (this.prefetchSize > 0) {
      serverConsumer.promptDelivery();
   }
}
/**
 * Decides whether a cancelled reference is to be treated as delivered.
 * <p>
 * That is the case when the last delivered sequence id is unknown, or when the
 * reference's message id does not exceed it and the reference was not rolled back.
 *
 * @param ref the cancelled reference
 * @return {@code true} to treat the reference as delivered, {@code false} for the default behaviour
 */
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
   final boolean lastDeliveredUnknown = RemoveInfo.LAST_DELIVERED_UNKNOWN == info.getLastDeliveredSequenceId();
   return lastDeliveredUnknown
      || (ref.getMessageID() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref));
}
/**
 * Coordinates explicit pull requests with forced delivery on the core consumer.
 * An instance is installed on the enclosing consumer only while its prefetch is 0.
 * <p>
 * Each pull triggers a forced delivery. If the forced-delivery marker comes back before a
 * real message, the client receives an empty dispatch, either right away (timeout
 * {@code <= 0}) or after the pull timeout has elapsed.
 * <p>
 * NOTE(review): the fields are written by the pulling thread and read by the delivering
 * thread without synchronization beyond the latch; confirm this is intended.
 */
private class MessagePullHandler {

   // Sequence of the pending pull; negative when no pull is pending.
   private long next = -1;
   private long timeout;
   private CountDownLatch latch = new CountDownLatch(1);
   private ScheduledFuture<?> messagePullFuture;

   /**
    * Starts a new pull: records sequence and timeout, then forces delivery on the core consumer.
    * For a timeout of 0 or less the caller is blocked for up to 10 seconds until a message
    * or the forced-delivery marker shows up; if the pull is still pending afterwards an
    * empty dispatch is sent.
    */
   public void nextSequence(long next, long timeout) throws Exception {
      this.next = next;
      this.timeout = timeout;
      this.latch = new CountDownLatch(1);
      serverConsumer.forceDelivery(messagePullSequence);

      if (timeout > 0) {
         // Timed pulls are completed from checkForcedConsumer via the scheduler.
         return;
      }

      //if we are 0 timeout or less we need to wait to get either the forced message or a real message.
      this.latch.await(10, TimeUnit.SECONDS);
      //this means we have received no message just the forced delivery message
      if (this.next >= 0) {
         handleDeliverNullDispatch();
      }
   }

   /**
    * Inspects a message about to be delivered.
    *
    * @return {@code true} for a real message, which must be dispatched; {@code false} for
    *         the forced-delivery marker, which must not reach the client
    */
   public boolean checkForcedConsumer(Message message) {
      final boolean forcedMarker = message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);

      if (!forcedMarker) {
         // A real message satisfies the pending pull: clear it and release any waiter.
         next = -1;
         if (messagePullFuture != null) {
            messagePullFuture.cancel(true);
         }
         latch.countDown();
         return true;
      }

      if (next >= 0) {
         if (timeout <= 0) {
            latch.countDown();
         } else {
            // Answer with an empty dispatch once the timeout passed, unless a message arrived meanwhile.
            messagePullFuture = scheduledPool.schedule(() -> {
               if (next >= 0) {
                  handleDeliverNullDispatch();
               }
            }, timeout, TimeUnit.MILLISECONDS);
         }
      }
      return false;
   }
}
/**
 * Removes a reference from the rolled-back set.
 *
 * @param messageReference the reference to remove
 * @return {@code true} if the reference was recorded and has been removed
 */
public boolean removeRolledback(MessageReference messageReference) {
   final Set<MessageReference> recorded = getRolledbackMessageRefs();
   return recorded != null && recorded.remove(messageReference);
}
/**
 * Records a reference as rolled back, creating the backing set on first use.
 *
 * @param messageReference the reference to record
 */
public void addRolledback(MessageReference messageReference) {
   final Set<MessageReference> recorded = getRolledbackMessageRefsOrCreate();
   recorded.add(messageReference);
}
/**
 * @param messageReference the reference to look up
 * @return {@code true} if the reference is currently recorded as rolled back
 */
private boolean isRolledBack(MessageReference messageReference) {
   final Set<MessageReference> recorded = getRolledbackMessageRefs();
   return recorded != null && recorded.contains(messageReference);
}
}