blob: 2f72098be7c6d0ea6f8ae14af9abb7f874f009eb [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.message;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerMessageImpl.class);
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater =
AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources");
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
@SuppressWarnings("unused")
private volatile Collection<UUID> _resources;
private volatile ServerMessage.ValidationStatus _validationStatus = ServerMessage.ValidationStatus.UNKNOWN;
private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, ServerMessage.ValidationStatus>
_validationStatusUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
ServerMessage.ValidationStatus.class,
"_validationStatus");
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
{
_handle = handle;
_connectionReference = connectionReference;
}
@Override
public long getSize()
{
return _handle.getContentSize();
}
@Override
public long getSizeIncludingHeader()
{
return _handle.getContentSize() + _handle.getMetadataSize();
}
@Override
public StoredMessage<T> getStoredMessage()
{
return _handle;
}
private boolean incrementReference()
{
do
{
int count = _refCountUpdater.get(this);
if (count < 0)
{
return false;
}
else if (_refCountUpdater.compareAndSet(this, count, count + 1))
{
return true;
}
}
while (true);
}
/**
* Thread-safe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
*/
private void decrementReference()
{
boolean updated;
do
{
int count = _refCountUpdater.get(this);
int newCount = count - 1;
if (newCount > 0)
{
updated = _refCountUpdater.compareAndSet(this, count, newCount);
}
else if (newCount == 0)
{
// set the reference count below 0 so that we can detect that the message has been deleted
updated = _refCountUpdater.compareAndSet(this, count, -1);
if (updated)
{
_handle.remove();
}
}
else
{
throw new ServerScopedRuntimeException("Reference count for message id " + debugIdentity()
+ " has gone below 0.");
}
}
while (!updated);
}
public String debugIdentity()
{
return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
}
private int getReferenceCount()
{
return _referenceCount;
}
@Override
final public MessageReference<X> newReference()
{
return new Reference(this);
}
@Override
final public MessageReference<X> newReference(TransactionLogResource object)
{
return new Reference(this, object);
}
@Override
final public boolean isReferenced(TransactionLogResource resource)
{
Collection<UUID> resources = _resources;
return resources != null && resources.contains(resource.getId());
}
@Override
final public boolean isReferenced()
{
Collection<UUID> resources = _resources;
return resources != null && !resources.isEmpty();
}
@Override
final public boolean isPersistent()
{
return _handle.getMetaData().isPersistent();
}
@Override
final public long getMessageNumber()
{
return getStoredMessage().getMessageNumber();
}
@Override
public QpidByteBuffer getContent()
{
return getContent(0, (int) getSize());
}
@Override
public QpidByteBuffer getContent(int offset, int length)
{
StoredMessage<T> storedMessage = getStoredMessage();
boolean wasInMemory = storedMessage.isInMemory();
try
{
return storedMessage.getContent(offset, length);
}
finally
{
if (!wasInMemory && checkValid())
{
storedMessage.flowToDisk();
}
}
}
@Override
final public Object getConnectionReference()
{
return _connectionReference;
}
@Override
public String toString()
{
return "Message[" + debugIdentity() + "]";
}
@Override
public ServerMessage.ValidationStatus getValidationStatus()
{
return _validationStatus;
}
@Override
public boolean checkValid()
{
ServerMessage.ValidationStatus status;
while ((status = _validationStatus) == ServerMessage.ValidationStatus.UNKNOWN)
{
ServerMessage.ValidationStatus newStatus;
try
{
validate();
newStatus = ServerMessage.ValidationStatus.VALID;
}
catch (RuntimeException e)
{
newStatus = ServerMessage.ValidationStatus.MALFORMED;
LOGGER.debug("Malformed message '{}' detected", this, e);
}
if (_validationStatusUpdater.compareAndSet(this, status, newStatus))
{
status = newStatus;
break;
}
}
return status == ServerMessage.ValidationStatus.VALID;
}
protected void validate()
{
// noop
}
private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
implements MessageReference<X>
{
private static final AtomicIntegerFieldUpdater<Reference> _releasedUpdater =
AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
private final AbstractServerMessageImpl<X, T> _message;
private final UUID _resourceId;
private volatile int _released;
private Reference(final AbstractServerMessageImpl<X, T> message) throws MessageDeletedException
{
this(message, null);
}
private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) throws MessageDeletedException
{
_message = message;
if(resource != null)
{
Collection<UUID> currentValue;
Collection<UUID> newValue;
_resourceId = resource.getId();
do
{
currentValue = _message._resources;
if(currentValue == null)
{
newValue = Collections.singleton(_resourceId);
}
else
{
if(currentValue.contains(_resourceId))
{
throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource);
}
newValue = new ArrayList<>(currentValue.size()+1);
newValue.addAll(currentValue);
newValue.add(_resourceId);
}
}
while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
}
else
{
_resourceId = null;
}
if(!_message.incrementReference())
{
throw new MessageDeletedException(message.getMessageNumber());
}
}
@Override
public X getMessage()
{
return (X) _message;
}
@Override
public synchronized void release()
{
if(_releasedUpdater.compareAndSet(this,0,1))
{
if(_resourceId != null)
{
Collection<UUID> currentValue;
Collection<UUID> newValue;
do
{
currentValue = _message._resources;
if(currentValue.size() == 1)
{
newValue = null;
}
else
{
UUID[] array = new UUID[currentValue.size()-1];
int pos = 0;
for(UUID uuid : currentValue)
{
if(!_resourceId.equals(uuid))
{
array[pos++] = uuid;
}
}
newValue = Arrays.asList(array);
}
}
while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
}
_message.decrementReference();
}
}
@Override
public void close()
{
release();
}
}
}