blob: 58cf907afdb0ff9f0b089314f5278a4353cec710 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractKahaDBStore extends LockableServiceSupport {
static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
protected File directory;
protected PageFile pageFile;
protected Journal journal;
protected AtomicLong journalSize = new AtomicLong(0);
protected boolean failIfDatabaseIsLocked;
protected long checkpointInterval = 5*1000;
protected long cleanupInterval = 30*1000;
private boolean cleanupOnStop = true;
protected boolean checkForCorruptJournalFiles = false;
protected boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
protected boolean archiveCorruptedIndex = false;
protected boolean enableIndexWriteAsync = false;
protected boolean enableJournalDiskSyncs = false;
protected boolean deleteAllJobs = false;
protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected boolean useIndexLFRUEviction = false;
protected float indexLFUEvictionFactor = 0.2f;
protected boolean ignoreMissingJournalfiles = false;
protected int indexCacheSize = 1000;
protected boolean enableIndexDiskSyncs = true;
protected boolean enableIndexRecoveryFile = true;
protected boolean enableIndexPageCaching = true;
protected boolean archiveDataLogs;
protected boolean purgeStoreOnStartup;
protected File directoryArchive;
protected AtomicBoolean opened = new AtomicBoolean();
protected Thread checkpointThread;
protected final Object checkpointThreadLock = new Object();
protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
/**
* @return the name to give this store's PageFile instance.
*/
protected abstract String getPageFileName();
/**
* @return the location of the data directory if no set by configuration.
*/
protected abstract File getDefaultDataDirectory();
/**
* Loads the store from disk.
*
* Based on configuration this method can either load an existing store or it can purge
* an existing store and start in a clean state.
*
* @throws IOException if an error occurs during the load.
*/
public abstract void load() throws IOException;
/**
* Unload the state of the Store to disk and shuts down all resources assigned to this
* KahaDB store implementation.
*
* @throws IOException if an error occurs during the store unload.
*/
public abstract void unload() throws IOException;
@Override
protected void doStart() throws Exception {
this.indexLock.writeLock().lock();
if (getDirectory() == null) {
setDirectory(getDefaultDataDirectory());
}
IOHelper.mkdirs(getDirectory());
try {
if (isPurgeStoreOnStartup()) {
getJournal().start();
getJournal().delete();
getJournal().close();
journal = null;
getPageFile().delete();
LOG.info("{} Persistence store purged.", this);
setPurgeStoreOnStartup(false);
}
load();
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
} finally {
this.indexLock.writeLock().unlock();
}
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
unload();
}
public PageFile getPageFile() {
if (pageFile == null) {
pageFile = createPageFile();
}
return pageFile;
}
public Journal getJournal() throws IOException {
if (journal == null) {
journal = createJournal();
}
return journal;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public boolean isArchiveCorruptedIndex() {
return archiveCorruptedIndex;
}
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
this.archiveCorruptedIndex = archiveCorruptedIndex;
}
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
}
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
public boolean isCheckForCorruptJournalFiles() {
return checkForCorruptJournalFiles;
}
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
public long getCheckpointInterval() {
return checkpointInterval;
}
public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
}
public long getCleanupInterval() {
return cleanupInterval;
}
public void setCleanupInterval(long cleanupInterval) {
this.cleanupInterval = cleanupInterval;
}
public void setCleanupOnStop(boolean cleanupOnStop) {
this.cleanupOnStop = cleanupOnStop;
}
public boolean getCleanupOnStop() {
return this.cleanupOnStop;
}
public boolean isChecksumJournalFiles() {
return checksumJournalFiles;
}
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
this.checksumJournalFiles = checksumJournalFiles;
}
public boolean isForceRecoverIndex() {
return forceRecoverIndex;
}
public void setForceRecoverIndex(boolean forceRecoverIndex) {
this.forceRecoverIndex = forceRecoverIndex;
}
public int getJournalMaxFileLength() {
return journalMaxFileLength;
}
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
}
public int getJournalMaxWriteBatchSize() {
return journalMaxWriteBatchSize;
}
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
}
public boolean isEnableIndexWriteAsync() {
return enableIndexWriteAsync;
}
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.enableIndexWriteAsync = enableIndexWriteAsync;
}
public boolean isEnableJournalDiskSyncs() {
return enableJournalDiskSyncs;
}
public void setEnableJournalDiskSyncs(boolean syncWrites) {
this.enableJournalDiskSyncs = syncWrites;
}
public boolean isDeleteAllJobs() {
return deleteAllJobs;
}
public void setDeleteAllJobs(boolean deleteAllJobs) {
this.deleteAllJobs = deleteAllJobs;
}
/**
* @return the archiveDataLogs
*/
public boolean isArchiveDataLogs() {
return this.archiveDataLogs;
}
/**
* @param archiveDataLogs the archiveDataLogs to set
*/
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
}
/**
* @return the directoryArchive
*/
public File getDirectoryArchive() {
return this.directoryArchive;
}
/**
* @param directoryArchive the directoryArchive to set
*/
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
}
public int getIndexCacheSize() {
return indexCacheSize;
}
public void setIndexCacheSize(int indexCacheSize) {
this.indexCacheSize = indexCacheSize;
}
public int getIndexWriteBatchSize() {
return indexWriteBatchSize;
}
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.indexWriteBatchSize = indexWriteBatchSize;
}
public boolean isUseIndexLFRUEviction() {
return useIndexLFRUEviction;
}
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
this.useIndexLFRUEviction = useIndexLFRUEviction;
}
public float getIndexLFUEvictionFactor() {
return indexLFUEvictionFactor;
}
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
this.indexLFUEvictionFactor = indexLFUEvictionFactor;
}
public boolean isEnableIndexDiskSyncs() {
return enableIndexDiskSyncs;
}
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
this.enableIndexDiskSyncs = enableIndexDiskSyncs;
}
public boolean isEnableIndexRecoveryFile() {
return enableIndexRecoveryFile;
}
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
this.enableIndexRecoveryFile = enableIndexRecoveryFile;
}
public boolean isEnableIndexPageCaching() {
return enableIndexPageCaching;
}
public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
this.enableIndexPageCaching = enableIndexPageCaching;
}
public boolean isPurgeStoreOnStartup() {
return this.purgeStoreOnStartup;
}
public void setPurgeStoreOnStartup(boolean purge) {
this.purgeStoreOnStartup = purge;
}
public boolean isIgnoreMissingJournalfiles() {
return ignoreMissingJournalfiles;
}
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
}
public long size() {
if (!isStarted()) {
return 0;
}
try {
return journalSize.get() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker();
locker.setDirectory(this.getDirectory());
return locker;
}
@Override
public void init() throws Exception {
}
/**
* Store a command in the Journal and process to update the Store index.
*
* @param command
* The specific JournalCommand to store and process.
*
* @returns the Location where the data was written in the Journal.
*
* @throws IOException if an error occurs storing or processing the command.
*/
public Location store(JournalCommand<?> command) throws IOException {
return store(command, isEnableIndexDiskSyncs(), null, null, null);
}
/**
* Store a command in the Journal and process to update the Store index.
*
* @param command
* The specific JournalCommand to store and process.
* @param sync
* Should the store operation be done synchronously. (ignored if completion passed).
*
* @returns the Location where the data was written in the Journal.
*
* @throws IOException if an error occurs storing or processing the command.
*/
public Location store(JournalCommand<?> command, boolean sync) throws IOException {
return store(command, sync, null, null, null);
}
/**
* Store a command in the Journal and process to update the Store index.
*
* @param command
* The specific JournalCommand to store and process.
* @param onJournalStoreComplete
* The Runnable to call when the Journal write operation completes.
*
* @returns the Location where the data was written in the Journal.
*
* @throws IOException if an error occurs storing or processing the command.
*/
public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
}
/**
* Store a command in the Journal and process to update the Store index.
*
* @param command
* The specific JournalCommand to store and process.
* @param sync
* Should the store operation be done synchronously. (ignored if completion passed).
* @param before
* The Runnable instance to execute before performing the store and process operation.
* @param after
* The Runnable instance to execute after performing the store and process operation.
*
* @returns the Location where the data was written in the Journal.
*
* @throws IOException if an error occurs storing or processing the command.
*/
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
return store(command, sync, before, after, null);
}
/**
* All updated are are funneled through this method. The updates are converted to a
* JournalMessage which is logged to the journal and then the data from the JournalMessage
* is used to update the index just like it would be done during a recovery process.
*
* @param command
* The specific JournalCommand to store and process.
* @param sync
* Should the store operation be done synchronously. (ignored if completion passed).
* @param before
* The Runnable instance to execute before performing the store and process operation.
* @param after
* The Runnable instance to execute after performing the store and process operation.
* @param onJournalStoreComplete
* Callback to be run when the journal write operation is complete.
*
* @returns the Location where the data was written in the Journal.
*
* @throws IOException if an error occurs storing or processing the command.
*/
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
try {
if (before != null) {
before.run();
}
ByteSequence sequence = toByteSequence(command);
Location location;
checkpointLock.readLock().lock();
try {
long start = System.currentTimeMillis();
location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
journal.write(sequence, onJournalStoreComplete);
long start2 = System.currentTimeMillis();
process(command, location);
long end = System.currentTimeMillis();
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
(start2-start), (end-start2));
}
} finally {
checkpointLock.readLock().unlock();
}
if (after != null) {
after.run();
}
if (checkpointThread != null && !checkpointThread.isAlive()) {
startCheckpoint();
}
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);
if (brokerService != null) {
brokerService.handleIOException(ioe);
}
throw ioe;
}
}
/**
* Loads a previously stored JournalMessage
*
* @param location
* The location of the journal command to read.
*
* @return a new un-marshaled JournalCommand instance.
*
* @throws IOException if an error occurs reading the stored command.
*/
protected JournalCommand<?> load(Location location) throws IOException {
ByteSequence data = journal.read(location);
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
if (type == null) {
try {
is.close();
} catch (IOException e) {
}
throw new IOException("Could not load journal record. Invalid location: " + location);
}
JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
message.mergeFramed(is);
return message;
}
/**
* Process a stored or recovered JournalCommand instance and update the DB Index with the
* state changes that this command produces. This can be called either as a new DB operation
* or as a replay during recovery operations.
*
* @param command
* The JournalCommand to process.
* @param location
* The location in the Journal where the command was written or read from.
*/
protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
/**
* Perform a checkpoint operation with optional cleanup.
*
* Called by the checkpoint background thread periodically to initiate a checkpoint operation
* and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
* longer needed journal log files etc.
*
* @param cleanup
* Should the method do a simple checkpoint or also perform a journal cleanup.
*
* @throws IOException if an error occurs during the checkpoint operation.
*/
protected void checkpointUpdate(final boolean cleanup) throws IOException {
checkpointLock.writeLock().lock();
try {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
}
});
} finally {
this.indexLock.writeLock().unlock();
}
} finally {
checkpointLock.writeLock().unlock();
}
}
/**
* Perform the checkpoint update operation. If the cleanup flag is true then the
* operation should also purge any unused Journal log files.
*
* This method must always be called with the checkpoint and index write locks held.
*
* @param tx
* The TX under which to perform the checkpoint update.
* @param cleanup
* Should the checkpoint also do unused Journal file cleanup.
*
* @throws IOException if an error occurs while performing the checkpoint.
*/
protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
/**
* Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
*
* @param data
* The Journal Command that should be marshaled to bytes for writing.
*
* @return the byte representation of the given journal command.
*
* @throws IOException if an error occurs while serializing the command.
*/
protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
return os.toByteSequence();
}
/**
* Create the PageFile instance and configure it using the configuration options
* currently set.
*
* @return the newly created and configured PageFile instance.
*/
protected PageFile createPageFile() {
PageFile index = new PageFile(getDirectory(), getPageFileName());
index.setEnableWriteThread(isEnableIndexWriteAsync());
index.setWriteBatchSize(getIndexWriteBatchSize());
index.setPageCacheSize(getIndexCacheSize());
index.setUseLFRUEviction(isUseIndexLFRUEviction());
index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
index.setEnablePageCaching(isEnableIndexPageCaching());
return index;
}
/**
* Create a new Journal instance and configure it using the currently set configuration
* options. If an archive directory is configured than this method will attempt to create
* that directory if it does not already exist.
*
* @return the newly created an configured Journal instance.
*
* @throws IOException if an error occurs while creating the Journal object.
*/
protected Journal createJournal() throws IOException {
Journal manager = new Journal();
manager.setDirectory(getDirectory());
manager.setMaxFileLength(getJournalMaxFileLength());
manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
manager.setArchiveDataLogs(isArchiveDataLogs());
manager.setSizeAccumulator(journalSize);
manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive());
}
return manager;
}
/**
* Starts the checkpoint Thread instance if not already running and not disabled
* by configuration.
*/
protected void startCheckpoint() {
if (checkpointInterval == 0 && cleanupInterval == 0) {
LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
return;
}
synchronized (checkpointThreadLock) {
boolean start = false;
if (checkpointThread == null) {
start = true;
} else if (!checkpointThread.isAlive()) {
start = true;
LOG.info("KahaDB: Recovering checkpoint thread after death");
}
if (start) {
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
@Override
public void run() {
try {
long lastCleanup = System.currentTimeMillis();
long lastCheckpoint = System.currentTimeMillis();
// Sleep for a short time so we can periodically check
// to see if we need to exit this thread.
long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
while (opened.get()) {
Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
checkpointCleanup(true);
lastCleanup = now;
lastCheckpoint = now;
} else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
checkpointCleanup(false);
lastCheckpoint = now;
}
}
} catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread...
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
}
}
};
checkpointThread.setDaemon(true);
checkpointThread.start();
}
}
}
/**
* Called from the worker thread to start a checkpoint.
*
* This method ensure that the store is in an opened state and optionaly logs information
* related to slow store access times.
*
* @param cleanup
* Should a cleanup of the journal occur during the checkpoint operation.
*
* @throws IOException if an error occurs during the checkpoint operation.
*/
protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start;
this.indexLock.writeLock().lock();
try {
start = System.currentTimeMillis();
if (!opened.get()) {
return;
}
} finally {
this.indexLock.writeLock().unlock();
}
checkpointUpdate(cleanup);
long end = System.currentTimeMillis();
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
}
}
}