blob: f900159808b5a6e24841251b223555f468ff23ac [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
* transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
@SuppressWarnings({"unchecked"})
public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
static final int DATABASE_FORMAT_VERSION = 5;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private Environment _environment;
private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
private String MESSAGECONTENTDB_NAME = "messageContentDb";
private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
private String DELIVERYDB_NAME = "deliveryDb";
private String EXCHANGEDB_NAME = "exchangeDb";
private String QUEUEDB_NAME = "queueDb";
private Database _messageMetaDataDb;
private Database _messageContentDb;
private Database _queueBindingsDb;
private Database _deliveryDb;
private Database _exchangeDb;
private Database _queueDb;
/* =======
* Schema:
* =======
*
* Queue:
* name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
* arguments(FieldTable encoded as binary), exclusive (boolean)
*
* Exchange:
* name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean)
*
* Binding:
* exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString),
* arguments (FieldTable encoded as binary) - 0 (zero)
*
* QueueEntry:
* queueName(AMQShortString), messageId (long) - 0 (zero)
*
* Message (MetaData):
* messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
*
* Message (Content):
* messageId (long), byteOffset (integer) - dataLength(integer), data(binary);
*/
private LogSubject _logSubject;
private final AtomicLong _messageId = new AtomicLong(0);
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
// Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
/** The data version this store should run with */
private int _version;
private enum State
{
INITIAL,
CONFIGURING,
CONFIGURED,
RECOVERING,
STARTED,
CLOSING,
CLOSED
}
private State _state = State.INITIAL;
private TransactionConfig _transactionConfig = new TransactionConfig();
private boolean _readOnly = false;
private boolean _configured;
public BDBMessageStore()
{
this(DATABASE_FORMAT_VERSION);
}
public BDBMessageStore(int version)
{
_version = version;
}
private void setDatabaseNames(int version)
{
if (version > 1)
{
MESSAGEMETADATADB_NAME += "_v" + version;
MESSAGECONTENTDB_NAME += "_v" + version;
QUEUEDB_NAME += "_v" + version;
DELIVERYDB_NAME += "_v" + version;
EXCHANGEDB_NAME += "_v" + version;
QUEUEBINDINGSDB_NAME += "_v" + version;
}
}
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
_logSubject = logSubject;
CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
if(_configured)
{
throw new Exception("ConfigStore already configured");
}
configure(name,storeConfiguration);
_configured = true;
stateTransition(State.CONFIGURING, State.CONFIGURED);
recover(recoveryHandler);
stateTransition(State.RECOVERING, State.STARTED);
}
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
throw new Exception("ConfigStore not configured");
}
recoverMessages(recoveryHandler);
}
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
throw new Exception("ConfigStore not configured");
}
recoverQueueEntries(recoveryHandler);
}
public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
{
return new BDBTransaction();
}
/**
* Called after instantiation in order to configure the message store.
*
* @param name The name of the virtual host using this store
* @return whether a new store environment was created or not (to indicate whether recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
public boolean configure(String name, Configuration storeConfig) throws Exception
{
File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
System.getProperty("QPID_WORK") + "/bdbstore/" + name));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
{
throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ "Ensure the path is correct and that the permissions are correct.");
}
}
CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
_version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
return configure(environmentPath, false);
}
/**
* @param environmentPath location for the store to be created in/recovered from
* @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
* @return whether or not a new store environment was created
* @throws AMQStoreException
* @throws DatabaseException
*/
protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException
{
_readOnly = readonly;
stateTransition(State.INITIAL, State.CONFIGURING);
_log.info("Configuring BDB message store");
createTupleBindingFactories(_version);
setDatabaseNames(_version);
return setupStore(environmentPath, readonly);
}
private void createTupleBindingFactories(int version)
{
_bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
_queueTupleBindingFactory = new QueueTupleBindingFactory(version);
_metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
}
/**
* Move the store state from CONFIGURING to STARTED.
*
* This is required if you do not want to perform recovery of the store data
*
* @throws AMQStoreException if the store is not in the correct state
*/
public void start() throws AMQStoreException
{
stateTransition(State.CONFIGURING, State.STARTED);
}
private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException
{
checkState(State.CONFIGURING);
boolean newEnvironment = createEnvironment(storePath, readonly);
verifyVersionByTables();
openDatabases(readonly);
if (!readonly)
{
_commitThread.start();
}
return newEnvironment;
}
private void verifyVersionByTables() throws DatabaseException
{
for (String s : _environment.getDatabaseNames())
{
int versionIndex = s.indexOf("_v");
// lack of _v index suggests DB is v1
// so if _version is not v1 then error
if (versionIndex == -1)
{
if (_version != 1)
{
closeEnvironment();
throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
+ ". Store on disk contains version 1 data.");
}
else // DB is v1 and _version is v1
{
continue;
}
}
// Otherwise Check Versions
int version = Integer.parseInt(s.substring(versionIndex + 2));
if (version != _version)
{
closeEnvironment();
throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
+ ". Store on disk contains version " + version + " data.");
}
}
}
private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
{
if (_state != requiredState)
{
throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ "; currently in state: " + _state);
}
_state = newState;
}
private void checkState(State requiredState) throws AMQStoreException
{
if (_state != requiredState)
{
throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState);
}
}
private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException
{
_log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
EnvironmentConfig envConfig = new EnvironmentConfig();
// This is what allows the creation of the store if it does not already exist.
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setConfigParam("je.lock.nLockTables", "7");
// Restore 500,000 default timeout.
//envConfig.setLockTimeout(15000);
// Added to help diagnosis of Deadlock issue
// http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
if (Boolean.getBoolean("qpid.bdb.lock.debug"))
{
envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
envConfig.setConfigParam("je.txn.dumpLocks", "true");
}
// Set transaction mode
_transactionConfig.setReadCommitted(true);
//This prevents background threads running which will potentially update the store.
envConfig.setReadOnly(readonly);
try
{
_environment = new Environment(environmentPath, envConfig);
return false;
}
catch (DatabaseException de)
{
if (de.getMessage().contains("Environment.setAllowCreate is false"))
{
//Allow the creation this time
envConfig.setAllowCreate(true);
if (_environment != null )
{
_environment.cleanLog();
_environment.close();
}
_environment = new Environment(environmentPath, envConfig);
return true;
}
else
{
throw de;
}
}
}
private void openDatabases(boolean readonly) throws DatabaseException
{
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
//This is required if we are wanting read only access.
dbConfig.setReadOnly(readonly);
_messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig);
_queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig);
_exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig);
_queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig);
_messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig);
_deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig);
}
/**
* Called to close and cleanup any resources used by the message store.
*
* @throws Exception If the close fails.
*/
public void close() throws Exception
{
if (_state != State.STARTED)
{
return;
}
_state = State.CLOSING;
_commitThread.close();
_commitThread.join();
if (_messageMetaDataDb != null)
{
_log.info("Closing message metadata database");
_messageMetaDataDb.close();
}
if (_messageContentDb != null)
{
_log.info("Closing message content database");
_messageContentDb.close();
}
if (_exchangeDb != null)
{
_log.info("Closing exchange database");
_exchangeDb.close();
}
if (_queueBindingsDb != null)
{
_log.info("Closing bindings database");
_queueBindingsDb.close();
}
if (_queueDb != null)
{
_log.info("Closing queue database");
_queueDb.close();
}
if (_deliveryDb != null)
{
_log.info("Close delivery database");
_deliveryDb.close();
}
closeEnvironment();
_state = State.CLOSED;
CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
}
private void closeEnvironment() throws DatabaseException
{
if (_environment != null)
{
if(!_readOnly)
{
// Clean the log before closing. This makes sure it doesn't contain
// redundant data. Closing without doing this means the cleaner may not
// get a chance to finish.
_environment.cleanLog();
}
_environment.close();
}
}
public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException
{
stateTransition(State.CONFIGURED, State.RECOVERING);
CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
try
{
QueueRecoveryHandler qrh = recoveryHandler.begin(this);
loadQueues(qrh);
ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
loadExchanges(erh);
BindingRecoveryHandler brh = erh.completeExchangeRecovery();
recoverBindings(brh);
brh.completeBindingRecovery();
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
}
}
private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
{
Cursor cursor = null;
try
{
cursor = _queueDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
TupleBinding binding = _queueTupleBindingFactory.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
String queueName = queueRecord.getNameShortString() == null ? null :
queueRecord.getNameShortString().asString();
String owner = queueRecord.getOwner() == null ? null :
queueRecord.getOwner().asString();
boolean exclusive = queueRecord.isExclusive();
FieldTable arguments = queueRecord.getArguments();
qrh.queue(queueName, owner, exclusive, arguments);
}
}
finally
{
if (cursor != null)
{
cursor.close();
}
}
}
private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
{
Cursor cursor = null;
try
{
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
TupleBinding binding = new ExchangeTB();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
String exchangeName = exchangeRec.getNameShortString() == null ? null :
exchangeRec.getNameShortString().asString();
String type = exchangeRec.getType() == null ? null :
exchangeRec.getType().asString();
boolean autoDelete = exchangeRec.isAutoDelete();
erh.exchange(exchangeName, type, autoDelete);
}
}
finally
{
if (cursor != null)
{
cursor.close();
}
}
}
private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
{
Cursor cursor = null;
try
{
cursor = _queueBindingsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
TupleBinding binding = _bindingTupleBindingFactory.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
//yes, this is retrieving all the useful information from the key only.
//For table compatibility it shall currently be left as is
BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
String exchangeName = bindingRecord.getExchangeName() == null ? null :
bindingRecord.getExchangeName().asString();
String queueName = bindingRecord.getQueueName() == null ? null :
bindingRecord.getQueueName().asString();
String routingKey = bindingRecord.getRoutingKey() == null ? null :
bindingRecord.getRoutingKey().asString();
ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
brh.binding(exchangeName, queueName, routingKey, argumentsBB);
}
}
finally
{
if (cursor != null)
{
cursor.close();
}
}
}
private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
{
StoredMessageRecoveryHandler mrh = msrh.begin();
Cursor cursor = null;
try
{
cursor = _messageMetaDataDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
long maxId = 0;
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
long messageId = (Long) keyBinding.entryToObject(key);
StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
mrh.message(message);
maxId = Math.max(maxId, messageId);
}
_messageId.set(maxId);
}
catch (DatabaseException e)
{
_log.error("Database Error: " + e.getMessage(), e);
throw e;
}
finally
{
if (cursor != null)
{
cursor.close();
}
}
}
private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
throws DatabaseException
{
QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
Cursor cursor = null;
try
{
cursor = _deliveryDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key);
entries.add(qek);
}
try
{
cursor.close();
}
finally
{
cursor = null;
}
for(QueueEntryKey entry : entries)
{
AMQShortString queueName = entry.getQueueName();
long messageId = entry.getMessageId();
qerh.queueEntry(queueName.asString(),messageId);
}
}
catch (DatabaseException e)
{
_log.error("Database Error: " + e.getMessage(), e);
throw e;
}
finally
{
if (cursor != null)
{
cursor.close();
}
}
qerh.completeQueueEntryRecovery();
}
/**
* Removes the specified message from the store.
*
* @param messageId Identifies the message to remove.
*
* @throws AMQInternalException If the operation fails for any reason.
*/
public void removeMessage(Long messageId) throws AMQStoreException
{
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
com.sleepycat.je.Transaction tx = null;
Cursor cursor = null;
try
{
tx = _environment.beginTransaction(null, null);
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
metaKeyBindingTuple.objectToEntry(messageId, key);
if (_log.isDebugEnabled())
{
_log.debug("Removing message id " + messageId);
}
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
tx.abort();
throw new AMQStoreException("Message metadata not found for message id " + messageId);
}
if (_log.isDebugEnabled())
{
_log.debug("Deleted metadata for message " + messageId);
}
//now remove the content data from the store if there is any.
DatabaseEntry contentKeyEntry = new DatabaseEntry();
MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
//Use a partial record for the value to prevent retrieving the
//data itself as we only need the key to identify what to remove.
DatabaseEntry value = new DatabaseEntry();
value.setPartial(0, 0, true);
cursor = _messageContentDb.openCursor(tx, null);
status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
if(mck.getMessageId() != messageId)
{
//we have exhausted all chunks for this message id, break
break;
}
else
{
status = cursor.delete();
if(status == OperationStatus.NOTFOUND)
{
cursor.close();
cursor = null;
tx.abort();
throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
}
if (_log.isDebugEnabled())
{
_log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
}
}
status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
cursor.close();
cursor = null;
commit(tx, true);
}
catch (DatabaseException e)
{
e.printStackTrace();
if (tx != null)
{
try
{
if(cursor != null)
{
cursor.close();
cursor = null;
}
tx.abort();
}
catch (DatabaseException e1)
{
throw new AMQStoreException("Error aborting transaction " + e1, e1);
}
}
throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
}
finally
{
if(cursor != null)
{
try
{
cursor.close();
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
}
}
}
}
/**
* @see DurableConfigurationStore#createExchange(Exchange)
*/
public void createExchange(Exchange exchange) throws AMQStoreException
{
if (_state != State.RECOVERING)
{
ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
exchange.getTypeShortString(), exchange.isAutoDelete());
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(exchange.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding exchangeBinding = new ExchangeTB();
exchangeBinding.objectToEntry(exchangeRec, value);
try
{
_exchangeDb.put(null, key, value);
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e);
}
}
}
/**
* @see DurableConfigurationStore#removeExchange(Exchange)
*/
public void removeExchange(Exchange exchange) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(exchange.getNameShortString(), key);
try
{
OperationStatus status = _exchangeDb.delete(null, key);
if (status == OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Exchange " + exchange.getName() + " not found");
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e);
}
}
/**
* @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
*/
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
// _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey
// + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called");
if (_state != State.RECOVERING)
{
BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
queue.getNameShortString(), routingKey, args);
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
keyBinding.objectToEntry(bindingRecord, key);
//yes, this is writing out 0 as a value and putting all the
//useful info into the key, don't ask me why. For table
//compatibility it shall currently be left as is
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
try
{
_queueBindingsDb.put(null, key, value);
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " to database: " + e.getMessage(), e);
}
}
}
/**
* @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
*/
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
try
{
OperationStatus status = _queueBindingsDb.delete(null, key);
if (status == OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " not found");
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " from database: " + e.getMessage(), e);
}
}
/**
* @see DurableConfigurationStore#createQueue(AMQQueue)
*/
public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
/**
* @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
*/
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
_log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
}
QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
queue.getOwner(), queue.isExclusive(), arguments);
createQueue(queueRecord);
}
/**
* Makes the specified queue persistent.
*
* Only intended for direct use during store upgrades.
*
* @param queueRecord Details of the queue to store.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
protected void createQueue(QueueRecord queueRecord) throws AMQStoreException
{
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
queueBinding.objectToEntry(queueRecord, value);
try
{
_queueDb.put(null, key, value);
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString()
+ " to database: " + e.getMessage(), e);
}
}
}
/**
* Updates the specified queue in the persistent store, IF it is already present. If the queue
* is not present in the store, it will not be added.
*
* NOTE: Currently only updates the exclusivity.
*
* @param queue The queue to update the entry for.
* @throws AMQStoreException If the operation fails for any reason.
*/
public void updateQueue(final AMQQueue queue) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
_log.debug("Updating queue: " + queue.getName());
}
try
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(queue.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
if(status == OperationStatus.SUCCESS)
{
//read the existing record and apply the new exclusivity setting
QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value);
queueRecord.setExclusive(queue.isExclusive());
//write the updated entry to the store
queueBinding.objectToEntry(queueRecord, newValue);
_queueDb.put(null, key, newValue);
}
else if(status != OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Error updating queue details within the store: " + status);
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error updating queue details within the store: " + e,e);
}
}
/**
* Removes the specified queue from the persistent store.
*
* @param queue The queue to remove.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
AMQShortString name = queue.getNameShortString();
if (_log.isDebugEnabled())
{
_log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
}
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(name, key);
try
{
OperationStatus status = _queueDb.delete(null, key);
if (status == OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Queue " + name + " not found");
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e);
}
}
/**
* Places a message onto a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
* @param queue The the queue to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
// _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
AMQShortString name = new AMQShortString(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
try
{
if (_log.isDebugEnabled())
{
_log.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]");
}
_deliveryDb.put(tx, key, value);
}
catch (DatabaseException e)
{
_log.error("Failed to enqueue: " + e.getMessage(), e);
throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ " to database", e);
}
}
/**
* Extracts a message from a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
* @param queue The name queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
if (_log.isDebugEnabled())
{
_log.debug("Dequeue message id " + messageId);
}
try
{
OperationStatus status = _deliveryDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
}
else if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
}
if (_log.isDebugEnabled())
{
_log.debug("Removed message " + messageId + ", " + name + " from delivery db");
}
}
catch (DatabaseException e)
{
_log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
_log.error(tx);
throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
}
}
/**
* Commits all operations performed within a given transaction.
*
* @param tx The transaction to commit all operations for.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
{
//if (_log.isDebugEnabled())
//{
// _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
//}
if (tx == null)
{
throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
}
StoreFuture result;
try
{
result = commit(tx, syncCommit);
if (_log.isDebugEnabled())
{
_log.debug("commitTranImpl completed for [Transaction:" + tx + "]");
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
}
return result;
}
/**
* Abandons all operations performed within a given transaction.
*
* @param tx The transaction to abandon.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
_log.debug("abortTran called for [Transaction:" + tx + "]");
}
try
{
tx.abort();
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
}
}
/**
* Primarily for testing purposes.
*
* @param queueName
*
* @return a list of message ids for messages enqueued for a particular queue
*/
List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException
{
Cursor cursor = null;
try
{
cursor = _deliveryDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
EntryBinding keyBinding = new QueueEntryTB();
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
LinkedList<Long> messageIds = new LinkedList<Long>();
OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
dd = (QueueEntryKey) keyBinding.entryToObject(key);
while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
{
messageIds.add(dd.getMessageId());
status = cursor.getNext(key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
dd = (QueueEntryKey) keyBinding.entryToObject(key);
}
}
return messageIds;
}
catch (DatabaseException e)
{
throw new AMQStoreException("Database error: " + e.getMessage(), e);
}
finally
{
if (cursor != null)
{
try
{
cursor.close();
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
}
}
}
}
/**
* Return a valid, currently unused message id.
*
* @return A fresh message id.
*/
public Long getNewMessageId()
{
return _messageId.incrementAndGet();
}
/**
* Stores a chunk of message data.
*
* @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
ByteBuffer contentBody) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5();
keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding<ByteBuffer> messageBinding = new ContentTB();
messageBinding.objectToEntry(contentBody, value);
try
{
OperationStatus status = _messageContentDb.put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
+ status);
}
if (_log.isDebugEnabled())
{
_log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
/**
* Stores message meta-data.
*
* @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param messageMetaData The message meta data to store.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
if (_log.isDebugEnabled())
{
_log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
}
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
{
_messageMetaDataDb.put(tx, key, value);
if (_log.isDebugEnabled())
{
_log.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
}
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
}
}
/**
* Retrieves message meta-data.
*
* @param messageId The message to get the meta-data for.
*
* @return The message meta data.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
_log.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ messageId + "): called");
}
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
try
{
OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Metadata not found for message with id " + messageId);
}
StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
return mdd;
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
}
}
/**
* Fills the provided ByteBuffer with as much content for the specified message as possible, starting
* from the specified offset in the message.
*
* @param messageId The message to get the data for.
* @param offset The offset of the data within the message.
* @param dst The destination of the content read back
*
* @return The number of bytes inserted into the destination
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
//Start from 0 offset and search for the starting chunk.
MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0);
TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB();
if (_log.isDebugEnabled())
{
_log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
}
int written = 0;
int seenSoFar = 0;
Cursor cursor = null;
try
{
cursor = _messageContentDb.openCursor(null, null);
OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
long id = mck.getMessageId();
if(id != messageId)
{
//we have exhausted all chunks for this message id, break
break;
}
int offsetInMessage = mck.getOffset();
ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
final int size = (int) buf.limit();
seenSoFar += size;
if(seenSoFar >= offset)
{
byte[] dataAsBytes = buf.array();
int posInArray = offset + written - offsetInMessage;
int count = size - posInArray;
if(count > dst.remaining())
{
count = dst.remaining();
}
dst.put(dataAsBytes,posInArray,count);
written+=count;
if(dst.remaining() == 0)
{
break;
}
}
status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
return written;
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
finally
{
if(cursor != null)
{
try
{
cursor.close();
}
catch (DatabaseException e)
{
throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
}
}
public boolean isPersistent()
{
return true;
}
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
if(metaData.isPersistent())
{
return new StoredBDBMessage(getNewMessageId(), metaData);
}
else
{
return new StoredMemoryMessage(getNewMessageId(), metaData);
}
}
//protected getters for the TupleBindingFactories
protected QueueTupleBindingFactory getQueueTupleBindingFactory()
{
return _queueTupleBindingFactory;
}
protected BindingTupleBindingFactory getBindingTupleBindingFactory()
{
return _bindingTupleBindingFactory;
}
protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
{
return _metaDataTupleBindingFactory;
}
//Package getters for the various databases used by the Store
Database getMetaDataDb()
{
return _messageMetaDataDb;
}
Database getContentDb()
{
return _messageContentDb;
}
Database getQueuesDb()
{
return _queueDb;
}
Database getDeliveryDb()
{
return _deliveryDb;
}
Database getExchangesDb()
{
return _exchangeDb;
}
Database getBindingsDb()
{
return _queueBindingsDb;
}
void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_messageMetaDataDb, visitor);
}
void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_messageContentDb, visitor);
}
void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_queueDb, visitor);
}
void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_deliveryDb, visitor);
}
void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_exchangeDb, visitor);
}
void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_queueBindingsDb, visitor);
}
/**
* Generic visitDatabase allows iteration through the specified database.
*
* @param database The database to visit
* @param visitor The visitor to give each entry to.
*
* @throws DatabaseException If there is a problem with the Database structure
* @throws AMQStoreException If there is a problem with the Database contents
*/
void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
Cursor cursor = database.openCursor(null, null);
try
{
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
visitor.visit(key, value);
}
}
finally
{
if (cursor != null)
{
cursor.close();
}
}
}
private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
{
// _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called");
tx.commitNoSync();
BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
public void startCommitThread()
{
_commitThread.start();
}
private static final class BDBCommitFuture implements StoreFuture
{
// private static final Logger _log = Logger.getLogger(BDBCommitFuture.class);
private final CommitThread _commitThread;
private final com.sleepycat.je.Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
private boolean _syncCommit;
public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
{
// _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
// + "): called");
_commitThread = commitThread;
_tx = tx;
_syncCommit = syncCommit;
}
public synchronized void complete()
{
if (_log.isDebugEnabled())
{
_log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
}
_complete = true;
notifyAll();
}
public synchronized void abort(DatabaseException databaseException)
{
// _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException
// + "): called");
_complete = true;
_databaseException = databaseException;
notifyAll();
}
public void commit() throws DatabaseException
{
//_log.debug("public void commit(): called");
_commitThread.addJob(this);
if(!_syncCommit)
{
_log.debug("CommitAsync was requested, returning immediately.");
return;
}
synchronized (BDBCommitFuture.this)
{
while (!_complete)
{
try
{
wait(250);
}
catch (InterruptedException e)
{
// _log.error("Unexpected thread interruption: " + e, e);
throw new RuntimeException(e);
}
}
// _log.debug("Commit completed, _databaseException = " + _databaseException);
if (_databaseException != null)
{
throw _databaseException;
}
}
}
public synchronized boolean isComplete()
{
return _complete;
}
public void waitForCompletion()
{
while (!isComplete())
{
try
{
wait(250);
}
catch (InterruptedException e)
{
//TODO Should we ignore, or throw a 'StoreException'?
throw new RuntimeException(e);
}
}
}
}
/**
* Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
* completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table>
*/
private class CommitThread extends Thread
{
// private final Logger _log = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
public CommitThread(String name)
{
super(name);
_config.setForce(true);
}
public void run()
{
while (!_stopped.get())
{
synchronized (_lock)
{
while (!_stopped.get() && !hasJobs())
{
try
{
// RHM-7 Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
_lock.wait(250);
}
catch (InterruptedException e)
{
// _log.info(getName() + " interrupted. ");
}
}
}
processJobs();
}
}
private void processJobs()
{
// _log.debug("private void processJobs(): called");
// we replace the old queue atomically with a new one and this avoids any need to
// copy elements out of the queue
Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
try
{
// _environment.checkpoint(_config);
_environment.sync();
for (BDBCommitFuture commit : jobs)
{
commit.complete();
}
}
catch (DatabaseException e)
{
for (BDBCommitFuture commit : jobs)
{
commit.abort(e);
}
}
}
private boolean hasJobs()
{
return !_jobQueue.get().isEmpty();
}
public void addJob(BDBCommitFuture commit)
{
synchronized (_lock)
{
_jobQueue.get().add(commit);
_lock.notifyAll();
}
}
public void close()
{
synchronized (_lock)
{
_stopped.set(true);
_lock.notifyAll();
}
}
}
private class StoredBDBMessage implements StoredMessage
{
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private com.sleepycat.je.Transaction _txn;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
this(messageId, metaData, true);
}
StoredBDBMessage(long messageId,
StorableMessageMetaData metaData, boolean persist)
{
try
{
_messageId = messageId;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
if(persist)
{
_txn = _environment.beginTransaction(null, null);
storeMetaData(_txn, messageId, metaData);
}
}
catch (DatabaseException e)
{
throw new RuntimeException(e);
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
}
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaDataRef.get();
if(metaData == null)
{
try
{
metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
return metaData;
}
public long getMessageNumber()
{
return _messageId;
}
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
try
{
BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
try
{
return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
}
public StoreFuture flushToStore()
{
try
{
if(_txn != null)
{
//if(_log.isDebugEnabled())
//{
// _log.debug("Flushing message " + _messageId + " to store");
//}
BDBMessageStore.this.commitTranImpl(_txn, true);
}
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
finally
{
_txn = null;
}
return IMMEDIATE_FUTURE;
}
public void remove()
{
flushToStore();
try
{
BDBMessageStore.this.removeMessage(_messageId);
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
}
}
private class BDBTransaction implements Transaction
{
private com.sleepycat.je.Transaction _txn;
private BDBTransaction()
{
try
{
_txn = _environment.beginTransaction(null, null);
}
catch (DatabaseException e)
{
throw new RuntimeException(e);
}
}
public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
}
public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
}
public void commitTran() throws AMQStoreException
{
BDBMessageStore.this.commitTranImpl(_txn, true);
}
public StoreFuture commitTranAsync() throws AMQStoreException
{
return BDBMessageStore.this.commitTranImpl(_txn, false);
}
public void abortTran() throws AMQStoreException
{
BDBMessageStore.this.abortTran(_txn);
}
}
}