/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provide journal related management.
*/
public class Journal extends BookieCriticalThread implements CheckpointSource {
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
private static final RecyclableArrayList.Recycler<QueueEntry> entryListRecycler =
new RecyclableArrayList.Recycler<QueueEntry>();
private static final RecyclableArrayList<QueueEntry> EMPTY_ARRAY_LIST = new RecyclableArrayList<>();
/**
* Filter to pick up journals.
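* <p>For example, a filter accepting only journals older than some marked id
* (an illustrative sketch; {@code markedId} is a hypothetical local variable):
* <pre>{@code
* JournalIdFilter filter = journalId -> journalId < markedId;
* }</pre>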
*/
public interface JournalIdFilter {
boolean accept(long journalId);
}
/**
* For testability.
*/
@FunctionalInterface
public interface BufferedChannelBuilder {
BufferedChannelBuilder DEFAULT_BCBUILDER = (FileChannel fc,
int capacity) -> new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
BufferedChannel create(FileChannel fc, int capacity) throws IOException;
}
/**
* List all journal ids accepted by the specified journal id filter.
*
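* <p>For example, listing every journal id in a directory (a sketch; a
* {@code null} filter accepts all ids):
* <pre>{@code
* List<Long> allIds = Journal.listJournalIds(journalDir, null);
* }</pre>
*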
* @param journalDir journal dir
* @param filter journal id filter
* @return list of filtered ids
*/
static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
File[] logFiles = journalDir.listFiles();
if (logFiles == null || logFiles.length == 0) {
return Collections.emptyList();
}
List<Long> logs = new ArrayList<Long>();
for (File f: logFiles) {
String name = f.getName();
if (!name.endsWith(".txn")) {
continue;
}
String idString = name.split("\\.")[0];
long id = Long.parseLong(idString, 16);
if (filter != null) {
if (filter.accept(id)) {
logs.add(id);
}
} else {
logs.add(id);
}
}
Collections.sort(logs);
return logs;
}
/**
* A wrapper over log mark to provide a checkpoint for users of journal
* to do checkpointing.
*/
private static class LogMarkCheckpoint implements Checkpoint {
final LastLogMark mark;
public LogMarkCheckpoint(LastLogMark checkpoint) {
this.mark = checkpoint;
}
@Override
public int compareTo(Checkpoint o) {
if (o == Checkpoint.MAX) {
return -1;
} else if (o == Checkpoint.MIN) {
return 1;
}
return mark.getCurMark().compare(((LogMarkCheckpoint) o).mark.getCurMark());
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LogMarkCheckpoint)) {
return false;
}
return 0 == compareTo((LogMarkCheckpoint) o);
}
@Override
public int hashCode() {
return mark.hashCode();
}
@Override
public String toString() {
return mark.toString();
}
}
/**
* Last Log Mark.
*/
public class LastLogMark {
private final LogMark curMark;
LastLogMark(long logId, long logPosition) {
this.curMark = new LogMark(logId, logPosition);
}
void setCurLogMark(long logId, long logPosition) {
curMark.setLogMark(logId, logPosition);
}
LastLogMark markLog() {
return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
}
public LogMark getCurMark() {
return curMark;
}
void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
// we should record <logId, logPosition> marked in markLog
// which is safe since records before lastMark have been
// persisted to disk (both index & entry logger)
lastMark.getCurMark().writeLogMark(bb);
if (LOG.isDebugEnabled()) {
LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark());
}
List<File> writableLedgerDirs = ledgerDirsManager
.getWritableLedgerDirsForNewLog();
for (File dir : writableLedgerDirs) {
File file = new File(dir, lastMarkFileName);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
fos.write(buff);
fos.getChannel().force(true);
fos.close();
fos = null;
} catch (IOException e) {
LOG.error("Problems writing to " + file, e);
} finally {
// if the stream was already closed successfully in the try
// block, it will have been nulled out, in which case the
// call below simply returns
IOUtils.close(LOG, fos);
}
}
}
/**
* Read the last mark from the lastMark files.
* Across all ledger directories, the winning mark is the one with the
* max journal log id, and then the max log position within that journal log.
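* <p>Each lastMark file is assumed to hold a 16-byte serialized log mark:
* an 8-byte journal log id followed by an 8-byte offset within that log.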
*/
void readLog() {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
for (File dir: ledgerDirsManager.getAllLedgerDirs()) {
File file = new File(dir, lastMarkFileName);
try {
try (FileInputStream fis = new FileInputStream(file)) {
int bytesRead = fis.read(buff);
if (bytesRead != 16) {
throw new IOException("Couldn't read enough bytes from lastMark."
+ " Wanted " + 16 + ", got " + bytesRead);
}
}
bb.clear();
mark.readLogMark(bb);
if (curMark.compare(mark) < 0) {
curMark.setLogMark(mark.getLogFileId(), mark.getLogFileOffset());
}
} catch (IOException e) {
LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this "
+ "bookie");
}
}
}
@Override
public String toString() {
return curMark.toString();
}
}
/**
* Filter to return list of journals for rolling.
*/
private static class JournalRollingFilter implements JournalIdFilter {
final LastLogMark lastMark;
JournalRollingFilter(LastLogMark lastMark) {
this.lastMark = lastMark;
}
@Override
public boolean accept(long journalId) {
return journalId < lastMark.getCurMark().getLogFileId();
}
}
/**
* Scanner used to scan a journal.
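* <p>Since it declares a single method, a scanner can be supplied as a
* lambda, for example (a sketch that just prints each entry's offset):
* <pre>{@code
* JournalScanner scanner = (version, offset, entry) ->
*         System.out.println("entry at offset " + offset);
* }</pre>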
*/
public interface JournalScanner {
/**
* Process a journal entry.
*
* @param journalVersion Journal Version
* @param offset File offset of the journal entry
* @param entry Journal Entry
* @throws IOException
*/
void process(int journalVersion, long offset, ByteBuffer entry) throws IOException;
}
/**
* Journal Entry to Record.
*/
static class QueueEntry implements Runnable {
ByteBuf entry;
long ledgerId;
long entryId;
WriteCallback cb;
Object ctx;
long enqueueTime;
boolean ackBeforeSync;
OpStatsLogger journalAddEntryStats;
Counter journalCbQueueSize;
static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
Counter journalCbQueueSize) {
QueueEntry qe = RECYCLER.get();
qe.entry = entry;
qe.ackBeforeSync = ackBeforeSync;
qe.cb = cb;
qe.ctx = ctx;
qe.ledgerId = ledgerId;
qe.entryId = entryId;
qe.enqueueTime = enqueueTime;
qe.journalAddEntryStats = journalAddEntryStats;
qe.journalCbQueueSize = journalCbQueueSize;
return qe;
}
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
}
journalCbQueueSize.dec();
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
cb.writeComplete(0, ledgerId, entryId, null, ctx);
recycle();
}
private final Handle<QueueEntry> recyclerHandle;
private QueueEntry(Handle<QueueEntry> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
private static final Recycler<QueueEntry> RECYCLER = new Recycler<QueueEntry>() {
@Override
protected QueueEntry newObject(Recycler.Handle<QueueEntry> handle) {
return new QueueEntry(handle);
}
};
private void recycle() {
recyclerHandle.recycle(this);
}
}
/**
* Token which represents the need to force a write to the Journal.
*/
@VisibleForTesting
public class ForceWriteRequest {
private JournalChannel logFile;
private RecyclableArrayList<QueueEntry> forceWriteWaiters;
private boolean shouldClose;
private boolean isMarker;
private long lastFlushedPosition;
private long logId;
private long enqueueTime;
public int process(boolean shouldForceWrite) throws IOException {
journalStats.getForceWriteQueueSize().dec();
journalStats.getFwEnqueueTimeStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
if (isMarker) {
return 0;
}
try {
if (shouldForceWrite) {
long startTime = MathUtils.nowInNano();
this.logFile.forceWrite(false);
journalStats.getJournalSyncStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
QueueEntry qe = forceWriteWaiters.get(i);
if (qe != null) {
cbThreadPool.execute(qe);
}
}
return forceWriteWaiters.size();
} finally {
closeFileIfNecessary();
}
}
public void closeFileIfNecessary() {
// Close if shouldClose is set
if (shouldClose) {
// We should guard against exceptions so it's
// safe to call from catch blocks
try {
logFile.close();
// Call close only once
shouldClose = false;
} catch (IOException ioe) {
LOG.error("I/O exception while closing file", ioe);
}
}
}
private final Handle<ForceWriteRequest> recyclerHandle;
private ForceWriteRequest(Handle<ForceWriteRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
private void recycle() {
logFile = null;
if (forceWriteWaiters != null) {
forceWriteWaiters.recycle();
forceWriteWaiters = null;
}
recyclerHandle.recycle(this);
}
}
private ForceWriteRequest createForceWriteRequest(JournalChannel logFile,
long logId,
long lastFlushedPosition,
RecyclableArrayList<QueueEntry> forceWriteWaiters,
boolean shouldClose,
boolean isMarker) {
ForceWriteRequest req = forceWriteRequestsRecycler.get();
req.forceWriteWaiters = forceWriteWaiters;
req.logFile = logFile;
req.logId = logId;
req.lastFlushedPosition = lastFlushedPosition;
req.shouldClose = shouldClose;
req.isMarker = isMarker;
req.enqueueTime = MathUtils.nowInNano();
journalStats.getForceWriteQueueSize().inc();
return req;
}
private final Recycler<ForceWriteRequest> forceWriteRequestsRecycler = new Recycler<ForceWriteRequest>() {
@Override
protected ForceWriteRequest newObject(
Recycler.Handle<ForceWriteRequest> handle) {
return new ForceWriteRequest(handle);
}
};
/**
* ForceWriteThread is a background thread which makes the journal durable periodically.
*
*/
private class ForceWriteThread extends BookieCriticalThread {
volatile boolean running = true;
// The thread to interrupt when this thread exits, so that pending
// write requests don't hang
Thread threadToNotifyOnEx;
// should we group force writes
private final boolean enableGroupForceWrites;
// make flush interval as a parameter
public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) {
super("ForceWriteThread");
this.threadToNotifyOnEx = threadToNotifyOnEx;
this.enableGroupForceWrites = enableGroupForceWrites;
}
@Override
public void run() {
LOG.info("ForceWrite Thread started");
if (conf.isBusyWaitEnabled()) {
try {
CpuAffinity.acquireCore();
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal ForceWrite thread: {}", e.getMessage(), e);
}
}
boolean shouldForceWrite = true;
int numReqInLastForceWrite = 0;
while (running) {
ForceWriteRequest req = null;
try {
req = forceWriteRequests.take();
// Force write the file and then notify the write completions
//
if (!req.isMarker) {
if (shouldForceWrite) {
// if we are going to force write, any request that is already in the
// queue will benefit from this force write - post a marker prior to
// issuing the flush so that, until this marker is encountered, we can
// skip the force write
if (enableGroupForceWrites) {
forceWriteRequests.put(createForceWriteRequest(req.logFile, 0, 0, null, false, true));
}
// If we are about to issue a write, record the number of requests in
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
if (numReqInLastForceWrite > 0) {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);
numReqInLastForceWrite = 0;
}
}
}
numReqInLastForceWrite += req.process(shouldForceWrite);
if (enableGroupForceWrites
// if it's a marker we should switch back to flushing
&& !req.isMarker
// This indicates that this is the last request in a given file
// so subsequent requests will go to a different file so we should
// flush on the next request
&& !req.shouldClose) {
shouldForceWrite = false;
} else {
shouldForceWrite = true;
}
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("ForceWrite thread interrupted");
// close is idempotent
if (null != req) {
req.shouldClose = true;
req.closeFileIfNecessary();
}
running = false;
} finally {
if (req != null) {
req.recycle();
}
}
}
// Regardless of what caused us to exit, we should notify the
// parent thread as it should either exit or be in the process
// of exiting, else we will have write requests hang
threadToNotifyOnEx.interrupt();
}
// shutdown sync thread
void shutdown() throws InterruptedException {
running = false;
this.interrupt();
this.join();
}
}
static final int PADDING_MASK = -0x100;
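// writePaddingBytes pads the journal up to the next multiple of journalAlignSize
// by writing an 8-byte header (PADDING_MASK plus the padding length) followed by
// zero bytes. A worked example with illustrative numbers: with journalAlignSize
// = 512 and bc.position() = 100, bytesToAlign = 100 and paddingBytes =
// 512 - 100 - 8 = 404, so the 8 + 404 = 412 bytes written land the position
// exactly on 512. When fewer than 8 bytes remain before the boundary, the
// padding spills over into the following aligned block instead.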
static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int journalAlignSize)
throws IOException {
int bytesToAlign = (int) (jc.bc.position() % journalAlignSize);
if (0 != bytesToAlign) {
int paddingBytes = journalAlignSize - bytesToAlign;
if (paddingBytes < 8) {
paddingBytes = journalAlignSize - (8 - paddingBytes);
} else {
paddingBytes -= 8;
}
paddingBuffer.clear();
// padding mask
paddingBuffer.writeInt(PADDING_MASK);
// padding len
paddingBuffer.writeInt(paddingBytes);
// padding bytes
paddingBuffer.writerIndex(paddingBuffer.writerIndex() + paddingBytes);
jc.preAllocIfNeeded(paddingBuffer.readableBytes());
// write padding bytes
jc.bc.write(paddingBuffer);
}
}
static final long MB = 1024 * 1024L;
static final int KB = 1024;
// max journal file size
final long maxJournalSize;
// pre-allocation size for the journal files
final long journalPreAllocSize;
// write buffer size for the journal files
final int journalWriteBufferSize;
// number of journal files kept before the marked journal
final int maxBackupJournals;
final File journalDirectory;
final ServerConfiguration conf;
final ForceWriteThread forceWriteThread;
// Time after which we will stop grouping and issue the flush
private final long maxGroupWaitInNanos;
// Threshold after which we flush any buffered journal entries
private final long bufferedEntriesThreshold;
// Threshold after which we flush any buffered journal writes
private final long bufferedWritesThreshold;
// should we flush if the queue is empty
private final boolean flushWhenQueueEmpty;
// should we hint the filesystem to remove pages from cache after force write
private final boolean removePagesFromCache;
private final int journalFormatVersionToWrite;
private final int journalAlignmentSize;
// controls the PageCache flush interval when syncData is disabled, to reduce disk I/O utilization
private final long journalPageCacheFlushIntervalMSec;
// Should data be fsynced on disk before triggering the callback
private final boolean syncData;
private final LastLogMark lastLogMark = new LastLogMark(0, 0);
private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
private final String lastMarkFileName;
/**
* The thread pool used to handle callback.
*/
private final ExecutorService cbThreadPool;
// journal entry queue to commit
final BlockingQueue<QueueEntry> queue;
final BlockingQueue<ForceWriteRequest> forceWriteRequests;
volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
private final ByteBufAllocator allocator;
private final MemoryLimitController memoryLimitController;
// Expose Stats
private final JournalStats journalStats;
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) {
this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
UnpooledByteBufAllocator.DEFAULT);
}
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
super("BookieJournal-" + conf.getBookiePort());
this.allocator = allocator;
if (conf.isBusyWaitEnabled()) {
// To achieve lower latency, use busy-wait blocking queue implementation
queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
} else {
queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
}
// Adjust the journal max memory in case there are multiple journals configured.
long journalMaxMemory = conf.getJournalMaxMemorySizeMb() / conf.getJournalDirNames().length * 1024 * 1024;
this.memoryLimitController = new MemoryLimitController(journalMaxMemory);
this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
this.journalDirectory = journalDirectory;
this.maxJournalSize = conf.getMaxJournalSizeMB() * MB;
this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.syncData = conf.getJournalSyncData();
this.maxBackupJournals = conf.getMaxBackupJournals();
this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
this.journalAlignmentSize = conf.getJournalAlignmentSize();
this.journalPageCacheFlushIntervalMSec = conf.getJournalPageCacheFlushIntervalMSec();
if (conf.getNumJournalCallbackThreads() > 0) {
this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
new DefaultThreadFactory("bookie-journal-callback"));
} else {
this.cbThreadPool = MoreExecutors.newDirectExecutorService();
}
// Unless there is a cap on the max wait (which requires group force writes),
// we cannot skip flushing when the queue is empty
this.flushWhenQueueEmpty = maxGroupWaitInNanos <= 0 || conf.getJournalFlushWhenQueueEmpty();
this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
// read last log mark
if (conf.getJournalDirs().length == 1) {
lastMarkFileName = LAST_MARK_DEFAULT_NAME;
} else {
lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
}
lastLogMark.readLog();
if (LOG.isDebugEnabled()) {
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
}
// Expose Stats
this.journalStats = new JournalStats(statsLogger);
}
JournalStats getJournalStats() {
return this.journalStats;
}
public File getJournalDirectory() {
return journalDirectory;
}
public LastLogMark getLastLogMark() {
return lastLogMark;
}
/**
* Update the lastLogMark of the journal, indicating that the journal
* file has been processed up to the given offset.
* @param id journal log id
* @param scanOffset offset up to which the journal file was processed
*/
void setLastLogMark(Long id, long scanOffset) {
lastLogMark.setCurLogMark(id, scanOffset);
}
/**
* The application schedules a checkpoint. After all the txns added
* before the checkpoint are persisted, the returned <i>checkpoint</i>
* can be used by the application to drive its own logic.
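* <p>A sketched sequence for a checkpointing component (illustrative only):
* <pre>{@code
* Checkpoint cp = journal.newCheckpoint();
* // ... persist the index and entry log data covered by cp ...
* journal.checkpointComplete(cp, true); // compact=true also removes old journals
* }</pre>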
*/
@Override
public Checkpoint newCheckpoint() {
return new LogMarkCheckpoint(lastLogMark.markLog());
}
/**
* Tell the journal that a checkpoint is finished.
*
* @throws IOException
*/
@Override
public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
if (!(checkpoint instanceof LogMarkCheckpoint)) {
return; // we didn't create this checkpoint, so don't do anything with it
}
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;
mark.rollLog(mark);
if (compact) {
// list the journals that have been marked
List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark));
// keep MAX_BACKUP_JOURNALS journal files before marked journal
if (logs.size() >= maxBackupJournals) {
int maxIdx = logs.size() - maxBackupJournals;
for (int i = 0; i < maxIdx; i++) {
long id = logs.get(i);
// make sure the journal id is smaller than marked journal id
if (id < mark.getCurMark().getLogFileId()) {
File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
if (!journalFile.delete()) {
LOG.warn("Could not delete old journal file {}", journalFile);
}
LOG.info("garbage collected journal " + journalFile.getName());
}
}
}
}
}
/**
* Scan the journal.
*
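* <p>For example, replaying a journal from its start with a lambda scanner
* (a sketch; {@code replay} is a hypothetical handler):
* <pre>{@code
* long nextOffset = journal.scanJournal(journalId, 0L,
*         (version, offset, entry) -> replay(entry));
* }</pre>
*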
* @param journalId Journal Log Id
* @param journalPos Offset to start scanning
* @param scanner Scanner to handle entries
* @return scanOffset - the offset up to which the journal was read
* @throws IOException
*/
public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
} else {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
journalPos);
}
int journalVersion = recLog.getFormatVersion();
try {
ByteBuffer lenBuff = ByteBuffer.allocate(4);
ByteBuffer recBuff = ByteBuffer.allocate(64 * 1024);
while (true) {
// entry start offset
long offset = recLog.fc.position();
// start reading entry
lenBuff.clear();
fullRead(recLog, lenBuff);
if (lenBuff.remaining() != 0) {
break;
}
lenBuff.flip();
int len = lenBuff.getInt();
if (len == 0) {
break;
}
boolean isPaddingRecord = false;
if (len < 0) {
if (len == PADDING_MASK && journalVersion >= JournalChannel.V5) {
// skip padding bytes
lenBuff.clear();
fullRead(recLog, lenBuff);
if (lenBuff.remaining() != 0) {
break;
}
lenBuff.flip();
len = lenBuff.getInt();
if (len == 0) {
continue;
}
isPaddingRecord = true;
} else {
LOG.error("Invalid record found with negative length: {}", len);
throw new IOException("Invalid record found with negative length " + len);
}
}
recBuff.clear();
if (recBuff.remaining() < len) {
recBuff = ByteBuffer.allocate(len);
}
recBuff.limit(len);
if (fullRead(recLog, recBuff) != len) {
// This seems scary, but it just means that this is where we
// left off writing
break;
}
recBuff.flip();
if (!isPaddingRecord) {
scanner.process(journalVersion, offset, recBuff);
}
}
return recLog.fc.position();
} finally {
recLog.close();
}
}
public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
}
/**
* Record an add entry operation in the journal.
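* <p>The first 16 bytes of {@code entry} are expected to carry the ledger id
* and the entry id as two longs (Netty's default big-endian order); they are
* read here to key the queue entry.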
*/
public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
}
@VisibleForTesting
public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
// Retain entry until it gets written to journal
entry.retain();
journalStats.getJournalQueueSize().inc();
journalStats.getJournalCbQueueSize().inc();
memoryLimitController.reserveMemory(entry.readableBytes());
queue.put(QueueEntry.create(
entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
journalStats.getJournalAddEntryStats(),
journalStats.getJournalCbQueueSize()));
}
void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
queue.add(QueueEntry.create(
null, false /* ackBeforeSync */, ledgerId,
BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
journalStats.getJournalForceLedgerStats(),
journalStats.getJournalCbQueueSize()));
// Increment afterwards because the add operation could fail.
journalStats.getJournalQueueSize().inc();
journalStats.getJournalCbQueueSize().inc();
}
/**
* Get the length of journal entries queue.
*
* @return length of journal entry queue.
*/
public int getJournalQueueLength() {
return queue.size();
}
/**
* A thread used for persisting journal entries to journal files.
*
* <p>
* Besides persisting journal entries, it is also responsible for
* rolling journal files when a journal file reaches the configured
* size limit.
* </p>
* <p>
* During journal rolling, it first closes the journal being written,
* generates a new journal file named with the current timestamp, and
* continues the persistence logic. Old journals are garbage collected
* by the SyncThread.
* </p>
* @see org.apache.bookkeeper.bookie.SyncThread
*/
@Override
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
if (conf.isBusyWaitEnabled()) {
try {
CpuAffinity.acquireCore();
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal thread: {}", e.getMessage(), e);
}
}
RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
int numEntriesToFlush = 0;
ByteBuf lenBuff = Unpooled.buffer(4);
ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
paddingBuff.writeZero(paddingBuff.capacity());
BufferedChannel bc = null;
JournalChannel logFile = null;
forceWriteThread.start();
Stopwatch journalCreationWatcher = Stopwatch.createUnstarted();
Stopwatch journalFlushWatcher = Stopwatch.createUnstarted();
long batchSize = 0;
try {
List<Long> journalIds = listJournalIds(journalDirectory, null);
// Should not use MathUtils.now(), which uses System.nanoTime() and
// can only be used to measure elapsed time.
// http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
long lastFlushPosition = 0;
boolean groupWhenTimeout = false;
long dequeueStartTime = 0L;
long lastFlushTimeMs = System.currentTimeMillis();
QueueEntry qe = null;
while (true) {
// new journal file to write
if (null == logFile) {
logId = logId + 1;
journalCreationWatcher.reset().start();
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder());
journalStats.getJournalCreationStats().registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
bc = logFile.getBufferedChannel();
lastFlushPosition = bc.position();
}
if (qe == null) {
if (dequeueStartTime != 0) {
journalStats.getJournalProcessTimeStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
}
if (numEntriesToFlush == 0) {
qe = queue.take();
dequeueStartTime = MathUtils.nowInNano();
journalStats.getJournalQueueSize().dec();
journalStats.getJournalQueueStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
} else {
long pollWaitTimeNanos = maxGroupWaitInNanos
- MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
pollWaitTimeNanos = 0;
}
qe = queue.poll(pollWaitTimeNanos, TimeUnit.NANOSECONDS);
dequeueStartTime = MathUtils.nowInNano();
if (qe != null) {
journalStats.getJournalQueueSize().dec();
journalStats.getJournalQueueStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
}
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three conditions below holds
// 1. If the oldest pending entry has been pending for longer than the max wait time
if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
groupWhenTimeout = true;
} else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& (qe == null // no entry to group
|| MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
// when the group wait times out, it is better to look ahead, as there
// might be lots of entries that already timed out
// due to a previous slow write (a write to the filesystem impacted by a force write).
// Group those entries in the queue that
// a) have already timed out
// b) while limiting the number of entries per group
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxWaitCounter().inc();
} else if (qe != null
&& ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
|| (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxOutstandingBytesCounter().inc();
} else if (qe == null && flushWhenQueueEmpty) {
// We should get here only if flushWhenQueueEmpty is true; otherwise we
// would wait for a timeout that would put us past the maxWait threshold
// 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
// publish at a time - common case in tests.
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushEmptyQueueCounter().inc();
}
// toFlush is non-null and not empty, so it is safe to access its first element
if (shouldFlush) {
if (journalFormatVersionToWrite >= JournalChannel.V5) {
writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
}
journalFlushWatcher.reset().start();
bc.flush();
for (int i = 0; i < toFlush.size(); i++) {
QueueEntry entry = toFlush.get(i);
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
numEntriesToFlush--;
cbThreadPool.execute(entry);
}
}
lastFlushPosition = bc.position();
journalStats.getJournalFlushStats().registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
if (e != null) {
LOG.debug("Written and queuing for flush Ledger: {} Entry: {}",
e.ledgerId, e.entryId);
}
}
}
journalStats.getForceWriteBatchEntriesStats()
.registerSuccessfulValue(numEntriesToFlush);
journalStats.getForceWriteBatchBytesStats()
.registerSuccessfulValue(batchSize);
boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
// Trigger the data sync to disk in the "Force-Write" thread.
// A data sync to disk is triggered in three situations:
// 1. journalSyncData is enabled, usually when an SSD is used as journal storage
// 2. shouldRolloverJournal is true, i.e. the journal file has reached maxJournalSize
// 3. if journalSyncData is disabled and shouldRolloverJournal is false, we use
// journalPageCacheFlushIntervalMSec to control the sync frequency, preventing
// frequent disk syncs, which would increase disk I/O utilization.
// When the time since the last flush reaches journalPageCacheFlushIntervalMSec
// (default: 1s), a data sync to disk is triggered.
if (syncData
|| shouldRolloverJournal
|| (System.currentTimeMillis() - lastFlushTimeMs
>= journalPageCacheFlushIntervalMSec)) {
forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
toFlush, shouldRolloverJournal, false));
lastFlushTimeMs = System.currentTimeMillis();
}
toFlush = entryListRecycler.newInstance();
numEntriesToFlush = 0;
batchSize = 0L;
// check whether journal file is over file limit
if (shouldRolloverJournal) {
// if the journal file is rolled over, the journal file will be closed after the last
// entry is force written to disk.
logFile = null;
continue;
}
}
}
}
if (!running) {
LOG.info("Journal Manager is asked to shut down, quit.");
break;
}
if (qe == null) { // no more queue entry
continue;
}
if ((qe.entryId == BookieImpl.METAENTRY_ID_LEDGER_EXPLICITLAC)
&& (journalFormatVersionToWrite < JournalChannel.V6)) {
/*
* this means we are using new code which supports
* persisting explicitLac, but "journalFormatVersionToWrite"
* is set to some older value (< V6). In this case we
* shouldn't write this special entry
* (METAENTRY_ID_LEDGER_EXPLICITLAC) to Journal.
*/
memoryLimitController.releaseMemory(qe.entry.readableBytes());
qe.entry.release();
} else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
int entrySize = qe.entry.readableBytes();
journalStats.getJournalWriteBytes().add(entrySize);
batchSize += (4 + entrySize);
lenBuff.clear();
lenBuff.writeInt(entrySize);
// preAlloc based on size
logFile.preAllocIfNeeded(4 + entrySize);
bc.write(lenBuff);
bc.write(qe.entry);
memoryLimitController.releaseMemory(qe.entry.readableBytes());
qe.entry.release();
}
toFlush.add(qe);
numEntriesToFlush++;
qe = null;
}
} catch (IOException ioe) {
LOG.error("I/O exception in Journal thread!", ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.info("Journal exits when shutting down");
} finally {
// There could be packets queued for forceWrite on this logFile.
// That is fine, as this exception is going to take down the
// bookie anyway. If we execute this as part of a graceful shutdown,
// close will flush the file system cache, making any previously
// cached writes durable, so this is fine as well.
IOUtils.close(LOG, bc);
}
LOG.info("Journal exited loop!");
}
public BufferedChannelBuilder getBufferedChannelBuilder() {
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
}
/**
* Shuts down the journal.
*/
public synchronized void shutdown() {
try {
if (!running) {
return;
}
LOG.info("Shutting down Journal");
forceWriteThread.shutdown();
cbThreadPool.shutdown();
if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
}
cbThreadPool.shutdownNow();
running = false;
this.interrupt();
this.join();
LOG.info("Finished Shutting down Journal thread");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted during shutting down journal : ", ie);
}
}
private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
int total = 0;
while (bb.remaining() > 0) {
int rc = fc.read(bb);
if (rc <= 0) {
return total;
}
total += rc;
}
return total;
}
/**
* Wait for the Journal thread to exit.
* This method is needed in order to mock the journal; we can't mock
* final methods of the java.lang.Thread class.
*
* @throws InterruptedException
*/
@VisibleForTesting
public void joinThread() throws InterruptedException {
join();
}
long getMemoryUsage() {
return memoryLimitController.currentUsage();
}
}