blob: 804bde13f124108b3747ef2e4dad8ac262a7d9cd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
import java.text.MessageFormat;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;
class QueueConsumerImpl
extends AbstractConfiguredObject<QueueConsumerImpl>
implements QueueConsumer<QueueConsumerImpl>, LogSubject
{
private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
private final long _createTime = System.currentTimeMillis();
private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl>
_owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
private final boolean _acquires;
private final boolean _seesRequeues;
private final boolean _isTransient;
private final AtomicLong _deliveredCount = new AtomicLong(0);
private final AtomicLong _deliveredBytes = new AtomicLong(0);
private final FilterManager _filters;
private final Class<? extends ServerMessage> _messageClass;
private final Object _sessionReference;
private final AbstractQueue _queue;
private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
static
{
STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
}
private final ConsumerTarget _target;
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener;
private volatile QueueContext _queueContext;
private volatile StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
{
public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
{
// no-op
}
};
@ManagedAttributeField
private boolean _exclusive;
@ManagedAttributeField
private boolean _noLocal;
@ManagedAttributeField
private String _distributionMode;
@ManagedAttributeField
private String _settlementMode;
@ManagedAttributeField
private String _selector;
@ManagedAttributeField
private int _priority;
QueueConsumerImpl(final AbstractQueue<?> queue,
ConsumerTarget target,
final String consumerName,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
EnumSet<Option> optionSet,
final Integer priority)
{
super(parentsMap(queue, target.getSessionModel().getModelObject()),
createAttributeMap(consumerName, filters, optionSet, priority));
_messageClass = messageClass;
_sessionReference = target.getSessionModel().getConnectionReference();
_consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
_filters = filters;
_acquires = optionSet.contains(Option.ACQUIRES);
_seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
_isTransient = optionSet.contains(Option.TRANSIENT);
_target = target;
_queue = queue;
// Access control
authorise(Operation.CREATE);
open();
setupLogging();
_listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
{
@Override
public void stateChanged(final ConsumerTarget object,
final ConsumerTarget.State oldState,
final ConsumerTarget.State newState)
{
targetStateChanged(oldState, newState);
}
};
_target.addStateListener(_listener);
_suspendedConsumerLoggingTicker = target.isMultiQueue()
? null
: new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
{
@Override
protected void log(final long period)
{
getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
}
};
}
private static Map<String, Object> createAttributeMap(String name,
FilterManager filters,
EnumSet<Option> optionSet,
Integer priority)
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(ID, UUID.randomUUID());
attributes.put(NAME, name);
attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
if(priority != null)
{
attributes.put(PRIORITY, priority);
}
if(filters != null)
{
Iterator<MessageFilter> iter = filters.filters();
while(iter.hasNext())
{
MessageFilter filter = iter.next();
if(filter instanceof JMSSelectorFilter)
{
attributes.put(SELECTOR, ((JMSSelectorFilter) filter).getSelector());
break;
}
}
}
return attributes;
}
private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
{
if(oldState != newState)
{
if(newState == ConsumerTarget.State.CLOSED)
{
if(_targetClosed.compareAndSet(false,true))
{
getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
}
}
if(_suspendedConsumerLoggingTicker != null)
{
if (newState == ConsumerTarget.State.SUSPENDED)
{
_suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
}
else
{
getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
}
}
}
if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
{
closeAsync();
}
final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
if(stateListener != null)
{
stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
}
}
@Override
public ConsumerTarget getTarget()
{
return _target;
}
@Override
public void awaitCredit(final QueueEntry node)
{
_waitingOnCreditMessageListener.update(node);
}
@Override
public boolean hasCredit()
{
return _target.hasCredit();
}
@Override
public void externalStateChange()
{
if(isPullOnly())
{
getSessionModel().getAMQPConnection().notifyWork();
}
else
{
_queue.deliverAsync();
}
}
@Override
public boolean hasAvailableMessages()
{
return !_queue.isEmpty() && _queue.hasAvailableMessages(this);
}
@Override
public long getUnacknowledgedBytes()
{
return _target.getUnacknowledgedBytes();
}
@Override
public long getUnacknowledgedMessages()
{
return _target.getUnacknowledgedMessages();
}
@Override
public AMQSessionModel getSessionModel()
{
return _target.getSessionModel();
}
@Override
public MessageSource getMessageSource()
{
return _queue;
}
@Override
public boolean isSuspended()
{
return _target.isSuspended();
}
@Override
protected void onClose()
{
if(_closed.compareAndSet(false,true))
{
_target.getSendLock();
try
{
_waitingOnCreditMessageListener.remove();
_target.consumerRemoved(this);
_target.removeStateChangeListener(_listener);
_queue.unregisterConsumer(this);
if(_suspendedConsumerLoggingTicker != null)
{
getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
}
deleted();
}
finally
{
_target.releaseSendLock();
}
}
}
@Override
public int getPriority()
{
return _priority;
}
public void flushBatched()
{
_target.flushBatched();
}
@Override
public boolean isPullOnly()
{
return _target.isPullOnly();
}
public void queueDeleted()
{
_target.queueDeleted();
}
public boolean wouldSuspend(final QueueEntry msg)
{
return !_target.allocateCredit(msg.getMessage());
}
public void restoreCredit(final QueueEntry queueEntry)
{
_target.restoreCredit(queueEntry.getMessage());
}
public void queueEmpty()
{
_target.queueEmpty();
}
@Override
public State getState()
{
return STATE_MAP.get(_target.getState());
}
@Override
public final Queue<?> getQueue()
{
return _queue;
}
private void setupLogging()
{
final String filterLogString = getFilterLogString();
getEventLogger().message(this,
SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && _exclusive,
filterLogString.length() > 0));
}
protected final LogSubject getLogSubject()
{
return this;
}
@Override
public final void flush()
{
AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
try
{
connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
_queue.flushConsumer(this);
_target.processPending();
}
finally
{
connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
}
}
@Override
public void pullMessage()
{
AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
try
{
connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
_queue.flushConsumer(this, 1);
}
finally
{
connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
}
}
public boolean resend(final QueueEntry entry)
{
boolean messageWasResent = getQueue().resend(entry, this);
if (messageWasResent)
{
_target.processPending();
}
return messageWasResent;
}
public final long getConsumerNumber()
{
return _consumerNumber;
}
public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
{
return _stateListener;
}
public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
{
_stateListener = listener;
}
public final QueueContext getQueueContext()
{
return _queueContext;
}
final void setQueueContext(QueueContext queueContext)
{
_queueContext = queueContext;
}
public final boolean isActive()
{
return getState() == State.ACTIVE;
}
public final boolean isClosed()
{
return getState() == State.DELETED;
}
public final boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
if (entry.isRejectedBy(this) || entry.checkHeld(System.currentTimeMillis()))
{
return false;
}
if (entry.getMessage().getClass() == _messageClass)
{
if(_noLocal)
{
Object connectionRef = entry.getMessage().getConnectionReference();
if (connectionRef != null && connectionRef == _sessionReference)
{
return false;
}
}
}
else
{
// no interest in messages we can't convert
if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(),
_messageClass)==null)
{
return false;
}
}
if (_filters == null)
{
return true;
}
else
{
MessageReference ref = entry.newMessageReference();
if(ref != null)
{
try
{
Filterable msg = entry.asFilterable();
try
{
return _filters.allAllow(msg);
}
catch (SelectorParsingException e)
{
LOGGER.info(this + " could not evaluate filter [" + _filters
+ "] against message " + msg
+ ". Error was : " + e.getMessage());
return false;
}
}
finally
{
ref.release();
}
}
else
{
return false;
}
}
}
protected String getFilterLogString()
{
StringBuilder filterLogString = new StringBuilder();
String delimiter = ", ";
boolean hasEntries = false;
if (_filters != null && _filters.hasFilters())
{
filterLogString.append(_filters.toString());
hasEntries = true;
}
if (!acquires())
{
if (hasEntries)
{
filterLogString.append(delimiter);
}
filterLogString.append("Browser");
}
return filterLogString.toString();
}
public final boolean trySendLock()
{
return getTarget().trySendLock();
}
public final void getSendLock()
{
getTarget().getSendLock();
}
public final void releaseSendLock()
{
getTarget().releaseSendLock();
}
public final long getCreateTime()
{
return _createTime;
}
public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl> getOwningState()
{
return _owningState;
}
public final boolean acquires()
{
return _acquires;
}
public final boolean seesRequeues()
{
return _seesRequeues;
}
public final boolean isTransient()
{
return _isTransient;
}
public final long getBytesOut()
{
return _deliveredBytes.longValue();
}
public final long getMessagesOut()
{
return _deliveredCount.longValue();
}
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
long size = _target.send(this, entry, batch);
_deliveredBytes.addAndGet(size);
}
@Override
public void acquisitionRemoved(final QueueEntry node)
{
_target.acquisitionRemoved(node);
_queue.decrementUnackedMsgCount(node);
}
@Override
public String getDistributionMode()
{
return _distributionMode;
}
@Override
public String getSettlementMode()
{
return _settlementMode;
}
@Override
public boolean isExclusive()
{
return _exclusive;
}
@Override
public boolean isNoLocal()
{
return _noLocal;
}
@Override
public String getSelector()
{
return _selector;
}
@Override
public String toLogString()
{
String logString;
if(_queue == null)
{
logString = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
+ "(UNKNOWN)"
+ "] ";
}
else
{
String queueString = new QueueLogSubject(_queue).toLogString();
logString = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
+ "("
// queueString is [vh(/{0})/qu({1}) ] so need to trim
// ^ ^^
+ queueString.substring(1,queueString.length() - 3)
+ ")"
+ "] ";
}
return logString;
}
private EventLogger getEventLogger()
{
return _queue.getEventLogger();
}
public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, MessageInstance.EntryState>
{
private final AtomicReference<MessageInstance> _entry = new AtomicReference<>();
public WaitingOnCreditMessageListener()
{
}
public void update(final MessageInstance entry)
{
remove();
// this only happens under send lock so only one thread can be setting to a non null value at any time
_entry.set(entry);
entry.addStateChangeListener(this);
if(!entry.isAvailable())
{
if(isPullOnly())
{
getSessionModel().getAMQPConnection().notifyWork();
}
else
{
_queue.deliverAsync();
}
remove();
}
}
public void remove()
{
MessageInstance instance;
if((instance = _entry.getAndSet(null)) != null)
{
instance.removeStateChangeListener(this);
}
}
@Override
public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
{
entry.removeStateChangeListener(this);
_entry.compareAndSet(entry, null);
if(isPullOnly())
{
getSessionModel().getAMQPConnection().notifyWork();
}
else
{
_queue.deliverAsync();
}
}
}
}