blob: 46baab8c8505620df540b23786da8fd13c34021e [file] [log] [blame]
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
public class SimpleQueueEntryList implements QueueEntryList
{
private final QueueEntryImpl _head;
private volatile QueueEntryImpl _tail;
static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl>
_tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
(SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail");
private final AMQQueue _queue;
static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl>
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
public SimpleQueueEntryList(AMQQueue queue)
{
_queue = queue;
_head = new QueueEntryImpl(this);
_tail = _head;
}
void advanceHead()
{
QueueEntryImpl next = _head.nextNode();
QueueEntryImpl newNext = _head.getNext();
if (next == newNext)
{
if (_scavenges.incrementAndGet() > _scavengeCount)
{
_scavenges.set(0L);
scavenge();
}
}
}
void scavenge()
{
QueueEntryImpl next = _head.getNext();
while (next != null)
{
next = next.getNext();
}
}
public AMQQueue getQueue()
{
return _queue;
}
public QueueEntry add(ServerMessage message)
{
QueueEntryImpl node = createQueueEntry(message);
for (;;)
{
QueueEntryImpl tail = _tail;
QueueEntryImpl next = tail.nextNode();
if (tail == _tail)
{
if (next == null)
{
node.setEntryId(tail.getEntryId()+1);
if (_nextUpdater.compareAndSet(tail, null, node))
{
_tailUpdater.compareAndSet(this, tail, node);
return node;
}
}
else
{
_tailUpdater.compareAndSet(this,tail, next);
}
}
}
}
protected QueueEntryImpl createQueueEntry(ServerMessage message)
{
return new QueueEntryImpl(this, message);
}
public QueueEntry next(QueueEntry node)
{
return ((QueueEntryImpl)node).getNext();
}
public static class QueueEntryIteratorImpl implements QueueEntryIterator
{
private QueueEntryImpl _lastNode;
QueueEntryIteratorImpl(QueueEntryImpl startNode)
{
_lastNode = startNode;
}
public boolean atTail()
{
return _lastNode.nextNode() == null;
}
public QueueEntry getNode()
{
return _lastNode;
}
public boolean advance()
{
if(!atTail())
{
QueueEntryImpl nextNode = _lastNode.nextNode();
while(nextNode.isDispensed() && nextNode.nextNode() != null)
{
nextNode = nextNode.nextNode();
}
_lastNode = nextNode;
return true;
}
else
{
return false;
}
}
}
public QueueEntryIterator iterator()
{
return new QueueEntryIteratorImpl(_head);
}
public QueueEntry getHead()
{
return _head;
}
static class Factory implements QueueEntryListFactory
{
public QueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
}
}