blob: dbad5438dc5c4fc05a0e21ebc95bdb7e3d573c9a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.log4j.Logger;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.CopyOnWriteArraySet;
public class QueueEntryImpl implements QueueEntry
{
/**
* Used for debugging purposes.
*/
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
private final SimpleQueueEntryList _queueEntryList;
private AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
private volatile EntryState _state = AVAILABLE_STATE;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
_stateUpdater =
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, EntryState.class, "_state");
private volatile Set<StateChangeListener> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
_listenersUpdater =
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, Set.class, "_stateChangeListeners");
private static final
AtomicLongFieldUpdater<QueueEntryImpl>
_entryIdUpdater =
AtomicLongFieldUpdater.newUpdater
(QueueEntryImpl.class, "_entryId");
private volatile long _entryId;
volatile QueueEntryImpl _next;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
_message = message;
_entryIdUpdater.set(this, entryId);
}
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
_queueEntryList = queueEntryList;
_message = message;
}
protected void setEntryId(long entryId)
{
_entryIdUpdater.set(this, entryId);
}
protected long getEntryId()
{
return _entryId;
}
public AMQQueue getQueue()
{
return _queueEntryList.getQueue();
}
public AMQMessage getMessage()
{
return _message;
}
public long getSize()
{
return getMessage().getSize();
}
public boolean getDeliveredToConsumer()
{
return getMessage().getDeliveredToConsumer();
}
public boolean expired() throws AMQException
{
return getMessage().expired(getQueue());
}
public boolean isAcquired()
{
return _state.getState() == State.ACQUIRED;
}
public boolean acquire()
{
return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
}
private boolean acquire(final EntryState state)
{
boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
if(acquired && _stateChangeListeners != null)
{
notifyStateChange(State.AVAILABLE, State.ACQUIRED);
}
return acquired;
}
public boolean acquire(Subscription sub)
{
return acquire(sub.getOwningState());
}
public boolean acquiredBySubscription()
{
return (_state instanceof SubscriptionAcquiredState);
}
public void setDeliveredToSubscription()
{
getMessage().setDeliveredToConsumer();
}
public void release()
{
_stateUpdater.set(this,AVAILABLE_STATE);
}
public String debugIdentity()
{
return getMessage().debugIdentity();
}
public boolean immediateAndNotDelivered()
{
return _message.immediateAndNotDelivered();
}
public void setRedelivered(boolean b)
{
getMessage().setRedelivered(b);
}
public Subscription getDeliveredSubscription()
{
EntryState state = _state;
if (state instanceof SubscriptionAcquiredState)
{
return ((SubscriptionAcquiredState) state).getSubscription();
}
else
{
return null;
}
}
public void reject()
{
reject(getDeliveredSubscription());
}
public void reject(Subscription subscription)
{
if (subscription != null)
{
if (_rejectedBy == null)
{
_rejectedBy = new HashSet<Subscription>();
}
_rejectedBy.add(subscription);
}
else
{
_log.warn("Requesting rejection by null subscriber:" + debugIdentity());
}
}
public boolean isRejectedBy(Subscription subscription)
{
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
return _rejectedBy.contains(subscription);
}
else // This messasge hasn't been rejected yet.
{
return false;
}
}
public void requeue(final StoreContext storeContext) throws AMQException
{
getQueue().requeue(storeContext, this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
}
}
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
EntryState state = _state;
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
if (state instanceof SubscriptionAcquiredState)
{
Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
s.restoreCredit(this);
}
getQueue().dequeue(storeContext, this);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
}
}
}
private void notifyStateChange(final State oldState, final State newState)
{
for(StateChangeListener l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
}
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
if(delete())
{
getMessage().decrementReference(storeContext);
}
}
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
{
//if the queue is null then the message is waiting to be acked, but has been removed.
if (getQueue() != null)
{
dequeue(storeContext);
}
dispose(storeContext);
}
public boolean isQueueDeleted()
{
return getQueue().isDeleted();
}
public void addStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
if(listeners == null)
{
_listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
public boolean removeStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
}
return false;
}
public int compareTo(final QueueEntry o)
{
QueueEntryImpl other = (QueueEntryImpl)o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
public QueueEntryImpl getNext()
{
QueueEntryImpl next = nextNode();
while(next != null && next.isDeleted())
{
final QueueEntryImpl newNext = next.nextNode();
if(newNext != null)
{
SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
next = nextNode();
}
else
{
next = null;
}
}
return next;
}
QueueEntryImpl nextNode()
{
return _next;
}
public boolean isDeleted()
{
return _state == DELETED_STATE;
}
public boolean delete()
{
EntryState state = _state;
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
_queueEntryList.advanceHead();
return true;
}
else
{
return false;
}
}
public QueueEntryList getQueueEntryList()
{
return _queueEntryList;
}
}