blob: e49324b42ad4541952e65e6a020d13769dde4b5e [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Provides journal-related management.
 */
public class Journal extends BookieCriticalThread implements CheckpointSource {
// NOTE(review): this copy of the file appears to have lost lines: closing braces, @Override
// annotations, several statements (visible as truncated lines further down) and imports for
// names it uses (e.g. File, IOException, FileInputStream, FileOutputStream, Stopwatch,
// MoreExecutors). Reconcile with the upstream source before building.
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
// Recycler for the per-batch lists of queue entries that the journal thread accumulates
// between flushes (toFlush) and hands over to force-write requests.
private static final RecyclableArrayList.Recycler<QueueEntry> entryListRecycler =
new RecyclableArrayList.Recycler<QueueEntry>();
/**
 * Filter to pick up journals.
 */
public interface JournalIdFilter {
/**
 * Decides whether a journal is selected.
 *
 * @param journalId journal id parsed from a journal file name
 * @return true to select the journal with the given id
 */
boolean accept(long journalId);
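/**
 * Factory for the {@link BufferedChannel} instances used by journal channels.
 * Exposed as an interface for testability.
 */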
public interface BufferedChannelBuilder {
// Default builder: buffers through UnpooledByteBufAllocator.DEFAULT.
BufferedChannelBuilder DEFAULT_BCBUILDER = (FileChannel fc,
int capacity) -> new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
/**
 * Creates a buffered channel over {@code fc}.
 *
 * @param fc underlying file channel
 * @param capacity buffer capacity passed to the {@link BufferedChannel}
 * @return the new buffered channel
 * @throws IOException propagated from channel creation
 */
BufferedChannel create(FileChannel fc, int capacity) throws IOException;
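/**
 * Lists all journal ids accepted by the specified journal id filter.
 * Journal ids are parsed as hexadecimal from the names of the {@code .txn} files in the directory.
 *
 * @param journalDir journal dir
 * @param filter journal id filter; may be {@code null}
 * @return list of filtered ids
 */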
public static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
// File.listFiles() returns null if journalDir does not denote a directory or an I/O error occurs.
File[] logFiles = journalDir.listFiles();
if (logFiles == null || logFiles.length == 0) {
return Collections.emptyList();
List<Long> logs = new ArrayList<Long>();
for (File f: logFiles) {
String name = f.getName();
// Only files ending in ".txn" are journals.
// NOTE(review): the statement that skips non-.txn files is not present in this copy.
if (!name.endsWith(".txn")) {
// The journal id is the hexadecimal prefix before the first '.'.
// NOTE(review): Long.parseLong throws NumberFormatException for a non-hex prefix
// (e.g. a stray "foo.txn"); consider skipping such files instead of failing the listing.
String idString = name.split("\\.")[0];
long id = Long.parseLong(idString, 16);
// NOTE(review): the logs.add(id) statements for both branches below appear to be missing
// from this copy. Callers (checkpointComplete, run) appear to rely on ascending order;
// confirm that the result is sorted before it is returned.
if (filter != null) {
if (filter.accept(id)) {
} else {
return logs;
/**
 * A wrapper over a log mark that provides a checkpoint to users of the journal
 * so that they can do checkpointing.
 */
private static class LogMarkCheckpoint implements Checkpoint {
// Snapshot of the journal's log mark taken when the checkpoint was created.
final LastLogMark mark;
public LogMarkCheckpoint(LastLogMark checkpoint) {
this.mark = checkpoint;
// Orders checkpoints by their log marks. Checkpoint.MAX sorts after, and Checkpoint.MIN
// before, every LogMarkCheckpoint.
// NOTE(review): any other Checkpoint implementation triggers a ClassCastException below.
public int compareTo(Checkpoint o) {
if (o == Checkpoint.MAX) {
return -1;
} else if (o == Checkpoint.MIN) {
return 1;
return mark.getCurMark().compare(((LogMarkCheckpoint) o).mark.getCurMark());
// Two LogMarkCheckpoints are equal when their marks compare as equal.
public boolean equals(Object o) {
if (!(o instanceof LogMarkCheckpoint)) {
return false;
return 0 == compareTo((LogMarkCheckpoint) o);
// NOTE(review): hashCode delegates to the LastLogMark instance, which does not appear to
// override hashCode, while equals compares mark values. Two equal checkpoints can therefore
// report different hash codes. Consider hashing the mark's log file id and offset instead.
public int hashCode() {
return mark.hashCode();
public String toString() {
return mark.toString();
/**
 * Last log mark: a journal log id together with a position in that log.
 */
public class LastLogMark {
// Current (log id, log position) mark; mutated in place by setCurLogMark() and readLog().
private final LogMark curMark;
LastLogMark(long logId, long logPosition) {
this.curMark = new LogMark(logId, logPosition);
// Overwrites the current mark with the given log id and position.
void setCurLogMark(long logId, long logPosition) {
curMark.setLogMark(logId, logPosition);
// Returns a new LastLogMark holding a copy of the current mark, so later updates to this
// instance do not move the returned snapshot.
LastLogMark markLog() {
return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
public LogMark getCurMark() {
return curMark;
// Writes lastMark to the lastMark file in each writable ledger directory. Per-file write
// failures are logged rather than propagated.
// NOTE(review): the statements that encode lastMark into bb, the rest of the
// ledgerDirsManager call below, and the write/close of fos are missing in this copy.
void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
// we should record <logId, logPosition> marked in markLog
// which is safe since records before lastMark have been
// persisted to disk (both index & entry logger)
if (LOG.isDebugEnabled()) {
LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark());
List<File> writableLedgerDirs = ledgerDirsManager
for (File dir : writableLedgerDirs) {
File file = new File(dir, lastMarkFileName);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
// fos is nulled once the try block is done with it, so the finally block below does
// not close it a second time.
fos = null;
} catch (IOException e) {
LOG.error("Problems writing to " + file, e);
} finally {
// if stream already closed in try block successfully,
// stream might have nullified, in such case below
// call will simply returns
IOUtils.close(LOG, fos);
* Read last mark from lastMark file.
* The last mark should first be max journal log id,
* and then max log position in max journal log.
// Reads the 16-byte lastMark file in every ledger dir and moves curMark to the value found
// there. Per-file read failures are logged and the file is skipped.
void readLog() {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
for (File dir: ledgerDirsManager.getAllLedgerDirs()) {
File file = new File(dir, lastMarkFileName);
try {
try (FileInputStream fis = new FileInputStream(file)) {
// NOTE(review): the right-hand side of this assignment (the read into buff) is missing.
int bytesRead =;
if (bytesRead != 16) {
throw new IOException("Couldn't read enough bytes from lastMark."
+ " Wanted " + 16 + ", got " + bytesRead);
// NOTE(review): the statements that decode bb into mark and the left operand of this
// comparison are missing. Presumably curMark is advanced only when the file holds a
// larger mark, per the note above; confirm against upstream.
if ( < 0) {
curMark.setLogMark(mark.getLogFileId(), mark.getLogFileOffset());
} catch (IOException e) {
// NOTE(review): the message below opens "(" but never closes it.
LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this "
+ "bookie");
public String toString() {
return curMark.toString();
/**
 * Filter that returns the list of journals for rolling: journals whose id is
 * smaller than the log id of the given last log mark.
 */
private static class JournalRollingFilter implements JournalIdFilter {
// Mark whose log file id is the exclusive upper bound for accepted journal ids.
final LastLogMark lastMark;
JournalRollingFilter(LastLogMark lastMark) {
this.lastMark = lastMark;
// Accepts journals with an id strictly smaller than the mark's log file id.
public boolean accept(long journalId) {
return journalId < lastMark.getCurMark().getLogFileId();
/**
 * Scanner used to scan a journal.
 */
public interface JournalScanner {
// Callback that scanJournal() invokes for each non-padding record it reads.
* Process a journal entry.
* @param journalVersion Journal Version
* @param offset File offset of the journal entry
* @param entry Journal Entry
* @throws IOException
void process(int journalVersion, long offset, ByteBuffer entry) throws IOException;
/**
 * Journal entry to record.
 */
static class QueueEntry implements Runnable {
// Entry payload; forceLedger() enqueues entries with a null payload.
ByteBuf entry;
long ledgerId;
long entryId;
// Completion callback and its opaque context.
WriteCallback cb;
Object ctx;
// Nano timestamps: when the entry was queued to the journal, and when it was handed to the
// callback pool (set separately via setEnqueueCbThreadPooleQueueTime).
long enqueueTime;
long enqueueCbThreadPooleQueueTime;
// If true, the entry may be acknowledged without waiting for the force write.
boolean ackBeforeSync;
// Stats sinks supplied by the creator.
OpStatsLogger journalAddEntryStats;
OpStatsLogger journalCbQueuedLatency;
Counter journalCbQueueSize;
Counter cbThreadPoolQueueSize;
Counter callbackTime;
// Obtains a pooled instance from RECYCLER and initializes every field except
// enqueueCbThreadPooleQueueTime.
static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
Counter journalCbQueueSize, Counter cbThreadPoolQueueSize,
OpStatsLogger journalCbQueuedLatency, Counter callbackTime) {
QueueEntry qe = RECYCLER.get();
qe.entry = entry;
qe.ackBeforeSync = ackBeforeSync;
qe.cb = cb;
qe.ctx = ctx;
qe.ledgerId = ledgerId;
qe.entryId = entryId;
qe.enqueueTime = enqueueTime;
qe.journalAddEntryStats = journalAddEntryStats;
qe.journalCbQueuedLatency = journalCbQueuedLatency;
qe.journalCbQueueSize = journalCbQueueSize;
qe.cbThreadPoolQueueSize = cbThreadPoolQueueSize;
qe.callbackTime = callbackTime;
return qe;
public void setEnqueueCbThreadPooleQueueTime(long enqueueCbThreadPooleQueueTime) {
this.enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime;
// Completes the write: records the latency stats and invokes cb with return code 0.
// NOTE(review): the start of the stats call continued on the next line, the use of startTime,
// the counter updates and the recycle() call appear to be missing in this copy.
public void run() {
MathUtils.elapsedNanos(enqueueCbThreadPooleQueueTime), TimeUnit.NANOSECONDS);
long startTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
cb.writeComplete(0, ledgerId, entryId, null, ctx);
private final Handle<QueueEntry> recyclerHandle;
private QueueEntry(Handle<QueueEntry> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
// Pool of QueueEntry objects handed out by create().
private static final Recycler<QueueEntry> RECYCLER = new Recycler<QueueEntry>() {
protected QueueEntry newObject(Recycler.Handle<QueueEntry> handle) {
return new QueueEntry(handle);
// NOTE(review): the body of recycle() is missing in this copy.
private void recycle() {
/**
 * Token which represents the need to force a write to the journal.
 */
public class ForceWriteRequest {
// Journal channel this request refers to.
private JournalChannel logFile;
// Entries to notify after a successful force write; null for markers.
private RecyclableArrayList<QueueEntry> forceWriteWaiters;
// Close logFile after processing (see closeFileIfNecessary).
private boolean shouldClose;
// Grouping marker: process() returns immediately without forcing anything.
private boolean isMarker;
// Mark recorded into lastLogMark once the request has been processed.
private long lastFlushedPosition;
private long logId;
// Nano timestamp taken when the request was created.
private long enqueueTime;
// Processes the request, forcing logFile if shouldForceWrite is set, then advances lastLogMark
// to (logId, lastFlushedPosition) and notifies the waiters.
// Returns the number of waiters (0 for markers).
// NOTE(review): the receivers of the ".registerSuccessfulEvent"/".registerFailedEvent"
// fragments, the force-write call, the waiter dispatch and the finally body are missing in
// this copy.
public int process(boolean shouldForceWrite) throws IOException {
.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
if (isMarker) {
return 0;
long startTime = MathUtils.nowInNano();
try {
if (shouldForceWrite) {
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
QueueEntry qe = forceWriteWaiters.get(i);
if (qe != null) {
return forceWriteWaiters.size();
} catch (IOException e) {
.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
throw e;
} finally {
// Closes logFile at most once, when shouldClose is set; I/O errors are logged, not thrown.
// NOTE(review): the close call itself is missing in this copy.
public void closeFileIfNecessary() {
// Close if shouldClose is set
if (shouldClose) {
// We should guard against exceptions so it's
// safe to call in catch blocks
try {
// Call close only once
shouldClose = false;
} catch (IOException ioe) {
LOG.error("I/O exception while closing file", ioe);
private final Handle<ForceWriteRequest> recyclerHandle;
private ForceWriteRequest(Handle<ForceWriteRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
// Clears references before returning to the pool.
// NOTE(review): the waiter-list recycling and the handle recycle call appear to be missing.
private void recycle() {
logFile = null;
if (forceWriteWaiters != null) {
forceWriteWaiters = null;
/**
 * Obtains a pooled {@link ForceWriteRequest} from {@code forceWriteRequestsRecycler} and
 * initializes it, stamping the enqueue time with the current nano time.
 *
 * @param logFile journal channel the request refers to
 * @param logId journal log id recorded into the last log mark once processed
 * @param lastFlushedPosition position recorded into the last log mark once processed
 * @param forceWriteWaiters entries to notify after the force write; {@code null} for markers
 * @param shouldClose whether the journal file should be closed after processing
 * @param isMarker whether this is a grouping marker rather than a real request
 * @return the initialized request
 */
private ForceWriteRequest createForceWriteRequest(JournalChannel logFile,
long logId,
long lastFlushedPosition,
RecyclableArrayList<QueueEntry> forceWriteWaiters,
boolean shouldClose,
boolean isMarker) {
ForceWriteRequest req = forceWriteRequestsRecycler.get();
req.forceWriteWaiters = forceWriteWaiters;
req.logFile = logFile;
req.logId = logId;
req.lastFlushedPosition = lastFlushedPosition;
req.shouldClose = shouldClose;
req.isMarker = isMarker;
req.enqueueTime = MathUtils.nowInNano();
return req;
// Pool of ForceWriteRequest objects handed out by createForceWriteRequest().
private final Recycler<ForceWriteRequest> forceWriteRequestsRecycler = new Recycler<ForceWriteRequest>() {
protected ForceWriteRequest newObject(
Recycler.Handle<ForceWriteRequest> handle) {
return new ForceWriteRequest(handle);
/**
 * ForceWriteThread is a background thread which makes the journal durable periodically.
 */
private class ForceWriteThread extends BookieCriticalThread {
// Loop flag: cleared by shutdown() and when the loop hits an I/O error or an interrupt.
volatile boolean running = true;
// Thread to notify when this thread exits (the journal passes itself in), so that write
// requests do not hang once force writes stop.
Thread threadToNotifyOnEx;
// should we group force writes
private final boolean enableGroupForceWrites;
// Thread-scoped counter, presumably for this thread's busy time (updates not visible here).
private final Counter forceWriteThreadTime;
public ForceWriteThread(Thread threadToNotifyOnEx,
boolean enableGroupForceWrites,
StatsLogger statsLogger) {
this.threadToNotifyOnEx = threadToNotifyOnEx;
this.enableGroupForceWrites = enableGroupForceWrites;
this.forceWriteThreadTime = statsLogger.getThreadScopedCounter("force-write-thread-time");
// Takes requests from forceWriteRequests and processes them until running is cleared.
// With group force writes enabled, a marker request is offered to the queue before a real
// force write. A request queued ahead of the marker is already covered by that force write
// and may skip its own; taking the marker turns force writes back on.
// NOTE(review): several statements in this method are truncated or missing in this copy
// (logging calls, the CPU-affinity call, stats updates, the finally-block cleanup and the
// exit notification).
public void run() {"ForceWrite Thread started");
ThreadRegistry.register(super.getName(), 0);
// With busy-wait enabled, try to reserve a CPU core for this thread; failure is only logged.
if (conf.isBusyWaitEnabled()) {
try {
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal ForceWrite thread: {}", e.getMessage(), e);
// Whether the next non-marker request must actually force the file.
boolean shouldForceWrite = true;
// Number of waiters notified since the last real force write was issued.
int numReqInLastForceWrite = 0;
long busyStartTime = System.nanoTime();
while (running) {
ForceWriteRequest req = null;
// NOTE(review): this flag is reset on every iteration. After one request skips its force
// write, the next request sees false here, and the condition below turns force writes back
// on, even if that request was also queued ahead of the marker. Correctness is unaffected,
// but at most one queued request can skip its force write per marker. Keep the flag across
// iterations if grouping should cover every request up to the marker; confirm against upstream.
boolean forceWriteMarkerSent = false;
try {
req = forceWriteRequests.take();
busyStartTime = System.nanoTime();
// Force write the file and then notify the write completions
if (!req.isMarker) {
if (shouldForceWrite) {
// if we are going to force write, any request that is already in the
// queue will benefit from this force write - post a marker prior to issuing
// the flush so until this marker is encountered we can skip the force write
if (enableGroupForceWrites) {
ForceWriteRequest marker =
createForceWriteRequest(req.logFile, 0, 0, null, false, true);
forceWriteMarkerSent = forceWriteRequests.offer(marker);
if (!forceWriteMarkerSent) {
// NOTE(review): stray second semicolon on the next line. The counter increment and
// the start of the logging call that the following arguments belong to are missing.
Counter failures = journalStats.getForceWriteGroupingFailures();;
"Fail to send force write grouping marker,"
+ " Journal.forceWriteRequests queue(capacity {}) is full,"
+ " current failure counter is {}.",
conf.getJournalQueueSize(), failures.get());
// If we are about to issue a write, record the number of requests in
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
// NOTE(review): the statement that records numReqInLastForceWrite is missing in this copy.
if (numReqInLastForceWrite > 0) {
numReqInLastForceWrite = 0;
numReqInLastForceWrite += req.process(shouldForceWrite);
if (enableGroupForceWrites
// if it's a marker we should switch back to flushing
&& !req.isMarker
// If group marker sending failed, we can't figure out which writes are
// grouped in this force write. So, abandon it even if other writes could
// be grouped. This should be extremely rare as, usually, queue size is
// large enough to accommodate high flush frequencies.
&& forceWriteMarkerSent
// This indicates that this is the last request in a given file
// so subsequent requests will go to a different file so we should
// flush on the next request
&& !req.shouldClose) {
shouldForceWrite = false;
} else {
shouldForceWrite = true;
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
} catch (InterruptedException e) {
// Restore the interrupt status, then mark the current file for closing and stop.
Thread.currentThread().interrupt();"ForceWrite thread interrupted");
// close is idempotent
if (null != req) {
req.shouldClose = true;
running = false;
} finally {
if (req != null) {
// Regardless of what caused us to exit, we should notify the
// the parent thread as it should either exit or be in the process
// of exiting else we will have write requests hang
// shutdown sync thread
// NOTE(review): only the flag update is visible; any interrupt/join of this thread is missing.
void shutdown() throws InterruptedException {
running = false;
private static class CbThreadFactory implements ThreadFactory {
private int counter = 0;
private String threadBaseName = "bookie-journal-callback";
public Thread newThread(Runnable r) {
int threadOrdinal = counter++;
Thread t = new Thread(r, threadBaseName + "-" + threadOrdinal);
ThreadRegistry.register(threadBaseName, threadOrdinal, t.getId());
return t;
// Value written in place of a record length to flag a padding record (scanJournal() checks for it).
static final int PADDING_MASK = -0x100;
// Pads the journal so the channel position lands on a journalAlignSize boundary.
// Assumes journalAlignSize > 0 (it is used as a modulus).
// NOTE(review): the statements under "padding mask", "padding len" and "write padding bytes"
// (and any reset of paddingBuffer) are missing in this copy.
static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int journalAlignSize)
throws IOException {
int bytesToAlign = (int) (jc.bc.position() % journalAlignSize);
if (0 != bytesToAlign) {
// A padding record has an 8-byte header: the PADDING_MASK int followed by an int padding
// length, which is how scanJournal() reads it. If the gap to the next boundary is smaller
// than that header, pad through to the following boundary instead. paddingBytes ends up
// as the number of filler bytes after the header.
int paddingBytes = journalAlignSize - bytesToAlign;
if (paddingBytes < 8) {
paddingBytes = journalAlignSize - (8 - paddingBytes);
} else {
paddingBytes -= 8;
// padding mask
// padding len
// padding bytes
// Filler bytes are reserved by advancing the writer index, not written explicitly.
paddingBuffer.writerIndex(paddingBuffer.writerIndex() + paddingBytes);
// write padding bytes
// Byte multipliers for converting the *MB / *KB configuration values.
static final long MB = 1024 * 1024L;
static final int KB = 1024;
// max journal file size, in bytes
final long maxJournalSize;
// pre-allocation size for the journal files, in bytes
final long journalPreAllocSize;
// write buffer size for the journal files, in bytes
final int journalWriteBufferSize;
// number of journal files kept before the marked journal
final int maxBackupJournals;
final File journalDirectory;
final ServerConfiguration conf;
final ForceWriteThread forceWriteThread;
final FileChannelProvider fileChannelProvider;
// Time after which we will stop grouping and issue the flush
private final long maxGroupWaitInNanos;
// Threshold (number of pending entries) after which we flush any buffered journal entries
private final long bufferedEntriesThreshold;
// Threshold (bytes written since the last flush) after which we flush any buffered journal writes
private final long bufferedWritesThreshold;
// should we flush if the queue is empty
private final boolean flushWhenQueueEmpty;
// should we hint the filesystem to remove pages from cache after force write
private final boolean removePagesFromCache;
private final int journalFormatVersionToWrite;
private final int journalAlignmentSize;
// control PageCache flush interval when syncData disabled to reduce disk io util
private final long journalPageCacheFlushIntervalMSec;
// Whether to reuse journal files; if so, maxBackupJournals is used as the journal file pool.
private final boolean journalReuseFiles;
// Should data be fsynced on disk before triggering the callback
private final boolean syncData;
// Last log mark of this journal, starting at (0, 0); advanced in place as force writes complete.
private final LastLogMark lastLogMark = new LastLogMark(0, 0);
// Base name of the lastMark file; with several journal dirs it is suffixed with "." + journalIndex.
private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
private final String lastMarkFileName;
/**
 * The thread pool used to handle callbacks.
 */
private final ExecutorService cbThreadPool;
// Thread-scoped counters for callback time and journal-thread time.
private final Counter callbackTime;
private final Counter journalTime;
// Base name of the journal thread; the constructor appends the bookie port.
// NOTE(review): never reassigned in this file; it could be final.
private static String journalThreadName = "BookieJournal";
// journal entry queue to commit
final BlockingQueue<QueueEntry> queue;
// Requests handed from the journal thread to the ForceWriteThread.
final BlockingQueue<ForceWriteRequest> forceWriteRequests;
// Cleared by shutdown() to stop the journal thread.
volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
private final ByteBufAllocator allocator;
private final MemoryLimitController memoryLimitController;
// Expose Stats
private final JournalStats journalStats;
// Optional listener; set only by the constructor that accepts one.
private JournalAliveListener journalAliveListener;
/**
 * Creates a journal that reports stats to {@link NullStatsLogger#INSTANCE}.
 */
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) {
this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
// NOTE(review): the final constructor argument (the allocator) is missing in this copy.
/**
 * Creates a journal that writes into {@code journalDirectory}.
 *
 * @param journalIndex index of this journal; used as the stats label and, with several journal
 *                     dirs, as the lastMark file suffix
 * @param journalDirectory directory holding this journal's .txn files
 * @param conf server configuration supplying the journal settings
 * @param ledgerDirsManager provides the ledger dirs where lastMark files are read and written
 * @param statsLogger stats logger; scoped by journalIndex
 * @param allocator allocator for the journal's buffered channels
 * @throws RuntimeException if the configured file channel provider cannot be created
 */
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
super(journalThreadName + "-" + conf.getBookiePort());
this.allocator = allocator;
StatsLogger journalStatsLogger = statsLogger.scopeLabel("journalIndex", String.valueOf(journalIndex));
if (conf.isBusyWaitEnabled()) {
// To achieve lower latency, use busy-wait blocking queue implementation
queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
} else {
queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
// Adjust the journal max memory in case there are multiple journals configured.
// NOTE(review): the division happens in whole megabytes before the conversion to bytes, so
// the remainder is lost (e.g. 5 MB over 2 dirs gives 2 MB per journal), and the budget becomes
// 0 when there are more dirs than megabytes. If the getter returns int, the multiplication can
// also overflow before it is widened to long.
long journalMaxMemory = conf.getJournalMaxMemorySizeMb() / conf.getJournalDirNames().length * 1024 * 1024;
this.memoryLimitController = new MemoryLimitController(journalMaxMemory);
this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
this.journalDirectory = journalDirectory;
this.maxJournalSize = conf.getMaxJournalSizeMB() * MB;
this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.syncData = conf.getJournalSyncData();
this.maxBackupJournals = conf.getMaxBackupJournals();
this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites(),
// NOTE(review): the remaining ForceWriteThread constructor argument (the stats logger) is missing.
this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
this.journalAlignmentSize = conf.getJournalAlignmentSize();
this.journalPageCacheFlushIntervalMSec = conf.getJournalPageCacheFlushIntervalMSec();
this.journalReuseFiles = conf.getJournalReuseFiles();
// Callbacks run on a dedicated fixed pool when threads are configured; otherwise on a direct
// executor, i.e. in the submitting thread.
if (conf.getNumJournalCallbackThreads() > 0) {
this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
new CbThreadFactory());
this.callbackTime = journalStatsLogger.getThreadScopedCounter("callback-thread-time");
} else {
this.cbThreadPool = MoreExecutors.newDirectExecutorService();
this.callbackTime = journalStatsLogger.getThreadScopedCounter("callback-time");
this.journalTime = journalStatsLogger.getThreadScopedCounter("journal-thread-time");
// Unless there is a cap on the max wait (which requires group force writes)
// we cannot skip flushing for queue empty
this.flushWhenQueueEmpty = maxGroupWaitInNanos <= 0 || conf.getJournalFlushWhenQueueEmpty();
this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
// read last log mark
// NOTE(review): the single-journal-dir assignment of lastMarkFileName and the readLog() call
// that this comment refers to are missing in this copy.
if (conf.getJournalDirs().length == 1) {
} else {
lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
if (LOG.isDebugEnabled()) {
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
try {
this.fileChannelProvider = FileChannelProvider.newProvider(conf.getJournalChannelProvider());
} catch (IOException e) {
LOG.error("Failed to initiate file channel provider: {}", conf.getJournalChannelProvider());
throw new RuntimeException(e);
// Expose Stats
this.journalStats = new JournalStats(journalStatsLogger, journalMaxMemory,
() -> memoryLimitController.currentUsage());
/**
 * Creates a journal that additionally reports to {@code journalAliveListener}.
 * All other parameters are passed through unchanged to the six-argument constructor.
 */
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger,
ByteBufAllocator allocator, JournalAliveListener journalAliveListener) {
this(journalIndex, journalDirectory, conf, ledgerDirsManager, statsLogger, allocator);
this.journalAliveListener = journalAliveListener;
/** Returns this journal's stats holder. */
JournalStats getJournalStats() {
return this.journalStats;
/** Returns the directory that holds this journal's .txn files. */
public File getJournalDirectory() {
return journalDirectory;
/**
 * Returns the live (mutable) last log mark of this journal; it is updated in place, so use
 * {@code markLog()} on it to take a snapshot.
 */
public LastLogMark getLastLogMark() {
return lastLogMark;
/**
 * Updates the lastLogMark of the journal,
 * indicating that the file has been processed up to the given offset.
 *
 * @param id journal log id
 * @param scanOffset offset up to which the journal was processed
 */
// NOTE(review): id is a boxed Long; passing null throws NullPointerException on unboxing.
void setLastLogMark(Long id, long scanOffset) {
lastLogMark.setCurLogMark(id, scanOffset);
/**
 * Called when the application schedules a checkpoint. After all the txns added
 * before the checkpoint are persisted, a <i>checkpoint</i> is returned
 * to the application, which can use the <i>checkpoint</i> for its own logic.
 */
public Checkpoint newCheckpoint() {
// markLog() copies the current mark, so later journal progress does not move this checkpoint.
return new LogMarkCheckpoint(lastLogMark.markLog());
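/**
 * Tells the journal that a checkpoint is finished.
 *
 * @param checkpoint the completed checkpoint; ignored unless this journal created it
 * @param compact whether to garbage-collect journal files older than the checkpoint's mark
 * @throws IOException
 */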
public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
if (!(checkpoint instanceof LogMarkCheckpoint)) {
return; // we didn't create this checkpoint, so don't do anything with it
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;
// NOTE(review): nothing visible here persists the mark (e.g. via rollLog) before compaction;
// confirm against upstream whether that statement was lost from this copy.
if (compact) {
// list the journals that have been marked
List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark));
// keep maxBackupJournals journal files before the marked journal; this deletes the oldest
// ones, assuming logs is in ascending id order
if (logs.size() >= maxBackupJournals) {
int maxIdx = logs.size() - maxBackupJournals;
for (int i = 0; i < maxIdx; i++) {
long id = logs.get(i);
// make sure the journal id is smaller than marked journal id
if (id < mark.getCurMark().getLogFileId()) {
File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
if (!journalFile.delete()) {
LOG.warn("Could not delete old journal file {}", journalFile);
// NOTE(review): the logging statement on the next line is truncated. As laid out, it would
// report "garbage collected" even when delete() failed.
}"garbage collected journal " + journalFile.getName());
/**
 * Scans the journal.
 *
 * @param journalId Journal Log Id
 * @param journalPos Offset to start scanning; a value {@code <= 0} opens the log without a start position
 * @param scanner Scanner to handle entries
 * @return scanOffset - represents the byte till which journal was read
 * @throws IOException
 */
public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
throws IOException {
// NOTE(review): several statements in this method are missing in this copy (buffer
// clear/flip/limit calls, the loop exits, and the close of recLog in the finally block).
// The comments below describe only the visible logic.
JournalChannel recLog;
if (journalPos <= 0) {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
conf, fileChannelProvider);
} else {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
journalPos, conf, fileChannelProvider);
int journalVersion = recLog.getFormatVersion();
try {
// Each record is a 4-byte length followed by that many payload bytes.
ByteBuffer lenBuff = ByteBuffer.allocate(4);
// Reused payload buffer; replaced by a larger one when a record does not fit.
ByteBuffer recBuff = ByteBuffer.allocate(64 * 1024);
while (true) {
// entry start offset
long offset = recLog.fc.position();
// start reading entry
// A short length read means the end of the written data.
fullRead(recLog, lenBuff);
if (lenBuff.remaining() != 0) {
int len = lenBuff.getInt();
// A zero length also marks the end of the written data.
if (len == 0) {
boolean isPaddingRecord = false;
// Negative lengths are only valid as the V5+ padding marker, which is followed by
// a 4-byte padding length; anything else is corruption.
if (len < 0) {
if (len == PADDING_MASK && journalVersion >= JournalChannel.V5) {
// skip padding bytes
fullRead(recLog, lenBuff);
if (lenBuff.remaining() != 0) {
len = lenBuff.getInt();
if (len == 0) {
isPaddingRecord = true;
} else {
LOG.error("Invalid record found with negative length: {}", len);
throw new IOException("Invalid record found with negative length " + len);
if (recBuff.remaining() < len) {
recBuff = ByteBuffer.allocate(len);
if (fullRead(recLog, recBuff) != len) {
// This seems scary, but it just means that this is where we
// left off writing
// Padding records are consumed but not passed to the scanner.
if (!isPaddingRecord) {
scanner.process(journalVersion, offset, recBuff);
return recLog.fc.position();
} finally {
/**
 * Records an add-entry operation for an NIO buffer by wrapping it (without copying) in a
 * {@link ByteBuf} and delegating to the {@code ByteBuf} overload.
 */
public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
/**
 * Records an add entry operation in the journal.
 */
public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
// The ledger id and entry id are the first two longs of the entry; absolute gets leave the
// reader index untouched.
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
/**
 * Records an add-entry operation for the given ledger and entry id.
 *
 * @param ledgerId ledger id
 * @param entryId entry id
 * @param entry serialized entry
 * @param ackBeforeSync whether the entry may be acknowledged before it is fsynced
 * @param cb callback invoked with the write result
 * @param ctx opaque context passed back to {@code cb}
 * @throws InterruptedException presumably if interrupted while waiting to enqueue
 */
public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
// Retain entry until it gets written to journal
// NOTE(review): the retain/enqueue statements, and the start of the QueueEntry.create(...)
// call whose arguments continue on the next line, are missing in this copy.
entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
/**
 * Queues a force-ledger request for {@code ledgerId}. The request is an entry with no payload
 * and entry id {@code BookieImpl.METAENTRY_ID_FORCE_LEDGER}, which the journal thread does not write.
 *
 * @param ledgerId ledger to force
 * @param cb callback for the request
 * @param ctx opaque context passed back to {@code cb}
 */
void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
// NOTE(review): the start of the QueueEntry.create(...) call and the enqueue/counter
// statements around it are missing in this copy.
null, false /* ackBeforeSync */, ledgerId,
BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
// Increment afterwards because the add operation could fail.
/**
 * Gets the length of the journal entries queue.
 *
 * @return length of the journal entry queue.
 */
public int getJournalQueueLength() {
// Entries still waiting to be taken by the journal thread.
return queue.size();
/**
 * A thread used for persisting journal entries to journal files.
 *
 * <p>
 * Besides persisting journal entries, it also takes responsibility for
 * rolling journal files when a journal file reaches the journal file size
 * limit.
 * </p>
 * <p>
 * During journal rolling, it hands the current journal over to be closed once its last
 * entry is force written, opens a new journal file whose id is one greater than the
 * previous one (the first id is taken from the current time when no journal exists yet),
 * and continues the persistence logic.
 * Those journals will be garbage collected in SyncThread.
 * </p>
 * @see org.apache.bookkeeper.bookie.SyncThread
 */
public void run() {"Starting journal on {}", journalDirectory);
// NOTE(review): this copy of run() is missing many lines: the start of the logging calls on
// the truncated lines, the CPU-affinity call, the stats receivers of the
// ".registerSuccessfulEvent" fragments, the statements that write entries through lenBuff/bc
// and flush, the callback dispatch, the loop exits, and most of the finally block. The
// comments below describe only the visible logic. Several locals (lenBuff, batchSize,
// busyStartTime, the stopwatches) are presumably used by the missing statements.
ThreadRegistry.register(journalThreadName, 0);
// With busy-wait enabled, try to reserve a CPU core for this thread; failure is only logged.
if (conf.isBusyWaitEnabled()) {
try {
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal thread: {}", e.getMessage(), e);
// Batch of entries accumulated until the next force-write request hands it off.
RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
int numEntriesToFlush = 0;
ByteBuf lenBuff = Unpooled.buffer(4);
// Twice the alignment, presumably to hold the largest padding record writePaddingBytes produces.
ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
BufferedChannel bc = null;
JournalChannel logFile = null;
Stopwatch journalCreationWatcher = Stopwatch.createUnstarted();
Stopwatch journalFlushWatcher = Stopwatch.createUnstarted();
long batchSize = 0;
try {
List<Long> journalIds = listJournalIds(journalDirectory, null);
// Seed the log id from wall-clock time only when no journal exists yet; otherwise continue
// after the last listed journal id. Do not use a System.nanoTime()-based clock here:
// it can only be used to measure elapsed time.
long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
long lastFlushPosition = 0;
boolean groupWhenTimeout = false;
long dequeueStartTime = 0L;
long lastFlushTimeMs = System.currentTimeMillis();
long busyStartTime = System.nanoTime();
QueueEntry qe = null;
while (true) {
// new journal file to write
if (null == logFile) {
logId = logId + 1;
journalIds = listJournalIds(journalDirectory, null);
// Reuse the oldest journal file when the provider supports reuse, reuse is enabled,
// enough backups exist, and that journal's id is below the last log mark's log id.
Long replaceLogId = fileChannelProvider.supportReuseFile() && journalReuseFiles
&& journalIds.size() >= maxBackupJournals
&& journalIds.get(0) < lastLogMark.getCurMark().getLogFileId()
? journalIds.get(0) : null;
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder(),
conf, fileChannelProvider, replaceLogId);
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
bc = logFile.getBufferedChannel();
lastFlushPosition = bc.position();
// Dequeue the next entry. With nothing pending, block on take(). Otherwise poll only until
// the oldest pending entry reaches maxGroupWaitInNanos, or not at all when
// flushWhenQueueEmpty is set.
if (qe == null) {
if (dequeueStartTime != 0) {
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
if (numEntriesToFlush == 0) {
qe = queue.take();
dequeueStartTime = MathUtils.nowInNano();
busyStartTime = dequeueStartTime;
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
} else {
long pollWaitTimeNanos = maxGroupWaitInNanos
- MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
pollWaitTimeNanos = 0;
qe = queue.poll(pollWaitTimeNanos, TimeUnit.NANOSECONDS);
dequeueStartTime = MathUtils.nowInNano();
if (qe != null) {
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three conditions below holds
// 1. If the oldest pending entry has been pending for longer than the max wait time
if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
groupWhenTimeout = true;
} else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& (qe == null // no entry to group
|| MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
// when group timeout, it would be better to look forward, as there might be lots of
// entries already timeout
// due to a previous slow write (writing to filesystem which impacted by force write).
// Group those entries in the queue
// a) already timeout
// b) limit the number of entries to group
groupWhenTimeout = false;
shouldFlush = true;
} else if (qe != null
&& ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
|| (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
groupWhenTimeout = false;
shouldFlush = true;
} else if (qe == null && flushWhenQueueEmpty) {
// We should get here only if we flushWhenQueueEmpty is true else we would wait
// for timeout that would put is past the maxWait threshold
// 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
// publish at a time - common case in tests.
groupWhenTimeout = false;
shouldFlush = true;
// toFlush is non null and not empty so should be safe to access getFirst
// NOTE(review): the remark above concerns the toFlush.get(0) calls earlier in the loop, which
// are only safe while toFlush is non-empty (presumably guaranteed by numEntriesToFlush != 0).
if (shouldFlush) {
// V5+ journals pad the channel position to a journalAlignmentSize boundary.
if (journalFormatVersionToWrite >= JournalChannel.V5) {
writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
// Entries that need no fsync (syncData off, or ackBeforeSync set) are removed from toFlush
// here, presumably after their callback is dispatched (that call is missing in this copy).
for (int i = 0; i < toFlush.size(); i++) {
QueueEntry entry = toFlush.get(i);
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
lastFlushPosition = bc.position();
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
if (e != null) {
LOG.debug("Written and queuing for flush Ledger: {} Entry: {}",
e.ledgerId, e.entryId);
boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
// Trigger data sync to disk in the "Force-Write" thread.
// Trigger data sync to disk has three situations:
// 1. journalSyncData enabled, usually for SSD used as journal storage
// 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize
// 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use
// journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk
// synchronize frequently, which will increase disk io util.
// when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s),
// it will trigger data sync to disk
if (syncData
|| shouldRolloverJournal
|| (System.currentTimeMillis() - lastFlushTimeMs
>= journalPageCacheFlushIntervalMSec)) {
forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
toFlush, shouldRolloverJournal, false));
lastFlushTimeMs = System.currentTimeMillis();
toFlush = entryListRecycler.newInstance();
numEntriesToFlush = 0;
batchSize = 0L;
// check whether journal file is over file limit
if (shouldRolloverJournal) {
// if the journal file is rolled over, the journal file will be closed after last
// entry is force written to disk.
logFile = null;
// Leave the loop once shutdown() has cleared running (the exit statement is missing here).
if (!running) {"Journal Manager is asked to shut down, quit.");
if (qe == null) { // no more queue entry
// Explicit-LAC meta entries are not written when the configured format predates V6.
if ((qe.entryId == BookieImpl.METAENTRY_ID_LEDGER_EXPLICITLAC)
&& (journalFormatVersionToWrite < JournalChannel.V6)) {
* this means we are using new code which supports
* persisting explicitLac, but "journalFormatVersionToWrite"
* is set to some older value (< V6). In this case we
* shouldn't write this special entry
} else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
// Force-ledger requests carry no payload. Only real entries are sized and preallocated:
// a 4-byte length prefix plus the payload.
int entrySize = qe.entry.readableBytes();
batchSize += (4 + entrySize);
// preAlloc based on size
logFile.preAllocIfNeeded(4 + entrySize);
qe = null;
} catch (IOException ioe) {
LOG.error("I/O exception in Journal thread!", ioe);
} catch (InterruptedException ie) {
// Restore the interrupt status before exiting.
Thread.currentThread().interrupt();"Journal exits when shutting down");
} finally {
// There could be packets queued for forceWrite on this logFile
// That is fine as this exception is going to anyway take down
// the bookie. If we execute this as a part of graceful shutdown,
// close will flush the file system cache making any previous
// cached writes durable so this is fine as well.
IOUtils.close(LOG, bc);
// Presumably notifies journalAliveListener when one was supplied (call missing in this copy).
if (journalAliveListener != null) {
}"Journal exited loop!");
/**
 * Returns a builder whose {@link BufferedChannel}s use this journal's allocator, unlike
 * {@code BufferedChannelBuilder.DEFAULT_BCBUILDER}, which uses the unpooled default allocator.
 */
public BufferedChannelBuilder getBufferedChannelBuilder() {
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
/**
 * Shuts down the journal.
 */
public synchronized void shutdown() {
// NOTE(review): several statements are missing in this copy: the bodies of the running and
// fileChannelProvider checks, and the start of the logging calls on the truncated lines.
try {
// Already shut down: presumably returns early (statement missing).
if (!running) {
}"Shutting down Journal");
if (fileChannelProvider != null) {
// Wait up to 5 seconds for the callback pool to terminate; only a warning is logged otherwise.
if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
// Stop the journal thread loop and wait for the thread to exit.
running = false;
this.join();"Finished Shutting down Journal thread");
} catch (IOException | InterruptedException ie) {
// NOTE(review): the interrupt status is not restored for InterruptedException
// (Thread.currentThread().interrupt()), and the message says "Interrupted" for IOExceptions too.
LOG.warn("Interrupted during shutting down journal : ", ie);
/**
 * Reads from {@code fc} into {@code bb} until {@code bb} has no space remaining or a read
 * returns zero or a negative count.
 *
 * @param fc journal channel to read from
 * @param bb destination buffer
 * @return the number of bytes read; less than the initial {@code bb.remaining()} when the
 *         data ran out
 * @throws IOException propagated from the channel read
 */
private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
int total = 0;
while (bb.remaining() > 0) {
// NOTE(review): the read call on the right-hand side is missing in this copy.
int rc =;
if (rc <= 0) {
return total;
total += rc;
return total;
/**
 * Waits for the Journal thread to exit.
 * This method is needed in order to mock the journal, because the final methods of the
 * java.lang.Thread class cannot be mocked.
 *
 * @throws InterruptedException
 */
public void joinThread() throws InterruptedException {
// NOTE(review): the method body (presumably a join() on this thread) is missing in this copy.
/**
 * Returns the current usage tracked by this journal's memory limit controller. The limit it
 * was created with is computed in bytes, so usage is presumably in bytes as well.
 */
long getMemoryUsage() {
return memoryLimitController.currentUsage();