// blob: 13d5e15af232cf0210c459216ff2cf1589769180 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
/**
 * Skeleton {@link ConsumerTarget} implementation.
 * <p>
 * It keeps the list of attached {@link MessageInstanceConsumer}s, the open/closed state, the
 * "notify work desired" flag (together with the ticker that is registered with the session while
 * that flag is {@code false}), and the logic that pulls the next message from the attached
 * consumers and hands it to {@link #doSend(MessageInstanceConsumer, MessageInstance, boolean)}.
 *
 * @param <T> the concrete target type, passed to the session when notifying it of work
 */
public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T>
{
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
    /** Log subject used whenever the target does not have exactly one consumer that is itself a LogSubject. */
    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** Multi-Queue **)] ";

    protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
    protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);

    private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
    private final boolean _isMultiQueue;
    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
    private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
    private final AtomicBoolean _scheduled = new AtomicBoolean();

    /** Position within the consumer list at which the next {@link #sendNextMessage()} resumes pulling. */
    private volatile Iterator<MessageInstanceConsumer> _pullIterator;
    private volatile boolean _notifyWorkDesired;

    /**
     * Creates the target and prepares (but does not register) the ticker that logs through the
     * connection's event logger.
     *
     * @param isMultiQueue   value reported by {@link #isMultiQueue()}
     * @param amqpConnection connection supplying the ticker period (context value
     *                       {@code Consumer.SUSPEND_NOTIFICATION_PERIOD}) and the event logger
     */
    protected AbstractConsumerTarget(final boolean isMultiQueue,
                                     final AMQPConnection<?> amqpConnection)
    {
        _isMultiQueue = isMultiQueue;
        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(
                amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
        {
            @Override
            protected void log(final long period)
            {
                amqpConnection.getEventLogger().message(AbstractConsumerTarget.this.getLogSubject(),
                                                        SubscriptionMessages.STATE(period));
            }
        };
    }

    /**
     * Picks the log subject for ticker messages: the sole consumer when there is exactly one and it
     * implements {@link LogSubject}, otherwise the shared multi-queue subject.
     */
    private LogSubject getLogSubject()
    {
        // Inspect one snapshot of the list. A separate size() check followed by get(0) calls could
        // see different contents if a consumer is added or removed in between, leading to an
        // IndexOutOfBoundsException or to testing one element and casting another.
        final Iterator<MessageInstanceConsumer> snapshot = _consumers.iterator();
        if (snapshot.hasNext())
        {
            final MessageInstanceConsumer first = snapshot.next();
            if (!snapshot.hasNext() && first instanceof LogSubject)
            {
                return (LogSubject) first;
            }
        }
        return MULTI_QUEUE_LOG_SUBJECT;
    }

    /** No-op in this base class. */
    @Override
    public void acquisitionRemoved(final MessageInstance node)
    {
    }

    @Override
    public boolean isMultiQueue()
    {
        return _isMultiQueue;
    }

    /** Tells the session that this target has work to do. */
    @Override
    public void notifyWork()
    {
        @SuppressWarnings("unchecked")
        final T target = (T) this;
        getSession().notifyWork(target);
    }

    /**
     * Updates the "notify work desired" flag. When the value actually changes, the ticker is removed
     * from the session (flag becoming {@code true}) or restarted and added to it (flag becoming
     * {@code false}), and the new value is propagated to every attached consumer before the flag
     * itself is stored.
     *
     * @param desired the new flag value
     */
    protected final void setNotifyWorkDesired(final boolean desired)
    {
        if (desired != _notifyWorkDesired)
        {
            if (desired)
            {
                getSession().removeTicker(_suspendedConsumerLoggingTicker);
            }
            else
            {
                _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
                getSession().addTicker(_suspendedConsumerLoggingTicker);
            }

            for (MessageInstanceConsumer consumer : _consumers)
            {
                consumer.setNotifyWorkDesired(desired);
            }

            _notifyWorkDesired = desired;
        }
    }

    @Override
    public final boolean isNotifyWorkDesired()
    {
        return _notifyWorkDesired;
    }

    /**
     * Attempts to send one message.
     *
     * @return {@code false} when there is no session or the connection's {@code isIOThread()} check
     *         fails; otherwise the result of {@link #sendNextMessage()}
     */
    @Override
    public boolean processPending()
    {
        if (getSession() == null || !getSession().getAMQPConnection().isIOThread())
        {
            return false;
        }

        // TODO - if not closed
        return sendNextMessage();
    }

    @Override
    public void consumerAdded(final MessageInstanceConsumer sub)
    {
        _consumers.add(sub);
    }

    /**
     * Schedules removal of the given consumer via the connection's {@code doOnIOThreadAsync}.
     *
     * @param sub the consumer to detach
     * @return the future of the scheduled removal, or an already completed future when the consumer
     *         is not attached to this target
     */
    @Override
    public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer sub)
    {
        if (_consumers.contains(sub))
        {
            return doOnIoThreadAsync(() -> consumerRemovedInternal(sub));
        }
        else
        {
            return Futures.immediateFuture(null);
        }
    }

    private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
    {
        return getSession().getAMQPConnection().doOnIOThreadAsync(task);
    }

    /** Detaches the consumer and closes the target once no consumers remain. */
    private void consumerRemovedInternal(final MessageInstanceConsumer sub)
    {
        _consumers.remove(sub);
        if (_consumers.isEmpty())
        {
            close();
        }
    }

    /**
     * @return the list of attached consumers; this is the internal list itself, not a copy
     */
    public List<MessageInstanceConsumer> getConsumers()
    {
        return _consumers;
    }

    /** A target is suspended exactly when notification of work is not desired. */
    @Override
    public final boolean isSuspended()
    {
        return !isNotifyWorkDesired();
    }

    @Override
    public final State getState()
    {
        return _state.get();
    }

    /**
     * Sends the entry through {@link #doSend(MessageInstanceConsumer, MessageInstance, boolean)},
     * records the outbound time on the connection and, for acquiring consumers, marks the
     * acquisition as stealable.
     */
    @Override
    public final void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
    {
        doSend(consumer, entry, batch);
        getSession().getAMQPConnection().updateLastMessageOutboundTime();
        if (consumer.acquires())
        {
            entry.makeAcquisitionStealable();
        }
    }

    @Override
    public long getUnacknowledgedMessages()
    {
        return _unacknowledgedCount.longValue();
    }

    @Override
    public long getUnacknowledgedBytes()
    {
        return _unacknowledgedBytes.longValue();
    }

    /** Protocol-specific delivery of a single message instance to the given consumer. */
    protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);

    /**
     * Pulls the next available message from the attached consumers and sends it.
     * <p>
     * Pulling resumes at the position where the previous call stopped. If that pass is exhausted
     * without finding a message, one fresh pass over the consumer list is made before giving up.
     * A {@link MessageConversionException} raised while sending is handled according to the owning
     * resource's conversion-exception policy; the message reference, if any, is always released.
     *
     * @return {@code true} if a message was pulled (whether or not it could be converted),
     *         {@code false} if no consumer had a message to offer
     */
    @Override
    public boolean sendNextMessage()
    {
        MessageContainer messageContainer = null;
        MessageInstanceConsumer consumer = null;
        boolean iteratedCompleteList = false;
        while (messageContainer == null)
        {
            if (_pullIterator == null || !_pullIterator.hasNext())
            {
                if (iteratedCompleteList)
                {
                    break;
                }
                iteratedCompleteList = true;
                _pullIterator = getConsumers().iterator();
            }
            if (_pullIterator.hasNext())
            {
                consumer = _pullIterator.next();
                messageContainer = consumer.pullMessage();
            }
        }

        if (messageContainer == null)
        {
            return false;
        }

        final MessageInstance entry = messageContainer.getMessageInstance();
        try
        {
            send(consumer, entry, false);
        }
        catch (MessageConversionException mce)
        {
            handleConversionFailure(consumer, entry, mce);
        }
        finally
        {
            if (messageContainer.getMessageReference() != null)
            {
                messageContainer.getMessageReference().release();
            }
        }
        return true;
    }

    /**
     * Applies the owning resource's policy to a message that could not be converted for the consumer.
     * Credit is restored first; then the entry is released and the connection failed (CLOSE, or
     * when the owning resource is not a {@link MessageSource}), routed to the alternate for
     * acquiring consumers (ROUTE_TO_ALTERNATE), or rejected and released (REJECT).
     *
     * @throws ConnectionScopedRuntimeException for the CLOSE policy or a non-MessageSource owner
     * @throws ServerScopedRuntimeException     if the policy is not one of the recognised values
     */
    private void handleConversionFailure(final MessageInstanceConsumer consumer,
                                         final MessageInstance entry,
                                         final MessageConversionException mce)
    {
        restoreCredit(entry.getMessage());
        final TransactionLogResource owningResource = entry.getOwningResource();
        if (!(owningResource instanceof MessageSource))
        {
            throw new ConnectionScopedRuntimeException(String.format(
                    "Unable to convert message %s for this consumer",
                    entry.getMessage()), mce);
        }

        final MessageSource.MessageConversionExceptionHandlingPolicy handlingPolicy =
                ((MessageSource) owningResource).getMessageConversionExceptionHandlingPolicy();
        switch (handlingPolicy)
        {
            case CLOSE:
                entry.release(consumer);
                throw new ConnectionScopedRuntimeException(String.format(
                        "Unable to convert message %s for this consumer",
                        entry.getMessage()), mce);
            case ROUTE_TO_ALTERNATE:
                if (consumer.acquires())
                {
                    int enqueues = entry.routeToAlternate(null, null, null);
                    if (enqueues == 0)
                    {
                        LOGGER.info("Failed to convert message {} for this consumer because '{}'."
                                    + " Message discarded.", entry.getMessage(), mce.getMessage());
                    }
                    else
                    {
                        LOGGER.info("Failed to convert message {} for this consumer because '{}'."
                                    + " Message routed to alternate.", entry.getMessage(), mce.getMessage());
                    }
                }
                else
                {
                    // Non-acquiring consumer: nothing is routed, the message is only logged as skipped.
                    LOGGER.info("Failed to convert message {} for this browser because '{}'."
                                + " Message skipped.", entry.getMessage(), mce.getMessage());
                }
                break;
            case REJECT:
                entry.reject(consumer);
                entry.release(consumer);
                LOGGER.info("Failed to convert message {} for this consumer because '{}'."
                            + " Message skipped.", entry.getMessage(), mce.getMessage());
                break;
            default:
                throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
        }
    }

    /**
     * Closes the target: clears the "notify work desired" flag, detaches and closes every consumer,
     * and removes the ticker from the session.
     *
     * @return {@code true} for the call that moved the state from OPEN to CLOSED, {@code false} if
     *         the target was not OPEN
     */
    @Override
    final public boolean close()
    {
        if (_state.compareAndSet(State.OPEN, State.CLOSED))
        {
            try
            {
                setNotifyWorkDesired(false);

                List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
                _consumers.clear();

                for (MessageInstanceConsumer consumer : consumers)
                {
                    consumer.close();
                }
            }
            finally
            {
                // The state is already CLOSED, so no later close() call would get another chance to
                // deregister the ticker if a consumer fails to close.
                getSession().removeTicker(_suspendedConsumerLoggingTicker);
            }
            return true;
        }
        else
        {
            return false;
        }
    }

    /**
     * @return {@code true} if this call changed the scheduled flag from unset to set
     */
    final boolean setScheduled()
    {
        return _scheduled.compareAndSet(false, true);
    }

    final void clearScheduled()
    {
        _scheduled.set(false);
    }

    /** Treats deletion of the queue as removal of the corresponding consumer. */
    @Override
    public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
    {
        consumerRemoved(sub);
    }
}