blob: 652e4c135d2df445de4c25021b2fcfcfbc730b33 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
* transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
public static final int VERSION = 8;
private static final int LOCK_RETRY_ATTEMPTS = 5;
private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
//TODO: Add upgrader to remove BRIDGES and LINKS
private static String BRIDGEDB_NAME = "BRIDGES";
private static String LINKDB_NAME = "LINKS";
private static String XID_DB_NAME = "XIDS";
private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION";
private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME , CONFIGURED_OBJECT_HIERARCHY_DB_NAME};
private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME };
private EnvironmentFacade _environmentFacade;
private final AtomicLong _messageId = new AtomicLong(0);
private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private long _totalStoreSize;
private boolean _limitBusted;
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
private final EventManager _eventManager = new EventManager();
private final EnvironmentFacadeFactory _environmentFacadeFactory;
private volatile Committer _committer;
public BDBMessageStore()
{
this(new StandardEnvironmentFacadeFactory());
}
public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
_environmentFacadeFactory = environmentFacadeFactory;
}
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
}
@Override
public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
if (_configurationStoreOpen.compareAndSet(false, true))
{
if (_environmentFacade == null)
{
EnvironmentFacadeTask[] initialisationTasks = null;
if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
{
String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
}
else
{
initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
}
_environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
}
else
{
throw new IllegalStateException("The database have been already opened as message store");
}
}
}
@Override
public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
try
{
int configVersion = getConfigVersion();
handler.begin(configVersion);
doVisitAllConfiguredObjectRecords(handler);
int newConfigVersion = handler.end();
if(newConfigVersion != configVersion)
{
updateConfigVersion(newConfigVersion);
}
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
}
}
private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
Cursor objectsCursor = null;
Cursor hierarchyCursor = null;
try
{
objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
BDBConfiguredObjectRecord configuredObject =
(BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
configuredObjects.put(configuredObject.getId(), configuredObject);
}
// set parents
hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
if(child != null)
{
ConfiguredObjectRecord parent = configuredObjects.get(parentId);
if(parent != null)
{
child.addParent(hk.getParentType(), parent);
}
else if(hk.getParentType().equals("Exchange"))
{
// TODO - remove this hack for the pre-defined exchanges
child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
}
}
}
}
finally
{
closeCursorSafely(objectsCursor);
closeCursorSafely(hierarchyCursor);
}
for (ConfiguredObjectRecord record : configuredObjects.values())
{
boolean shoudlContinue = handler.handle(record);
if (!shoudlContinue)
{
break;
}
}
}
@Override
public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException
{
if (_messageStoreOpen.compareAndSet(false, true))
{
Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
_persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
if (_environmentFacade == null)
{
_environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
}
_committer = _environmentFacade.createCommitter(parent.getName());
_committer.start();
}
}
@Override
public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
{
checkMessageStoreOpen();
return new BDBTransaction();
}
@Override
public String getStoreLocation()
{
if (_environmentFacade == null)
{
return null;
}
return _environmentFacade.getStoreLocation();
}
public EnvironmentFacade getEnvironmentFacade()
{
return _environmentFacade;
}
@Override
public void closeMessageStore() throws StoreException
{
if (_messageStoreOpen.compareAndSet(true, false))
{
try
{
if (_committer != null)
{
_committer.stop();
}
}
finally
{
if (!_configurationStoreOpen.get())
{
closeEnvironment();
}
}
}
}
@Override
public void closeConfigurationStore() throws StoreException
{
if (_configurationStoreOpen.compareAndSet(true, false))
{
try
{
if (_committer != null)
{
_committer.stop();
}
}
finally
{
if (!_messageStoreOpen.get())
{
closeEnvironment();
}
}
}
}
private void closeEnvironment()
{
if (_environmentFacade != null)
{
try
{
_environmentFacade.close();
}
catch(DatabaseException e)
{
throw new StoreException("Exception occured on message store close", e);
}
}
}
@SuppressWarnings("resource")
private void updateConfigVersion(int newConfigVersion) throws StoreException
{
Transaction txn = null;
Cursor cursor = null;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
cursor = getConfigVersionDb().openCursor(txn, null);
DatabaseEntry key = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0,key);
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
IntegerBinding.intToEntry(newConfigVersion, value);
OperationStatus status = cursor.put(key, value);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error setting config version: " + status);
}
}
cursor.close();
cursor = null;
txn.commit();
txn = null;
}
finally
{
closeCursorSafely(cursor);
abortTransactionIgnoringException("Error setting config version", txn);;
}
}
private int getConfigVersion() throws StoreException
{
Cursor cursor = null;
try
{
cursor = getConfigVersionDb().openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
return IntegerBinding.entryToInt(value);
}
// Insert 0 as the default config version
IntegerBinding.intToEntry(0,value);
ByteBinding.byteToEntry((byte) 0,key);
OperationStatus status = getConfigVersionDb().put(null, key, value);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error initialising config version: " + status);
}
return 0;
}
finally
{
closeCursorSafely(cursor);
}
}
private void closeCursorSafely(Cursor cursor) throws StoreException
{
if (cursor != null)
{
try
{
cursor.close();
}
catch(DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Cannot close cursor", e);
}
}
}
void removeMessage(long messageId, boolean sync) throws StoreException
{
boolean complete = false;
com.sleepycat.je.Transaction tx = null;
Random rand = null;
int attempts = 0;
try
{
do
{
tx = null;
try
{
tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removing message id " + messageId);
}
OperationStatus status = getMessageMetaDataDb().delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
messageId);
}
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Deleted metadata for message " + messageId);
}
//now remove the content data from the store if there is any.
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
getMessageContentDb().delete(tx, contentKeyEntry);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Deleted content for message " + messageId);
}
_environmentFacade.commit(tx);
_committer.commit(tx, sync);
complete = true;
tx = null;
}
catch (LockConflictException e)
{
try
{
if(tx != null)
{
tx.abort();
}
}
catch(DatabaseException e2)
{
LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2);
// rethrow the original log conflict exception, the secondary exception should already have
// been logged.
throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e);
}
LOGGER.warn("Lock timeout exception. Retrying (attempt "
+ (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
if(++attempts < LOCK_RETRY_ATTEMPTS)
{
if(rand == null)
{
rand = new Random();
}
try
{
Thread.sleep(500l + (long)(500l * rand.nextDouble()));
}
catch (InterruptedException e1)
{
}
}
else
{
// rethrow the lock conflict exception since we could not solve by retrying
throw _environmentFacade.handleDatabaseException("Cannot remove messages", e);
}
}
}
while(!complete);
}
catch (DatabaseException e)
{
LOGGER.error("Unexpected BDB exception", e);
try
{
abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
}
finally
{
tx = null;
}
throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
}
finally
{
try
{
abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
}
finally
{
tx = null;
}
}
}
private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx)
{
try
{
if (tx != null)
{
tx.abort();
}
}
catch (DatabaseException e1)
{
// We need the possible side effect of the handler restarting the environment but don't care about the exception
_environmentFacade.handleDatabaseException(null, e1);
LOGGER.warn(errorMessage, e1);
}
}
@Override
public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
checkConfigurationStoreOpen();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Create " + configuredObject);
}
com.sleepycat.je.Transaction txn = null;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
storeConfiguredObjectEntry(txn, configuredObject);
txn.commit();
txn = null;
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
+ " in database: " + e.getMessage(), e);
}
finally
{
if (txn != null)
{
abortTransactionIgnoringException("Error creating configured object", txn);
}
}
}
@Override
public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
checkConfigurationStoreOpen();
com.sleepycat.je.Transaction txn = null;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
{
if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS)
{
removed.add(record.getId());
}
}
txn.commit();
txn = null;
return removed.toArray(new UUID[removed.size()]);
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e);
}
finally
{
if (txn != null)
{
abortTransactionIgnoringException("Error deleting configured objects", txn);
}
}
}
@Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
checkConfigurationStoreOpen();
com.sleepycat.je.Transaction txn = null;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
for(ConfiguredObjectRecord record : records)
{
update(createIfNecessary, record, txn);
}
txn.commit();
txn = null;
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e);
}
finally
{
if (txn != null)
{
abortTransactionIgnoringException("Error updating configuration details within the store", txn);
}
}
}
private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record);
}
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
keyBinding.objectToEntry(record.getId(), key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT);
final boolean isNewRecord = status == OperationStatus.NOTFOUND;
if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord))
{
// write the updated entry to the store
configuredObjectBinding.objectToEntry(record, newValue);
status = getConfiguredObjectsDb().put(txn, key, newValue);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error updating configuration details within the store: " + status);
}
if(isNewRecord)
{
writeHierarchyRecords(txn, record);
}
}
else if (status != OperationStatus.NOTFOUND)
{
throw new StoreException("Error finding configuration details within the store: " + status);
}
}
/**
* Places a message onto a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
* @param queue The the queue to place the message on.
* @param messageId The message to enqueue.
*
* @throws StoreException If the operation fails for any reason.
*/
private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
try
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
}
getDeliveryDb().put(tx, key, value);
}
catch (DatabaseException e)
{
LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
+ queue.getName() + " with id " + queue.getId() + " to database", e);
}
}
/**
* Extracts a message from a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
* @param queue The queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
UUID id = queue.getId();
keyBinding.objectToEntry(queueEntryKey, key);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Dequeue message id " + messageId + " from queue "
+ queue.getName() + " with id " + id);
}
try
{
OperationStatus status = getDeliveryDb().delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
throw new StoreException("Unable to find message with id " + messageId + " on queue "
+ queue.getName() + " with id " + id);
}
else if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Unable to remove message with id " + messageId + " on queue"
+ queue.getName() + " with id " + id);
}
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removed message " + messageId + " on queue "
+ queue.getName() + " with id " + id);
}
}
catch (DatabaseException e)
{
LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e);
throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e);
}
}
private void recordXid(com.sleepycat.je.Transaction txn,
long format,
byte[] globalId,
byte[] branchId,
org.apache.qpid.server.store.Transaction.Record[] enqueues,
org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
XidBinding keyBinding = XidBinding.getInstance();
keyBinding.objectToEntry(xid,key);
DatabaseEntry value = new DatabaseEntry();
PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
valueBinding.objectToEntry(preparedTransaction, value);
try
{
getXidDb().put(txn, key, value);
}
catch (DatabaseException e)
{
LOGGER.error("Failed to write xid: " + e.getMessage(), e);
throw _environmentFacade.handleDatabaseException("Error writing xid to database", e);
}
}
private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
XidBinding keyBinding = XidBinding.getInstance();
keyBinding.objectToEntry(xid, key);
try
{
OperationStatus status = getXidDb().delete(txn, key);
if (status == OperationStatus.NOTFOUND)
{
throw new StoreException("Unable to find xid");
}
else if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Unable to remove xid");
}
}
catch (DatabaseException e)
{
LOGGER.error("Failed to remove xid in transaction " + txn, e);
throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e);
}
}
/**
* Commits all operations performed within a given transaction.
*
* @param tx The transaction to commit all operations for.
*
* @throws StoreException If the operation fails for any reason.
*/
private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
_environmentFacade.commit(tx);
StoreFuture result = _committer.commit(tx, syncCommit);
if (LOGGER.isDebugEnabled())
{
String transactionType = syncCommit ? "synchronous" : "asynchronous";
LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
}
return result;
}
/**
* Abandons all operations performed within a given transaction.
*
* @param tx The transaction to abandon.
*
* @throws StoreException If the operation fails for any reason.
*/
private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("abortTran called for transaction " + tx);
}
try
{
tx.abort();
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
}
}
/**
* Return a valid, currently unused message id.
*
* @return A fresh message id.
*/
private long getNewMessageId()
{
return _messageId.incrementAndGet();
}
/**
* Stores a chunk of message data.
*
* @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
ByteBuffer contentBody) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
ContentBinding messageBinding = ContentBinding.getInstance();
messageBinding.objectToEntry(contentBody.array(), value);
try
{
OperationStatus status = getMessageContentDb().put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error adding content for message id " + messageId + ": " + status);
}
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
}
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
/**
* Stores message meta-data.
*
* @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param messageMetaData The message meta data to store.
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
StorableMessageMetaData messageMetaData)
throws StoreException
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("storeMetaData called for transaction " + tx
+ ", messageId " + messageId
+ ", messageMetaData " + messageMetaData);
}
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
{
getMessageMetaDataDb().put(tx, key, value);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
}
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
}
}
/**
* Retrieves message meta-data.
*
* @param messageId The message to get the meta-data for.
*
* @return The message meta data.
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ messageId + "): called");
}
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
try
{
OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Metadata not found for message with id " + messageId);
}
StorableMessageMetaData mdd = messageBinding.entryToObject(value);
return mdd;
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
}
}
/**
* Fills the provided ByteBuffer with as much content for the specified message as possible, starting
* from the specified offset in the message.
*
* @param messageId The message to get the data for.
* @param offset The offset of the data within the message.
* @param dst The destination of the content read back
*
* @return The number of bytes inserted into the destination
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
ContentBinding contentTupleBinding = ContentBinding.getInstance();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
}
try
{
int written = 0;
OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
if (status == OperationStatus.SUCCESS)
{
byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
int size = dataAsBytes.length;
if (offset > size)
{
throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ " for message id " + messageId + "!");
}
written = size - offset;
if(written > dst.remaining())
{
written = dst.remaining();
}
dst.put(dataAsBytes, offset, written);
}
return written;
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
@Override
public boolean isPersistent()
{
return true;
}
@Override
@SuppressWarnings("unchecked")
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
if(metaData.isPersistent())
{
return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
}
else
{
return new StoredMemoryMessage(getNewMessageId(), metaData);
}
}
private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Storing configured object record: " + configuredObject);
}
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
uuidBinding.objectToEntry(configuredObject.getId(), key);
DatabaseEntry value = new DatabaseEntry();
ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
queueBinding.objectToEntry(configuredObject, value);
try
{
OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ status);
}
writeHierarchyRecords(txn, configuredObject);
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
+ " to database: " + e.getMessage(), e);
}
}
private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
{
OperationStatus status;
HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance();
DatabaseEntry hierarchyKey = new DatabaseEntry();
DatabaseEntry hierarchyValue = new DatabaseEntry();
for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet())
{
hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey);
UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue);
status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: "
+ status);
}
}
}
private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException
{
UUID id = record.getId();
Map<String, ConfiguredObjectRecord> parents = record.getParents();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removing configured object: " + id);
}
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
uuidBinding.objectToEntry(id, key);
OperationStatus status = getConfiguredObjectsDb().delete(tx, key);
if(status == OperationStatus.SUCCESS)
{
for(String parentType : parents.keySet())
{
DatabaseEntry hierarchyKey = new DatabaseEntry();
HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance();
keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey);
getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey);
}
}
return status;
}
private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
{
private final long _messageId;
private final boolean _isRecovered;
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
private volatile SoftReference<byte[]> _dataRef;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
this(messageId, metaData, false);
}
StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
_isRecovered = isRecovered;
if(!_isRecovered)
{
_metaData = metaData;
}
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@Override
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaDataRef.get();
if(metaData == null)
{
checkMessageStoreOpen();
metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
return metaData;
}
@Override
public long getMessageNumber()
{
return _messageId;
}
@Override
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
src = src.slice();
if(_data == null)
{
_data = new byte[src.remaining()];
_dataRef = new SoftReference<byte[]>(_data);
src.duplicate().get(_data);
}
else
{
byte[] oldData = _data;
_data = new byte[oldData.length + src.remaining()];
_dataRef = new SoftReference<byte[]>(_data);
System.arraycopy(oldData,0,_data,0,oldData.length);
src.duplicate().get(_data, oldData.length, src.remaining());
}
}
@Override
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
if(data != null)
{
int length = Math.min(dst.remaining(), data.length - offsetInMessage);
dst.put(data, offsetInMessage, length);
return length;
}
else
{
checkMessageStoreOpen();
return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
@Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
if(data != null)
{
return ByteBuffer.wrap(data,offsetInMessage,size);
}
else
{
ByteBuffer buf = ByteBuffer.allocate(size);
int length = getContent(offsetInMessage, buf);
buf.limit(length);
buf.position(0);
return buf;
}
}
synchronized void store(com.sleepycat.je.Transaction txn)
{
if (!stored())
{
try
{
_dataRef = new SoftReference<byte[]>(_data);
BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
BDBMessageStore.this.addContent(txn, _messageId, 0,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
{
_metaData = null;
_data = null;
}
}
}
@Override
public synchronized StoreFuture flushToStore()
{
if(!stored())
{
checkMessageStoreOpen();
com.sleepycat.je.Transaction txn;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(
null, null);
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
}
store(txn);
_environmentFacade.commit(txn);
_committer.commit(txn, true);
storedSizeChangeOccured(getMetaData().getContentSize());
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@Override
public void remove()
{
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
BDBMessageStore.this.removeMessage(_messageId, false);
storedSizeChangeOccured(-delta);
}
private boolean stored()
{
return _metaData == null || _isRecovered;
}
}
private class BDBTransaction implements org.apache.qpid.server.store.Transaction
{
private com.sleepycat.je.Transaction _txn;
private int _storeSizeIncrease;
private BDBTransaction() throws StoreException
{
try
{
_txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
}
catch(DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
}
}
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
checkMessageStoreOpen();
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
storedMessage.store(_txn);
_storeSizeIncrease += storedMessage.getMetaData().getContentSize();
}
BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
}
@Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
checkMessageStoreOpen();
BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
}
@Override
public void commitTran() throws StoreException
{
checkMessageStoreOpen();
BDBMessageStore.this.commitTranImpl(_txn, true);
BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
return BDBMessageStore.this.commitTranImpl(_txn, false);
}
@Override
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
BDBMessageStore.this.abortTran(_txn);
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
{
checkMessageStoreOpen();
BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
Record[] dequeues) throws StoreException
{
checkMessageStoreOpen();
BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
}
}
private void storedSizeChangeOccured(final int delta) throws StoreException
{
try
{
storedSizeChange(delta);
}
catch(DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Stored size change exception", e);
}
}
private void storedSizeChange(final int delta)
{
if(getPersistentSizeHighThreshold() > 0)
{
synchronized (this)
{
// the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
// time, so we do so only when there's been enough change that it is worth looking again. We do this by
// assuming the total size will change by less than twice the amount of the message data change.
long newSize = _totalStoreSize += 2*delta;
if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
{
_totalStoreSize = getSizeOnDisk();
if(_totalStoreSize > getPersistentSizeHighThreshold())
{
_limitBusted = true;
_eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
}
}
else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
{
long oldSize = _totalStoreSize;
_totalStoreSize = getSizeOnDisk();
if(oldSize <= _totalStoreSize)
{
reduceSizeOnDisk();
_totalStoreSize = getSizeOnDisk();
}
if(_totalStoreSize < getPersistentSizeLowThreshold())
{
_limitBusted = false;
_eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
}
}
}
}
private void checkConfigurationStoreOpen()
{
if (!_configurationStoreOpen.get())
{
throw new IllegalStateException("Configuration store is not open");
}
}
private void checkMessageStoreOpen()
{
if (!_messageStoreOpen.get())
{
throw new IllegalStateException("Message store is not open");
}
}
private void reduceSizeOnDisk()
{
_environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
boolean cleaned = false;
while (_environmentFacade.getEnvironment().cleanLog() > 0)
{
cleaned = true;
}
if (cleaned)
{
CheckpointConfig force = new CheckpointConfig();
force.setForce(true);
_environmentFacade.getEnvironment().checkpoint(force);
}
_environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
}
private long getSizeOnDisk()
{
return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize();
}
private long getPersistentSizeLowThreshold()
{
return _persistentSizeLowThreshold;
}
private long getPersistentSizeHighThreshold()
{
return _persistentSizeHighThreshold;
}
@Override
public void onDelete()
{
String storeLocation = getStoreLocation();
if (storeLocation != null)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Deleting store " + storeLocation);
}
File location = new File(storeLocation);
if (location.exists())
{
if (!FileUtils.delete(location, true))
{
LOGGER.error("Cannot delete " + storeLocation);
}
}
}
}
private Database getConfiguredObjectsDb()
{
return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
}
private Database getConfiguredObjectHierarchyDb()
{
return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME);
}
private Database getMessageContentDb()
{
return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME);
}
private Database getConfigVersionDb()
{
return _environmentFacade.getOpenDatabase(CONFIG_VERSION_DB_NAME);
}
private Database getMessageMetaDataDb()
{
return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME);
}
private Database getDeliveryDb()
{
return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME);
}
private Database getXidDb()
{
return _environmentFacade.getOpenDatabase(XID_DB_NAME);
}
class UpgradeTask implements EnvironmentFacadeTask
{
private ConfiguredObject<?> _parent;
public UpgradeTask(ConfiguredObject<?> parent)
{
_parent = parent;
}
@Override
public void execute(EnvironmentFacade facade)
{
try
{
new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
}
catch(DatabaseException e)
{
throw facade.handleDatabaseException("Cannot upgrade store", e);
}
}
}
class OpenDatabasesTask implements EnvironmentFacadeTask
{
private String[] _names;
public OpenDatabasesTask(String[] names)
{
_names = names;
}
@Override
public void execute(EnvironmentFacade facade)
{
try
{
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
facade.openDatabases(dbConfig, _names);
}
catch(DatabaseException e)
{
throw facade.handleDatabaseException("Cannot open databases", e);
}
}
}
class DiskSpaceTask implements EnvironmentFacadeTask
{
@Override
public void execute(EnvironmentFacade facade)
{
try
{
_totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
}
catch(DatabaseException e)
{
throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
}
}
}
public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
{
private long _maxId;
@Override
public void execute(EnvironmentFacade facade)
{
visitMessagesInternal(this, facade);
_messageId.set(_maxId);
}
@Override
public boolean handle(StoredMessage<?> storedMessage)
{
long id = storedMessage.getMessageNumber();
if (_maxId<id)
{
_maxId = id;
}
return true;
}
}
@Override
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
visitMessagesInternal(handler, _environmentFacade);
}
private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
{
Cursor cursor = null;
try
{
cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
if (!handler.handle(message))
{
break;
}
}
}
catch (DatabaseException e)
{
throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
}
finally
{
if (cursor != null)
{
try
{
cursor.close();
}
catch(DatabaseException e)
{
throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
}
}
}
}
@Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
Cursor cursor = null;
try
{
cursor = getDeliveryDb().openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
QueueEntryKey entry = keyBinding.entryToObject(key);
UUID queueId = entry.getQueueId();
long messageId = entry.getMessageId();
if (!handler.handle(queueId, messageId))
{
break;
}
}
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
}
finally
{
closeCursorSafely(cursor);
}
}
@Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
checkMessageStoreOpen();
Cursor cursor = null;
try
{
cursor = getXidDb().openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
XidBinding keyBinding = XidBinding.getInstance();
PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
Xid xid = keyBinding.entryToObject(key);
PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
{
break;
}
}
}
catch (DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
}
finally
{
closeCursorSafely(cursor);
}
}
}