blob: fbecdb8fb6b2ffea28a69b82bcd7bca839e71552 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
*
* @org.apache.xbean.XBean element="amqPersistenceAdapter"
*
*/
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
private Scheduler scheduler;
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
private static final boolean BROKEN_FILE_LOCK;
private static final boolean DISABLE_LOCKING;
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private long checkpointInterval = 1000 * 20;
private int maxCheckpointMessageAddSize = 1024 * 4;
private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
private boolean syncOnWrite;
private boolean syncOnTransaction=true;
private String brokerName = "";
private File directory;
private File directoryArchive;
private BrokerService brokerService;
private final AtomicLong storeSize = new AtomicLong();
private boolean persistentIndex=true;
private boolean useNio = true;
private boolean archiveDataLogs=false;
private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
private RandomAccessFile lockFile;
private FileLock lock;
private boolean disableLocking = DISABLE_LOCKING;
private boolean failIfJournalIsLocked;
private boolean lockLogged;
private boolean lockAquired;
private boolean recoverReferenceStore=true;
private boolean forceRecoverReferenceStore=false;
private boolean useDedicatedTaskRunner=false;
private int journalThreadPriority = Thread.MAX_PRIORITY;
public String getBrokerName() {
return this.brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
if (this.referenceStoreAdapter != null) {
this.referenceStoreAdapter.setBrokerName(brokerName);
}
}
public BrokerService getBrokerService() {
return brokerService;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public synchronized void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
}
if (this.directory == null) {
if (brokerService != null) {
this.directory = brokerService.getBrokerDataDirectory();
} else {
this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
this.directory = new File(directory, "amqstore");
directory.getAbsolutePath();
}
}
if (this.directoryArchive == null) {
this.directoryArchive = new File(this.directory,"archive");
}
if (this.brokerService != null) {
this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
this.scheduler = this.brokerService.getScheduler();
} else {
this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
true, 1000, isUseDedicatedTaskRunner());
this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
}
IOHelper.mkdirs(this.directory);
lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
lock();
LOG.info("AMQStore starting using directory: " + directory);
if (archiveDataLogs) {
IOHelper.mkdirs(this.directoryArchive);
}
if (this.usageManager != null) {
this.usageManager.getMemoryUsage().addUsageListener(this);
}
if (asyncDataManager == null) {
asyncDataManager = createAsyncDataManager();
}
if (referenceStoreAdapter == null) {
referenceStoreAdapter = createReferenceStoreAdapter();
}
referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
referenceStoreAdapter.setBrokerName(getBrokerName());
referenceStoreAdapter.setUsageManager(usageManager);
referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
if (failIfJournalIsLocked) {
asyncDataManager.lock();
} else {
while (true) {
try {
asyncDataManager.lock();
break;
} catch (IOException e) {
LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
try {
Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
}
}
}
}
asyncDataManager.start();
if (deleteAllMessages) {
asyncDataManager.delete();
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED " + new Date());
Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
asyncDataManager.setMark(location, true);
LOG.info("Journal deleted: ");
deleteAllMessages = false;
} catch (IOException e) {
throw e;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
referenceStoreAdapter.deleteAllMessages();
}
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
LOG.info("Active data files: " + files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
public boolean iterate() {
doCheckpoint();
return false;
}
}, "ActiveMQ Journal Checkpoint Worker");
createTransactionStore();
//
// The following was attempting to reduce startup times by avoiding the
// log
// file scanning that recovery performs. The problem with it is that XA
// transactions
// only live in transaction log and are not stored in the reference
// store, but they still
// need to be recovered when the broker starts up.
if (isForceRecoverReferenceStore()
|| (isRecoverReferenceStore() && !referenceStoreAdapter
.isStoreValid())) {
LOG.warn("The ReferenceStore is not valid - recovering ...");
recover();
LOG.info("Finished recovering the ReferenceStore");
} else {
Location location = writeTraceMessage("RECOVERED " + new Date(),
true);
asyncDataManager.setMark(location, true);
// recover transactions
getTransactionStore().setPreparedTransactions(
referenceStoreAdapter.retrievePreparedState());
}
// Do a checkpoint periodically.
periodicCheckpointTask = new Runnable() {
public void run() {
checkpoint(false);
}
};
scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
periodicCleanupTask = new Runnable() {
public void run() {
cleanup();
}
};
scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
if (lockAquired && lockLogged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory());
if (brokerService != null) {
brokerService.getBroker().nowMasterBroker();
}
}
}
public void stop() throws Exception {
if (!started.compareAndSet(true, false)) {
return;
}
unlock();
if (lockFile != null) {
lockFile.close();
lockFile = null;
}
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) {
scheduler.cancel(periodicCheckpointTask);
scheduler.cancel(periodicCleanupTask);
}
Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
while (queueIterator.hasNext()) {
AMQMessageStore ms = queueIterator.next();
ms.stop();
}
Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
while (topicIterator.hasNext()) {
final AMQTopicMessageStore ms = topicIterator.next();
ms.stop();
}
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
synchronized (this) {
checkpointTask.shutdown();
}
referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
queues.clear();
topics.clear();
IOException firstException = null;
referenceStoreAdapter.stop();
referenceStoreAdapter = null;
if (this.brokerService == null) {
this.taskRunnerFactory.shutdown();
this.scheduler.stop();
}
try {
LOG.debug("Journal close");
asyncDataManager.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
if (firstException != null) {
throw firstException;
}
}
/**
* When we checkpoint we move all the journalled data to long term storage.
*
* @param sync
*/
public void checkpoint(boolean sync) {
try {
if (asyncDataManager == null) {
throw new IllegalStateException("Journal is closed.");
}
CountDownLatch latch = null;
synchronized (this) {
latch = nextCheckpointCountDownLatch;
checkpointTask.wakeup();
}
if (sync) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waitng for checkpoint to complete.");
}
latch.await();
}
referenceStoreAdapter.checkpoint(sync);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Request to start checkpoint failed: " + e, e);
} catch (IOException e) {
LOG.error("checkpoint failed: " + e, e);
}
}
/**
* This does the actual checkpoint.
*
* @return true if successful
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
synchronized (this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Checkpoint started.");
}
Location currentMark = asyncDataManager.getMark();
Location newMark = currentMark;
Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
while (queueIterator.hasNext()) {
final AMQMessageStore ms = queueIterator.next();
Location mark = ms.getMark();
if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
newMark = mark;
}
}
Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
while (topicIterator.hasNext()) {
final AMQTopicMessageStore ms = topicIterator.next();
Location mark = ms.getMark();
if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
newMark = mark;
}
}
try {
if (newMark != currentMark) {
if (LOG.isDebugEnabled()) {
LOG.debug("Marking journal at: " + newMark);
}
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT " + new Date(), true);
}
} catch (Exception e) {
LOG.error("Failed to mark the Journal: " + e, e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Checkpoint done.");
}
} finally {
latch.countDown();
}
return true;
}
/**
* Cleans up the data files
* @throws IOException
*/
public void cleanup() {
try {
Set<Integer>inProgress = new HashSet<Integer>();
if (LOG.isDebugEnabled()) {
LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
}
for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
inProgress.addAll(set.keySet());
}
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
inProgress.add(lastDataFile);
lastDataFile = asyncDataManager.getMark().getDataFileId();
inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
Location lastActiveTx = transactionStore.checkpoint();
if (lastActiveTx != null) {
lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
}
LOG.debug("lastDataFile: " + lastDataFile);
asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
} catch (IOException e) {
LOG.error("Could not cleanup data files: " + e, e);
}
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue)destination);
} else {
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
AMQMessageStore store = queues.get(destination);
if (store == null) {
ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
store = new AMQMessageStore(this, checkpointStore, destination);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
queues.put(destination, store);
}
return store;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
AMQTopicMessageStore store = topics.get(destinationName);
if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
topics.put(destinationName, store);
}
return store;
}
/**
* Cleanup method to remove any state associated with the given destination
*
* @param destination
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
AMQMessageStore store= queues.remove(destination);
referenceStoreAdapter.removeQueueMessageStore(destination);
}
/**
* Cleanup method to remove any state associated with the given destination
*
* @param destination
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
public long getLastMessageBrokerSequenceId() throws IOException {
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
public void beginTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.beginTransaction(context);
}
public void commitTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.commitTransaction(context);
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.rollbackTransaction(context);
}
public boolean isPersistentIndex() {
return persistentIndex;
}
public void setPersistentIndex(boolean persistentIndex) {
this.persistentIndex = persistentIndex;
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(Location location) throws IOException {
try {
ByteSequence packet = asyncDataManager.read(location);
return (DataStructure)wireFormat.unmarshal(packet);
} catch (IOException e) {
throw createReadException(location, e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
referenceStoreAdapter.clearMessages();
Location pos = null;
int redoCounter = 0;
LOG.info("Journal Recovery Started from: " + asyncDataManager);
long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
// While we have records in the journal.
while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
ByteSequence data = asyncDataManager.read(pos);
DataStructure c = (DataStructure)wireFormat.unmarshal(data);
if (c instanceof Message) {
Message message = (Message)c;
AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
if (message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
} else {
if (store.replayAddMessage(context, message, pos)) {
redoCounter++;
}
}
} else {
switch (c.getDataStructureType()) {
case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
}
break;
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
JournalQueueAck command = (JournalQueueAck)c;
AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
} else {
if (store.replayRemoveMessage(context, command.getMessageAck())) {
redoCounter++;
}
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE: {
JournalTopicAck command = (JournalTopicAck)c;
AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
} else {
if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
redoCounter++;
}
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE: {
JournalTransaction command = (JournalTransaction)c;
try {
// Try to replay the packet.
switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null) {
break; // We may be trying to replay a commit
}
// that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
AMQTxOperation op = (AMQTxOperation)iter.next();
if (op.replay(this, context)) {
redoCounter++;
}
}
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
default:
throw new IOException("Invalid journal command type: " + command.getType());
}
} catch (IOException e) {
LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace)c;
LOG.debug("TRACE Entry: " + trace.getMessage());
break;
default:
LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
Location location = writeTraceMessage("RECOVERED " + new Date(), true);
asyncDataManager.setMark(location, true);
long end = System.currentTimeMillis();
LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
private IOException createReadException(Location location, Exception e) {
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
protected IOException createWriteException(DataStructure packet, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
protected IOException createWriteException(String command, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
protected IOException createRecoveryFailedException(Exception e) {
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
* @param command
* @param syncHint
* @return
* @throws IOException
*/
public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
return writeCommand(command, syncHint,false);
}
public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
try {
return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
} catch (IOException ioe) {
LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
brokerService.handleIOException(ioe);
throw ioe;
}
}
private Location writeTraceMessage(String message, boolean sync) throws IOException {
JournalTrace trace = new JournalTrace();
trace.setMessage(message);
return writeCommand(trace, sync);
}
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
checkpoint(false);
}
}
public AMQTransactionStore getTransactionStore() {
return transactionStore;
}
public synchronized void deleteAllMessages() throws IOException {
deleteAllMessages = true;
}
@Override
public String toString() {
return "AMQPersistenceAdapter(" + directory + ")";
}
// /////////////////////////////////////////////////////////////////
// Subclass overridables
// /////////////////////////////////////////////////////////////////
protected AsyncDataManager createAsyncDataManager() {
AsyncDataManager manager = new AsyncDataManager(storeSize);
manager.setDirectory(new File(directory, "journal"));
manager.setDirectoryArchive(getDirectoryArchive());
manager.setArchiveDataLogs(isArchiveDataLogs());
manager.setMaxFileLength(maxFileLength);
manager.setUseNio(useNio);
return manager;
}
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
adaptor.setPersistentIndex(isPersistentIndex());
adaptor.setIndexBinSize(getIndexBinSize());
adaptor.setIndexKeySize(getIndexKeySize());
adaptor.setIndexPageSize(getIndexPageSize());
adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
adaptor.setIndexLoadFactor(getIndexLoadFactor());
return adaptor;
}
// /////////////////////////////////////////////////////////////////
// Property Accessors
// /////////////////////////////////////////////////////////////////
public AsyncDataManager getAsyncDataManager() {
return asyncDataManager;
}
public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
this.asyncDataManager = asyncDataManager;
}
public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat() {
return wireFormat;
}
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
public SystemUsage getUsageManager() {
return usageManager;
}
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
}
public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
public synchronized File getDirectory() {
return directory;
}
public synchronized void setDirectory(File directory) {
this.directory = directory;
}
public boolean isSyncOnWrite() {
return this.syncOnWrite;
}
public void setSyncOnWrite(boolean syncOnWrite) {
this.syncOnWrite = syncOnWrite;
}
public boolean isSyncOnTransaction() {
return syncOnTransaction;
}
public void setSyncOnTransaction(boolean syncOnTransaction) {
this.syncOnTransaction = syncOnTransaction;
}
/**
* @param referenceStoreAdapter the referenceStoreAdapter to set
*/
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
}
public long size(){
return storeSize.get();
}
public boolean isUseNio() {
return useNio;
}
public void setUseNio(boolean useNio) {
this.useNio = useNio;
}
public int getMaxFileLength() {
return maxFileLength;
}
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setMaxFileLength(int maxFileLength) {
this.maxFileLength = maxFileLength;
}
public long getCleanupInterval() {
return cleanupInterval;
}
public void setCleanupInterval(long cleanupInterval) {
this.cleanupInterval = cleanupInterval;
}
public long getCheckpointInterval() {
return checkpointInterval;
}
public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
}
public int getIndexBinSize() {
return indexBinSize;
}
public void setIndexBinSize(int indexBinSize) {
this.indexBinSize = indexBinSize;
}
public int getIndexKeySize() {
return indexKeySize;
}
public void setIndexKeySize(int indexKeySize) {
this.indexKeySize = indexKeySize;
}
public int getIndexPageSize() {
return indexPageSize;
}
public int getIndexMaxBinSize() {
return indexMaxBinSize;
}
public void setIndexMaxBinSize(int maxBinSize) {
this.indexMaxBinSize = maxBinSize;
}
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setIndexPageSize(int indexPageSize) {
this.indexPageSize = indexPageSize;
}
public void setIndexLoadFactor(int factor){
this.indexLoadFactor=factor;
}
public int getIndexLoadFactor(){
return this.indexLoadFactor;
}
public int getMaxReferenceFileLength() {
return maxReferenceFileLength;
}
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setMaxReferenceFileLength(int maxReferenceFileLength) {
this.maxReferenceFileLength = maxReferenceFileLength;
}
public File getDirectoryArchive() {
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
}
public boolean isArchiveDataLogs() {
return archiveDataLogs;
}
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
}
public boolean isDisableLocking() {
return disableLocking;
}
public void setDisableLocking(boolean disableLocking) {
this.disableLocking = disableLocking;
}
/**
* @return the recoverReferenceStore
*/
public boolean isRecoverReferenceStore() {
return recoverReferenceStore;
}
/**
* @param recoverReferenceStore the recoverReferenceStore to set
*/
public void setRecoverReferenceStore(boolean recoverReferenceStore) {
this.recoverReferenceStore = recoverReferenceStore;
}
/**
* @return the forceRecoverReferenceStore
*/
public boolean isForceRecoverReferenceStore() {
return forceRecoverReferenceStore;
}
/**
* @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
*/
public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
this.forceRecoverReferenceStore = forceRecoverReferenceStore;
}
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
/**
* @return the journalThreadPriority
*/
public int getJournalThreadPriority() {
return this.journalThreadPriority;
}
/**
* @param journalThreadPriority the journalThreadPriority to set
*/
public void setJournalThreadPriority(int journalThreadPriority) {
this.journalThreadPriority = journalThreadPriority;
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
if (map == null) {
map = new ConcurrentHashMap<Integer, AtomicInteger>();
dataFilesInProgress.put(store, map);
}
AtomicInteger count = map.get(dataFileId);
if (count == null) {
count = new AtomicInteger(0);
map.put(dataFileId, count);
}
count.incrementAndGet();
}
protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
if (map != null) {
AtomicInteger count = map.get(dataFileId);
if (count != null) {
int newCount = count.decrementAndGet();
if (newCount <=0) {
map.remove(dataFileId);
}
}
if (map.isEmpty()) {
dataFilesInProgress.remove(store);
}
}
}
protected void lock() throws Exception {
lockLogged = false;
lockAquired = false;
do {
if (doLock()) {
lockAquired = true;
} else {
if (!lockLogged) {
LOG.warn("Waiting to Lock the Store " + getDirectory());
lockLogged = true;
}
Thread.sleep(1000);
}
} while (!lockAquired && !disableLocking);
}
private synchronized void unlock() throws IOException {
if (!disableLocking && (null != lock)) {
//clear property doesn't work on some platforms
System.getProperties().remove(getPropertyKey());
System.clearProperty(getPropertyKey());
assert(System.getProperty(getPropertyKey())==null);
if (lock.isValid()) {
lock.release();
lock.channel().close();
}
lock = null;
}
}
protected boolean doLock() throws IOException {
boolean result = true;
if (!disableLocking && directory != null && lock == null) {
String key = getPropertyKey();
String property = System.getProperty(key);
if (null == property) {
if (!BROKEN_FILE_LOCK) {
lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
if (lock == null) {
result = false;
} else {
System.setProperty(key, new Date().toString());
}
}
} else { // already locked
result = false;
}
}
return result;
}
private String getPropertyKey() throws IOException {
return getClass().getName() + ".lock." + directory.getCanonicalPath();
}
static {
BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".FileLockBroken",
"false"));
DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".DisableLocking",
"false"));
}
public long getLastProducerSequenceId(ProducerId id) {
// reference store send has adequate duplicate suppression
return -1;
}
}