blob: 58cf907afdb0ff9f0b089314f5278a4353cec710 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractKahaDBStore extends LockableServiceSupport {
static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "";
public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
protected File directory;
protected PageFile pageFile;
protected Journal journal;
protected AtomicLong journalSize = new AtomicLong(0);
protected boolean failIfDatabaseIsLocked;
protected long checkpointInterval = 5*1000;
protected long cleanupInterval = 30*1000;
private boolean cleanupOnStop = true;
protected boolean checkForCorruptJournalFiles = false;
protected boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
protected boolean archiveCorruptedIndex = false;
protected boolean enableIndexWriteAsync = false;
protected boolean enableJournalDiskSyncs = false;
protected boolean deleteAllJobs = false;
protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected boolean useIndexLFRUEviction = false;
protected float indexLFUEvictionFactor = 0.2f;
protected boolean ignoreMissingJournalfiles = false;
protected int indexCacheSize = 1000;
protected boolean enableIndexDiskSyncs = true;
protected boolean enableIndexRecoveryFile = true;
protected boolean enableIndexPageCaching = true;
protected boolean archiveDataLogs;
protected boolean purgeStoreOnStartup;
protected File directoryArchive;
protected AtomicBoolean opened = new AtomicBoolean();
protected Thread checkpointThread;
protected final Object checkpointThreadLock = new Object();
protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
* @return the name to give this store's PageFile instance.
protected abstract String getPageFileName();
* @return the location of the data directory if no set by configuration.
protected abstract File getDefaultDataDirectory();
* Loads the store from disk.
* Based on configuration this method can either load an existing store or it can purge
* an existing store and start in a clean state.
* @throws IOException if an error occurs during the load.
public abstract void load() throws IOException;
* Unload the state of the Store to disk and shuts down all resources assigned to this
* KahaDB store implementation.
* @throws IOException if an error occurs during the store unload.
public abstract void unload() throws IOException;
protected void doStart() throws Exception {
if (getDirectory() == null) {
try {
if (isPurgeStoreOnStartup()) {
journal = null;
getPageFile().delete();"{} Persistence store purged.", this);
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
} finally {
protected void doStop(ServiceStopper stopper) throws Exception {
public PageFile getPageFile() {
if (pageFile == null) {
pageFile = createPageFile();
return pageFile;
public Journal getJournal() throws IOException {
if (journal == null) {
journal = createJournal();
return journal;
public File getDirectory() {
return directory;
public void setDirectory(File directory) { = directory;
public boolean isArchiveCorruptedIndex() {
return archiveCorruptedIndex;
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
this.archiveCorruptedIndex = archiveCorruptedIndex;
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
public boolean isCheckForCorruptJournalFiles() {
return checkForCorruptJournalFiles;
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
public long getCheckpointInterval() {
return checkpointInterval;
public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
public long getCleanupInterval() {
return cleanupInterval;
public void setCleanupInterval(long cleanupInterval) {
this.cleanupInterval = cleanupInterval;
public void setCleanupOnStop(boolean cleanupOnStop) {
this.cleanupOnStop = cleanupOnStop;
public boolean getCleanupOnStop() {
return this.cleanupOnStop;
public boolean isChecksumJournalFiles() {
return checksumJournalFiles;
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
this.checksumJournalFiles = checksumJournalFiles;
public boolean isForceRecoverIndex() {
return forceRecoverIndex;
public void setForceRecoverIndex(boolean forceRecoverIndex) {
this.forceRecoverIndex = forceRecoverIndex;
public int getJournalMaxFileLength() {
return journalMaxFileLength;
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
public int getJournalMaxWriteBatchSize() {
return journalMaxWriteBatchSize;
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
public boolean isEnableIndexWriteAsync() {
return enableIndexWriteAsync;
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.enableIndexWriteAsync = enableIndexWriteAsync;
public boolean isEnableJournalDiskSyncs() {
return enableJournalDiskSyncs;
public void setEnableJournalDiskSyncs(boolean syncWrites) {
this.enableJournalDiskSyncs = syncWrites;
public boolean isDeleteAllJobs() {
return deleteAllJobs;
public void setDeleteAllJobs(boolean deleteAllJobs) {
this.deleteAllJobs = deleteAllJobs;
* @return the archiveDataLogs
public boolean isArchiveDataLogs() {
return this.archiveDataLogs;
* @param archiveDataLogs the archiveDataLogs to set
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
* @return the directoryArchive
public File getDirectoryArchive() {
return this.directoryArchive;
* @param directoryArchive the directoryArchive to set
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
public int getIndexCacheSize() {
return indexCacheSize;
public void setIndexCacheSize(int indexCacheSize) {
this.indexCacheSize = indexCacheSize;
public int getIndexWriteBatchSize() {
return indexWriteBatchSize;
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.indexWriteBatchSize = indexWriteBatchSize;
public boolean isUseIndexLFRUEviction() {
return useIndexLFRUEviction;
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
this.useIndexLFRUEviction = useIndexLFRUEviction;
public float getIndexLFUEvictionFactor() {
return indexLFUEvictionFactor;
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
this.indexLFUEvictionFactor = indexLFUEvictionFactor;
public boolean isEnableIndexDiskSyncs() {
return enableIndexDiskSyncs;
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
this.enableIndexDiskSyncs = enableIndexDiskSyncs;
public boolean isEnableIndexRecoveryFile() {
return enableIndexRecoveryFile;
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
this.enableIndexRecoveryFile = enableIndexRecoveryFile;
public boolean isEnableIndexPageCaching() {
return enableIndexPageCaching;
public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
this.enableIndexPageCaching = enableIndexPageCaching;
public boolean isPurgeStoreOnStartup() {
return this.purgeStoreOnStartup;
public void setPurgeStoreOnStartup(boolean purge) {
this.purgeStoreOnStartup = purge;
public boolean isIgnoreMissingJournalfiles() {
return ignoreMissingJournalfiles;
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
public long size() {
if (!isStarted()) {
return 0;
try {
return journalSize.get() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker();
return locker;
public void init() throws Exception {
* Store a command in the Journal and process to update the Store index.
* @param command
* The specific JournalCommand to store and process.
* @returns the Location where the data was written in the Journal.
* @throws IOException if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command) throws IOException {
return store(command, isEnableIndexDiskSyncs(), null, null, null);
* Store a command in the Journal and process to update the Store index.
* @param command
* The specific JournalCommand to store and process.
* @param sync
* Should the store operation be done synchronously. (ignored if completion passed).
* @returns the Location where the data was written in the Journal.
* @throws IOException if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, boolean sync) throws IOException {
return store(command, sync, null, null, null);
* Store a command in the Journal and process to update the Store index.
* @param command
* The specific JournalCommand to store and process.
* @param onJournalStoreComplete
* The Runnable to call when the Journal write operation completes.
* @returns the Location where the data was written in the Journal.
* @throws IOException if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
* Store a command in the Journal and process to update the Store index.
* @param command
* The specific JournalCommand to store and process.
* @param sync
* Should the store operation be done synchronously. (ignored if completion passed).
* @param before
* The Runnable instance to execute before performing the store and process operation.
* @param after
* The Runnable instance to execute after performing the store and process operation.
* @returns the Location where the data was written in the Journal.
* @throws IOException if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
return store(command, sync, before, after, null);
* All updated are are funneled through this method. The updates are converted to a
* JournalMessage which is logged to the journal and then the data from the JournalMessage
* is used to update the index just like it would be done during a recovery process.
* @param command
* The specific JournalCommand to store and process.
* @param sync
* Should the store operation be done synchronously. (ignored if completion passed).
* @param before
* The Runnable instance to execute before performing the store and process operation.
* @param after
* The Runnable instance to execute after performing the store and process operation.
* @param onJournalStoreComplete
* Callback to be run when the journal write operation is complete.
* @returns the Location where the data was written in the Journal.
* @throws IOException if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
try {
if (before != null) {;
ByteSequence sequence = toByteSequence(command);
Location location;
try {
long start = System.currentTimeMillis();
location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
journal.write(sequence, onJournalStoreComplete);
long start2 = System.currentTimeMillis();
process(command, location);
long end = System.currentTimeMillis();
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {"Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
(start2-start), (end-start2));
} finally {
if (after != null) {;
if (checkpointThread != null && !checkpointThread.isAlive()) {
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);
if (brokerService != null) {
throw ioe;
* Loads a previously stored JournalMessage
* @param location
* The location of the journal command to read.
* @return a new un-marshaled JournalCommand instance.
* @throws IOException if an error occurs reading the stored command.
protected JournalCommand<?> load(Location location) throws IOException {
ByteSequence data =;
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
if (type == null) {
try {
} catch (IOException e) {
throw new IOException("Could not load journal record. Invalid location: " + location);
JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
return message;
* Process a stored or recovered JournalCommand instance and update the DB Index with the
* state changes that this command produces. This can be called either as a new DB operation
* or as a replay during recovery operations.
* @param command
* The JournalCommand to process.
* @param location
* The location in the Journal where the command was written or read from.
protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
* Perform a checkpoint operation with optional cleanup.
* Called by the checkpoint background thread periodically to initiate a checkpoint operation
* and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
* longer needed journal log files etc.
* @param cleanup
* Should the method do a simple checkpoint or also perform a journal cleanup.
* @throws IOException if an error occurs during the checkpoint operation.
protected void checkpointUpdate(final boolean cleanup) throws IOException {
try {
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
} finally {
} finally {
* Perform the checkpoint update operation. If the cleanup flag is true then the
* operation should also purge any unused Journal log files.
* This method must always be called with the checkpoint and index write locks held.
* @param tx
* The TX under which to perform the checkpoint update.
* @param cleanup
* Should the checkpoint also do unused Journal file cleanup.
* @throws IOException if an error occurs while performing the checkpoint.
protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
* Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
* @param data
* The Journal Command that should be marshaled to bytes for writing.
* @return the byte representation of the given journal command.
* @throws IOException if an error occurs while serializing the command.
protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
return os.toByteSequence();
* Create the PageFile instance and configure it using the configuration options
* currently set.
* @return the newly created and configured PageFile instance.
protected PageFile createPageFile() {
PageFile index = new PageFile(getDirectory(), getPageFileName());
return index;
* Create a new Journal instance and configure it using the currently set configuration
* options. If an archive directory is configured than this method will attempt to create
* that directory if it does not already exist.
* @return the newly created an configured Journal instance.
* @throws IOException if an error occurs while creating the Journal object.
protected Journal createJournal() throws IOException {
Journal manager = new Journal();
manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
if (getDirectoryArchive() != null) {
return manager;
* Starts the checkpoint Thread instance if not already running and not disabled
* by configuration.
protected void startCheckpoint() {
if (checkpointInterval == 0 && cleanupInterval == 0) {"periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
synchronized (checkpointThreadLock) {
boolean start = false;
if (checkpointThread == null) {
start = true;
} else if (!checkpointThread.isAlive()) {
start = true;"KahaDB: Recovering checkpoint thread after death");
if (start) {
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
public void run() {
try {
long lastCleanup = System.currentTimeMillis();
long lastCheckpoint = System.currentTimeMillis();
// Sleep for a short time so we can periodically check
// to see if we need to exit this thread.
long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
while (opened.get()) {
long now = System.currentTimeMillis();
if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
lastCleanup = now;
lastCheckpoint = now;
} else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
lastCheckpoint = now;
} catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread...
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
* Called from the worker thread to start a checkpoint.
* This method ensure that the store is in an opened state and optionaly logs information
* related to slow store access times.
* @param cleanup
* Should a cleanup of the journal occur during the checkpoint operation.
* @throws IOException if an error occurs during the checkpoint operation.
protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start;
try {
start = System.currentTimeMillis();
if (!opened.get()) {
} finally {
long end = System.currentTimeMillis();
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {"Slow KahaDB access: cleanup took {}", (end - start));