blob: ce81bb7f3bf693fdecdb58a9b014a1a7920589ad [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.transfer;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.server.message.BaseMessageInstance;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class TransferQueueImpl implements TransferQueue
{
/** Identifier of the transfer queue, derived from the UTF-8 bytes of the literal "$transfer". */
private static final UUID TRANSFER_QUEUE_ID = UUID.nameUUIDFromBytes("$transfer".getBytes(StandardCharsets.UTF_8));

// Recovery states stored in _recovering. Within this class the state only moves
// RECOVERING -> COMPLETING_RECOVERY -> RECOVERED (see completeRecovery()).
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
private static final int RECOVERED = 3;

/** Current recovery state: RECOVERING, COMPLETING_RECOVERY or RECOVERED. */
private final AtomicInteger _recovering = new AtomicInteger(RECOVERING);
/** Count of threads currently inside the deferral section of enqueue(). */
private final AtomicInteger _enqueuingWhileRecovering = new AtomicInteger(0);
/** Enqueue requests deferred while the state was RECOVERING; drained by completeRecovery(). */
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final VirtualHost<?> _virtualHost;
private final TransferQueueEntryList _queueEntryList;
/** Consumers registered through addConsumer(); the reference is never reassigned, hence final. */
private final Collection<TransferQueueConsumer> _consumers = new CopyOnWriteArrayList<>();
/**
 * Creates the transfer queue for a virtual host and its (initially empty of consumers) entry list.
 *
 * @param virtualHost the virtual host later returned by getVirtualHost() and getAddressSpace()
 */
public TransferQueueImpl(final VirtualHost<?> virtualHost)
{
_virtualHost = virtualHost;
// NOTE(review): 'this' is handed to the entry list before construction has finished; the
// virtual host is assigned first, presumably so the list can already reach it - keep this order
// unless TransferQueueEntryList's constructor has been checked.
_queueEntryList = new TransferQueueEntryList(this);
}
/**
 * Adds a message to the queue, taking the recovery state into account.
 * <ul>
 * <li>Once recovery has finished the message is enqueued immediately.</li>
 * <li>While the state is still RECOVERING the request is parked on the post-recovery queue and
 * replayed later by completeRecovery().</li>
 * <li>If recovery is in the middle of completing, the calling thread spins (yielding) until the
 * state becomes RECOVERED and then enqueues the message itself.</li>
 * </ul>
 *
 * @param message the message to enqueue
 * @param action action passed through to doEnqueue(); may be null
 * @param enqueueRecord the store record associated with this enqueue
 */
@Override
public void enqueue(final ServerMessage message,
                    final Action<? super BaseMessageInstance> action,
                    final MessageEnqueueRecord enqueueRecord)
{
    // Fast path: recovery is over, nothing to coordinate
    if (_recovering.get() == RECOVERED)
    {
        doEnqueue(message, action, enqueueRecord);
        return;
    }

    // Announce ourselves so that completeRecovery() waits until we have either parked the
    // request or decided not to
    boolean deferred = false;
    _enqueuingWhileRecovering.incrementAndGet();
    try
    {
        // The state is re-read after the increment; only RECOVERING permits deferral
        if (_recovering.get() == RECOVERING)
        {
            _postRecoveryQueue.add(new EnqueueRequest(message, action, enqueueRecord));
            deferred = true;
        }
    }
    finally
    {
        _enqueuingWhileRecovering.decrementAndGet();
    }

    if (deferred)
    {
        return;
    }

    // Recovery is completing: wait for the deferred requests to be replayed before enqueuing
    while (_recovering.get() != RECOVERED)
    {
        Thread.yield();
    }
    doEnqueue(message, action, enqueueRecord);
}
/**
 * Adds a message straight to the queue without consulting the recovery state that enqueue()
 * checks, and without a post-enqueue action (null is passed to doEnqueue()).
 *
 * @param message the message to add
 * @param enqueueRecord the store record associated with the message's enqueue
 */
@Override
public void recover(final ServerMessage<?> message, final MessageEnqueueRecord enqueueRecord)
{
doEnqueue(message, null, enqueueRecord);
}
/**
 * Ends the recovery phase: replays every enqueue that was deferred while recovering and then
 * marks the queue as RECOVERED. Only the first call that finds the state RECOVERING does any
 * work; all other calls return immediately.
 */
@Override
public final void completeRecovery()
{
    // Whoever wins the RECOVERING -> COMPLETING_RECOVERY transition performs the completion
    if (!_recovering.compareAndSet(RECOVERING, COMPLETING_RECOVERY))
    {
        return;
    }

    // Let any thread that is part-way through deferring a request finish doing so
    while (_enqueuingWhileRecovering.get() != 0)
    {
        Thread.yield();
    }

    // At this point no new enqueue will try to put into the post recovery queue (because the
    // state is no longer RECOVERING), and no thread is currently part-way through doing so
    // (because the _enqueuingWhileRecovering count is 0).
    enqueueFromPostRecoveryQueue();
    _recovering.set(RECOVERED);
}
/**
 * @return the virtual host this queue was constructed with
 */
@Override
public VirtualHost<?> getVirtualHost()
{
return _virtualHost;
}
/**
 * Drains the post-recovery queue, enqueuing each deferred request in the order it was parked.
 * <p>
 * The reference that EnqueueRequest took on the message is released in a finally block so it is
 * not leaked if doEnqueue() throws; the exception still propagates to the caller.
 */
private void enqueueFromPostRecoveryQueue()
{
    EnqueueRequest request;
    // poll() returns null once the queue is empty, so no separate isEmpty() check is needed
    while ((request = _postRecoveryQueue.poll()) != null)
    {
        final MessageReference<?> messageReference = request.getMessage();
        try
        {
            doEnqueue(messageReference.getMessage(), request.getAction(), request.getEnqueueRecord());
        }
        finally
        {
            messageReference.release();
        }
    }
}
/**
 * Appends the message to the entry list and wakes every registered consumer that declares an
 * interest in the new entry.
 * <p>
 * NOTE(review): {@code action} is accepted but never invoked here, so a post-enqueue action
 * supplied by a caller has no effect - confirm whether that is intentional.
 *
 * @param message the message to append
 * @param action currently unused
 * @param enqueueRecord the store record associated with this enqueue
 */
protected void doEnqueue(final ServerMessage message, final Action<? super BaseMessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
    final TransferQueueEntry added = (TransferQueueEntry) _queueEntryList.add(message, enqueueRecord);

    for (final TransferQueueConsumer candidate : _consumers)
    {
        if (!candidate.hasInterest(added))
        {
            continue;
        }
        candidate.notifyWork();
    }
}
/**
 * @return the live collection of registered consumers (not a copy)
 */
private Collection<TransferQueueConsumer> getConsumers()
{
return _consumers;
}
/**
 * Creates a consumer for the given target, positions it at the head of the entry list,
 * registers it with this queue and notifies it that there may be work to do.
 *
 * @param target the transfer target the consumer is created for
 * @param consumerName the name given to the new consumer
 * @return the newly registered consumer
 */
@Override
public TransferQueueConsumer addConsumer(final TransferTarget target,
                                         final String consumerName)
{
    final TransferQueueConsumer created = new TransferQueueConsumer(this, target, consumerName);

    // The queue context is installed before the consumer is added to the registered
    // collection, so it is already set when doEnqueue() can first see the consumer
    created.setQueueContext(new QueueContext(_queueEntryList.getHead()));
    _consumers.add(created);

    created.notifyWork();
    return created;
}
/**
 * Reports whether getNextAvailableEntry() currently finds an entry for the consumer. Note that
 * the lookup itself may advance the consumer's queue context and dequeue expired entries.
 *
 * @param queueConsumer the consumer to check on behalf of
 * @return true if an entry is available for the consumer
 */
boolean hasAvailableMessages(final TransferQueueConsumer queueConsumer)
{
return getNextAvailableEntry(queueConsumer) != null;
}
/**
 * Records {@code entry} as the last entry seen by the consumer. If that same entry was the
 * consumer's recorded released entry, the released marker is cleared as well. Does nothing when
 * the consumer has no queue context.
 *
 * @param sub the consumer whose queue context is updated
 * @param entry the entry to record as last seen
 */
void setLastSeenEntry(final TransferQueueConsumer sub, final TransferQueueEntry entry)
{
    final QueueContext context = sub.getQueueContext();
    if (context == null)
    {
        return;
    }

    // The released entry is sampled before lastSeen is overwritten
    final QueueEntry previouslyReleased = context.getReleasedEntry();
    QueueContext._lastSeenUpdater.set(context, entry);

    if (previouslyReleased == entry)
    {
        // Only clear the marker if it still holds the value sampled above
        QueueContext._releasedUpdater.compareAndSet(context, previouslyReleased, null);
    }
}
/**
 * Finds the next entry the consumer could take, moving the consumer's queue context past
 * entries that are unavailable, expired, or of no interest to it. Expired entries that this
 * thread manages to acquire are dequeued along the way.
 *
 * @param sub the consumer to search on behalf of
 * @return the next candidate entry, or null if the consumer has no queue context or no further
 *         entry qualifies
 */
TransferQueueEntry getNextAvailableEntry(final TransferQueueConsumer sub)
{
QueueContext context = sub.getQueueContext();
if(context != null)
{
QueueEntry lastSeen = context.getLastSeenEntry();
QueueEntry releasedNode = context.getReleasedEntry();
// Candidate selection: use the released entry when one is recorded and lastSeen does not
// order before it (compareTo >= 0); otherwise take the entry following lastSeen.
TransferQueueEntry node =
(TransferQueueEntry) ((releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
? releasedNode : _queueEntryList.next(lastSeen));
boolean expired = false;
// Keep skipping while the candidate is unavailable, expired, or uninteresting to the consumer.
// 'expired' is assigned inside the condition so the loop body can tell that the expiry test
// matched; due to short-circuiting, node.expired() is only evaluated for available entries.
while (node != null
&& (!node.isAvailable()
|| (expired = node.expired())
|| !sub.hasInterest(node)))
{
if (expired)
{
// Reset the flag so that it does not carry over to the next candidate
expired = false;
// Dequeue the expired entry only if this thread wins the acquire
if (node.acquire())
{
dequeueEntry(node);
}
}
// Advance lastSeen to the skipped node; the released marker is cleared only when that
// update succeeded (i.e. lastSeen was still the value read earlier).
if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
{
QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
}
// Re-read the context (presumably because other threads may have changed it) and select the
// next candidate with the same rule as above.
lastSeen = context.getLastSeenEntry();
releasedNode = context.getReleasedEntry();
node = (TransferQueueEntry) ((releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
? releasedNode
: _queueEntryList.next(lastSeen));
}
return node;
}
else
{
return null;
}
}
/**
 * Dequeues the entry using a fresh auto-commit transaction over the virtual host's message
 * store.
 *
 * @param node the entry to dequeue
 */
private void dequeueEntry(final QueueEntry node)
{
    dequeueEntry(node, new AutoCommitTransaction(_virtualHost.getMessageStore()));
}
/**
 * Dequeues the entry's enqueue record within the given transaction. The entry itself is
 * deleted once the transaction commits; a rollback leaves it untouched.
 *
 * @param node the entry to dequeue
 * @param txn the transaction in which the dequeue takes place
 */
private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
{
    final ServerTransaction.Action deleteOnCommit = new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            node.delete();
        }

        @Override
        public void onRollback()
        {
            // nothing to undo: the entry is only deleted after a successful commit
        }
    };

    txn.dequeue(node.getEnqueueRecord(), deleteOnCommit);
}
/**
 * @return always true
 */
@Override
public boolean isDurable()
{
return true;
}
/**
 * @return always false; this implementation never reports itself as deleted
 */
@Override
public boolean isDeleted()
{
return false;
}
/**
 * @return the fixed name "$transfer" (the same literal the queue's id is derived from)
 */
@Override
public String getName()
{
return "$transfer";
}
/**
 * @return the virtual host this queue was constructed with, viewed as its address space
 */
@Override
public NamedAddressSpace getAddressSpace()
{
return _virtualHost;
}
/**
 * Authorisation hook for publishing to this queue. Not implemented yet: the body is empty, so
 * no check is made and no exception is ever thrown from here.
 *
 * @param token the security token of the publisher (currently ignored)
 * @param arguments the publish arguments (currently ignored)
 * @throws AccessControlException declared, but never thrown by the current implementation
 */
@Override
public void authorisePublish(final SecurityToken token, final Map<String, Object> arguments)
throws AccessControlException
{
// TODO: implement the authorisation check; until then every publish is let through
}
/**
 * Routes a message to this queue within the given transaction.
 * <p>
 * The message is enqueued once, unless it reports that it is already referenced by this queue.
 * A message reference is held while the transaction is outstanding and released again after
 * commit (once the in-memory enqueue has been attempted) or on rollback.
 *
 * @param message the message to route
 * @param routingAddress not used by this implementation
 * @param instanceProperties not used by this implementation
 * @param txn the transaction in which the enqueue is performed
 * @param postEnqueueAction action handed on to enqueue() after commit
 * @return 1 if an enqueue was registered with the transaction, 0 if the message was already
 *         referenced by this queue
 * @throws VirtualHostUnavailableException if the virtual host is not in the ACTIVE state
 */
@Override
public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
                                                                              final String routingAddress,
                                                                              final InstanceProperties instanceProperties,
                                                                              final ServerTransaction txn,
                                                                              final Action<? super BaseMessageInstance> postEnqueueAction)
{
    if (_virtualHost.getState() != State.ACTIVE)
    {
        throw new VirtualHostUnavailableException(this._virtualHost);
    }

    // Nothing to do when this queue already references the message
    if (message.isReferenced(this))
    {
        return 0;
    }

    // Held until the transaction outcome is known
    final MessageReference<?> heldReference = message.newReference();

    txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
    {
        @Override
        public void postCommit(MessageEnqueueRecord... records)
        {
            try
            {
                TransferQueueImpl.this.enqueue(message, postEnqueueAction, records[0]);
            }
            finally
            {
                heldReference.release();
            }
        }

        @Override
        public void onRollback()
        {
            heldReference.release();
        }
    });
    return 1;
}
/**
 * @return the constant id TRANSFER_QUEUE_ID, which is derived from the literal "$transfer"
 */
@Override
public UUID getId()
{
return TRANSFER_QUEUE_ID;
}
/**
 * @return always MessageDurability.DEFAULT
 */
@Override
public MessageDurability getMessageDurability()
{
return MessageDurability.DEFAULT;
}
/**
 * Currently a no-op: the body is empty and both arguments are ignored.
 *
 * @param consumer the consumer to flush (ignored)
 * @param count ignored
 */
public void flushConsumer(final TransferQueueConsumer consumer, final int count)
{
}
/**
 * Immutable holder for the arguments of an enqueue call. Construction takes a new reference on
 * the message; this class never releases that reference itself, so whoever processes the
 * request has to do so.
 */
private static class EnqueueRequest
{
    private final MessageReference<?> _message;
    private final Action<? super BaseMessageInstance> _action;
    private final MessageEnqueueRecord _enqueueRecord;

    /**
     * @param message the message to enqueue; a new reference to it is taken here
     * @param action the action supplied with the enqueue call, may be null
     * @param enqueueRecord the store record associated with the enqueue
     */
    public EnqueueRequest(final ServerMessage message,
                          final Action<? super BaseMessageInstance> action,
                          final MessageEnqueueRecord enqueueRecord)
    {
        this._message = message.newReference();
        this._action = action;
        this._enqueueRecord = enqueueRecord;
    }

    /** @return the reference taken on the message at construction time */
    public MessageReference<?> getMessage()
    {
        return this._message;
    }

    /** @return the action supplied with the enqueue call, possibly null */
    public Action<? super BaseMessageInstance> getAction()
    {
        return this._action;
    }

    /** @return the store record associated with the enqueue */
    public MessageEnqueueRecord getEnqueueRecord()
    {
        return this._enqueueRecord;
    }
}
}