blob: 657be1259267859d75aa81b85bd17f30d604f2de [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.Xid;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore implements MessageStore
{
public static final String TYPE = "Memory";
private final AtomicLong _messageId = new AtomicLong(1);
private final ConcurrentMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
private final Object _transactionLock = new Object();
private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
private final AtomicLong _inMemorySize = new AtomicLong();
private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final class MemoryMessageStoreTransaction implements Transaction
{
private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
@Override
public <X> ListenableFuture<X> commitTranAsync(final X val)
{
commitTran();
return Futures.immediateFuture(val);
}
@Override
public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
if (messageIds == null)
{
messageIds = new HashSet<Long>();
_localEnqueueMap.put(queue.getId(), messageIds);
}
messageIds.add(message.getMessageNumber());
return new MemoryEnqueueRecord(queue.getId(), message.getMessageNumber());
}
@Override
public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
{
dequeueMessage(enqueueRecord.getQueueId(), enqueueRecord.getMessageNumber());
}
private void dequeueMessage(final UUID queueId, final long messageNumber)
{
Set<Long> messageIds = _localDequeueMap.get(queueId);
if (messageIds == null)
{
messageIds = new HashSet<Long>();
_localDequeueMap.put(queueId, messageIds);
}
messageIds.add(messageNumber);
}
@Override
public void commitTran()
{
commitTransactionInternal(this);
_localEnqueueMap.clear();
_localDequeueMap.clear();
}
@Override
public void abortTran()
{
_localEnqueueMap.clear();
_localDequeueMap.clear();
}
@Override
public void removeXid(final StoredXidRecord record)
{
_localDistributedTransactionsRemoves.add(new Xid(record.getFormat(),
record.getGlobalId(),
record.getBranchId()));
}
@Override
public StoredXidRecord recordXid(final long format,
final byte[] globalId,
final byte[] branchId,
EnqueueRecord[] enqueues,
DequeueRecord[] dequeues)
{
_localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
return new MemoryStoredXidRecord(format, globalId, branchId);
}
}
private static class MemoryStoredXidRecord implements Transaction.StoredXidRecord
{
private final long _format;
private final byte[] _globalId;
private final byte[] _branchId;
public MemoryStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
{
_format = format;
_globalId = globalId;
_branchId = branchId;
}
@Override
public long getFormat()
{
return _format;
}
@Override
public byte[] getGlobalId()
{
return _globalId;
}
@Override
public byte[] getBranchId()
{
return _branchId;
}
@Override
public boolean equals(final Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
final MemoryStoredXidRecord that = (MemoryStoredXidRecord) o;
return _format == that._format
&& Arrays.equals(_globalId, that._globalId)
&& Arrays.equals(_branchId, that._branchId);
}
@Override
public int hashCode()
{
int result = (int) (_format ^ (_format >>> 32));
result = 31 * result + Arrays.hashCode(_globalId);
result = 31 * result + Arrays.hashCode(_branchId);
return result;
}
}
private static final class DistributedTransactionRecords
{
private Transaction.EnqueueRecord[] _enqueues;
private Transaction.DequeueRecord[] _dequeues;
public DistributedTransactionRecords(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues)
{
super();
_enqueues = enqueues;
_dequeues = dequeues;
}
public Transaction.EnqueueRecord[] getEnqueues()
{
return _enqueues;
}
public Transaction.DequeueRecord[] getDequeues()
{
return _dequeues;
}
}
private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
{
synchronized (_transactionLock )
{
for (Map.Entry<UUID, Set<Long>> localEnqueuedEntry : transaction._localEnqueueMap.entrySet())
{
Set<Long> messageIds = _messageInstances.get(localEnqueuedEntry.getKey());
if (messageIds == null)
{
messageIds = new HashSet<Long>();
_messageInstances.put(localEnqueuedEntry.getKey(), messageIds);
}
messageIds.addAll(localEnqueuedEntry.getValue());
}
for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
{
Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
if (messageIds != null)
{
messageIds.removeAll(loacalDequeueEntry.getValue());
if (messageIds.isEmpty())
{
_messageInstances.remove(loacalDequeueEntry.getKey());
}
}
}
for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
{
_distributedTransactions.put(entry.getKey(), entry.getValue());
}
for (Xid removed : transaction._localDistributedTransactionsRemoves)
{
_distributedTransactions.remove(removed);
}
}
}
@Override
public void openMessageStore(final ConfiguredObject<?> parent)
{
}
@Override
public void upgradeStoreStructure() throws StoreException
{
}
@Override
public void addMessageDeleteListener(final MessageDeleteListener listener)
{
_messageDeleteListeners.add(listener);
}
@Override
public void removeMessageDeleteListener(final MessageDeleteListener listener)
{
_messageDeleteListeners.remove(listener);
}
@Override
public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(final T metaData)
{
long id = getNextMessageId();
StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
{
@Override
public synchronized StoredMessage<T> allContentAdded()
{
final StoredMessage<T> storedMessage = super.allContentAdded();
_inMemorySize.addAndGet(getContentSize());
return storedMessage;
}
@Override
public void remove()
{
_messages.remove(getMessageNumber());
if (!_messageDeleteListeners.isEmpty())
{
for (final MessageDeleteListener messageDeleteListener : _messageDeleteListeners)
{
messageDeleteListener.messageDeleted(this);
}
}
int bytesCleared = metaData.getStorableSize() + metaData.getContentSize();
super.remove();
_inMemorySize.addAndGet(-bytesCleared);
}
};
_messages.put(storedMemoryMessage.getMessageNumber(), storedMemoryMessage);
_inMemorySize.addAndGet(metaData.getStorableSize());
return storedMemoryMessage;
}
@Override
public long getNextMessageId()
{
return _messageId.getAndIncrement();
}
@Override
public boolean isPersistent()
{
return Boolean.parseBoolean(System.getProperty("qpid.tests.mms.messagestore.persistence", "false"));
}
@Override
public long getInMemorySize()
{
return _inMemorySize.get();
}
@Override
public long getBytesEvacuatedFromMemory()
{
return 0L;
}
@Override
public Transaction newTransaction()
{
return new MemoryMessageStoreTransaction();
}
@Override
public void closeMessageStore()
{
for (StoredMemoryMessage storedMemoryMessage : _messages.values())
{
storedMemoryMessage.clear();
}
_messages.clear();
_inMemorySize.set(0);
synchronized (_transactionLock)
{
_messageInstances.clear();
_distributedTransactions.clear();
}
}
@Override
public void addEventListener(final EventListener eventListener, final Event... events)
{
}
@Override
public String getStoreLocation()
{
return null;
}
@Override
public File getStoreLocationAsFile()
{
return null;
}
@Override
public void onDelete(ConfiguredObject<?> parent)
{
}
@Override
public MessageStoreReader newMessageStoreReader()
{
return new MemoryMessageStoreReader();
}
private static class MemoryEnqueueRecord implements MessageEnqueueRecord
{
private final UUID _queueId;
private final long _messageNumber;
public MemoryEnqueueRecord(final UUID queueId,
final long messageNumber)
{
_queueId = queueId;
_messageNumber = messageNumber;
}
@Override
public UUID getQueueId()
{
return _queueId;
}
@Override
public long getMessageNumber()
{
return _messageNumber;
}
}
private class MemoryMessageStoreReader implements MessageStoreReader
{
@Override
public StoredMessage<?> getMessage(final long messageId)
{
return _messages.get(messageId);
}
@Override
public void close()
{
}
@Override
public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
{
synchronized (_transactionLock)
{
for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
{
UUID resourceId = enqueuedEntry.getKey();
for (Long messageId : enqueuedEntry.getValue())
{
if (!handler.handle(new MemoryEnqueueRecord(resourceId, messageId)))
{
return;
}
}
}
}
}
@Override
public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
{
synchronized (_transactionLock)
{
Set<Long> ids = _messageInstances.get(queue.getId());
if(ids != null)
{
for (long id : ids)
{
if (!handler.handle(new MemoryEnqueueRecord(queue.getId(), id)))
{
return;
}
}
}
}
}
@Override
public void visitMessages(final MessageHandler handler) throws StoreException
{
for (StoredMemoryMessage message : _messages.values())
{
if (!handler.handle(message))
{
break;
}
}
}
@Override
public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
{
synchronized (_transactionLock)
{
for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
{
Xid xid = entry.getKey();
DistributedTransactionRecords records = entry.getValue();
if (!handler.handle(new MemoryStoredXidRecord(xid.getFormat(),
xid.getGlobalId(),
xid.getBranchId()),
records.getEnqueues(),
records.getDequeues()))
{
break;
}
}
}
}
}
}