blob: 72f337020c9087dde506a4a658c36f4c9b75588e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provide journal related management.
*/
class Journal extends Thread {

    static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    /**
     * Filter to pick up journals by id.
     */
    private static interface JournalIdFilter {
        public boolean accept(long journalId);
    }

    /**
     * List all journal ids in a directory, optionally restricted by a filter.
     *
     * <p>
     * Files ending in {@code .txn} are considered. The id is the text before
     * the first {@code '.'}, parsed as hexadecimal. Files whose prefix does not
     * parse are skipped with a warning rather than aborting the listing.
     * </p>
     *
     * @param journalDir journal dir
     * @param filter journal id filter, or {@code null} to accept every id
     * @return accepted journal ids in ascending order; empty if the directory
     *         cannot be listed
     */
    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
        File logFiles[] = journalDir.listFiles();
        List<Long> logs = new ArrayList<Long>();
        // File.listFiles() returns null when the path is not a directory or an
        // I/O error occurs; treat that as "no journals" instead of failing with NPE.
        if (logFiles == null) {
            LOG.warn("Could not list journal directory {}", journalDir);
            return logs;
        }
        for (File f : logFiles) {
            String name = f.getName();
            if (!name.endsWith(".txn")) {
                continue;
            }
            String idString = name.split("\\.")[0];
            long id;
            try {
                id = Long.parseLong(idString, 16);
            } catch (NumberFormatException nfe) {
                // A stray file such as ".txn" or "foo.txn" is not a journal we wrote.
                LOG.warn("Skipping file {} : its name is not a valid journal id", f);
                continue;
            }
            if (filter == null || filter.accept(id)) {
                logs.add(id);
            }
        }
        Collections.sort(logs);
        return logs;
    }

    /**
     * Last Log Mark: a (journal log id, position within that log) pair.
     *
     * <p>
     * {@code txnLogId}/{@code txnLogPosition} are updated by the journal thread
     * after each flush. {@link #markLog()} snapshots them into {@code lastMark}.
     * {@link #rollLog()} writes that snapshot to a {@code lastMark} file in
     * every writable ledger directory.
     * </p>
     */
    class LastLogMark {
        private long txnLogId;
        private long txnLogPosition;
        // Snapshot taken by markLog(); null until markLog() has been called.
        private LastLogMark lastMark;

        LastLogMark(long logId, long logPosition) {
            this.txnLogId = logId;
            this.txnLogPosition = logPosition;
        }

        synchronized void setLastLogMark(long logId, long logPosition) {
            txnLogId = logId;
            txnLogPosition = logPosition;
        }

        synchronized void markLog() {
            lastMark = new LastLogMark(txnLogId, txnLogPosition);
        }

        /**
         * @return the snapshot taken by the last {@link #markLog()} call, or
         *         {@code null} if it has never been called
         */
        synchronized LastLogMark getLastMark() {
            return lastMark;
        }

        synchronized long getTxnLogId() {
            return txnLogId;
        }

        synchronized long getTxnLogPosition() {
            return txnLogPosition;
        }

        /**
         * Write the mark snapshotted by {@link #markLog()} as 16 bytes
         * (log id, then position) to {@code lastMark} in every writable ledger
         * directory, forcing each file to disk.
         *
         * <p>
         * A write failure in one directory is logged, and the remaining
         * directories are still attempted. If no mark has been taken yet,
         * nothing is written.
         * </p>
         *
         * @throws NoWritableLedgerDirException propagated from
         *         {@code LedgerDirsManager#getWritableLedgerDirs()}
         */
        synchronized void rollLog() throws NoWritableLedgerDirException {
            if (lastMark == null) {
                LOG.warn("No log mark has been taken by markLog() yet, skip persisting lastMark");
                return;
            }
            byte buff[] = new byte[16];
            ByteBuffer bb = ByteBuffer.wrap(buff);
            // we should record <logId, logPosition> marked in markLog
            // which is safe since records before lastMark have been
            // persisted to disk (both index & entry logger)
            bb.putLong(lastMark.getTxnLogId());
            bb.putLong(lastMark.getTxnLogPosition());
            LOG.debug("RollLog to persist last marked log : {}", lastMark);
            List<File> writableLedgerDirs = ledgerDirsManager
                    .getWritableLedgerDirs();
            for (File dir : writableLedgerDirs) {
                File file = new File(dir, "lastMark");
                FileOutputStream fos = null;
                try {
                    fos = new FileOutputStream(file);
                    fos.write(buff);
                    fos.getChannel().force(true);
                    fos.close();
                    fos = null;
                } catch (IOException e) {
                    LOG.error("Problems writing to " + file, e);
                } finally {
                    // if the stream was closed in the try block it has been
                    // nulled out, in which case this call simply returns
                    IOUtils.close(LOG, fos);
                }
            }
        }

        /**
         * Read last mark from the lastMark file of every ledger directory.
         *
         * <p>
         * The resulting mark is the greatest one found, compared first by
         * journal log id and then by position within that log. Missing,
         * unreadable or short files are logged and ignored.
         * </p>
         */
        synchronized void readLog() {
            byte buff[] = new byte[16];
            ByteBuffer bb = ByteBuffer.wrap(buff);
            for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
                File file = new File(dir, "lastMark");
                try {
                    FileInputStream fis = new FileInputStream(file);
                    try {
                        int bytesRead = fis.read(buff);
                        if (bytesRead != 16) {
                            throw new IOException("Couldn't read enough bytes from lastMark."
                                    + " Wanted " + 16 + ", got " + bytesRead);
                        }
                    } finally {
                        fis.close();
                    }
                    bb.clear();
                    long i = bb.getLong();
                    long p = bb.getLong();
                    // Lexicographic (id, position) comparison. A newer log id
                    // always wins and brings its own position, even if that
                    // position is smaller. Within the same log id, the larger
                    // position wins.
                    if (i > txnLogId || (i == txnLogId && p > txnLogPosition)) {
                        txnLogId = i;
                        txnLogPosition = p;
                    }
                } catch (IOException e) {
                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
                }
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("LastMark: logId - ").append(txnLogId)
                    .append(" , position - ").append(txnLogPosition);
            return sb.toString();
        }
    }

    /**
     * Filter that accepts journals older than a given marked journal id,
     * i.e. the candidates for garbage collection.
     */
    private static class JournalRollingFilter implements JournalIdFilter {
        private final long markedLogId;

        /**
         * @param markedLogId journal id of the mark; captured once so that
         *        every accept() call uses the same value
         */
        JournalRollingFilter(long markedLogId) {
            this.markedLogId = markedLogId;
        }

        @Override
        public boolean accept(long journalId) {
            return journalId < markedLogId;
        }
    }

    /**
     * Scanner used to scan a journal
     */
    public static interface JournalScanner {
        /**
         * Process a journal entry.
         *
         * @param journalVersion
         *          Journal Version
         * @param offset
         *          File offset of the journal entry
         * @param entry
         *          Journal Entry
         * @throws IOException
         */
        public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException;
    }

    /**
     * Journal Entry to Record
     */
    private static class QueueEntry {
        QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
                   WriteCallback cb, Object ctx) {
            // duplicate() gives this entry its own position/limit over the same content
            this.entry = entry.duplicate();
            this.cb = cb;
            this.ctx = ctx;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        final ByteBuffer entry;
        final long ledgerId;
        final long entryId;
        final WriteCallback cb;
        final Object ctx;
    }

    static final long MB = 1024 * 1024L;
    // max journal file size
    final long maxJournalSize;
    // number journal files kept before marked journal
    final int maxBackupJournals;

    final File journalDirectory;
    final ServerConfiguration conf;

    private final LastLogMark lastLogMark = new LastLogMark(0, 0);

    // journal entry queue to commit
    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();

    volatile boolean running = true;
    private final LedgerDirsManager ledgerDirsManager;

    public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
        super("BookieJournal-" + conf.getBookiePort());
        this.ledgerDirsManager = ledgerDirsManager;
        this.conf = conf;
        this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
        this.maxJournalSize = conf.getMaxJournalSize() * MB;
        this.maxBackupJournals = conf.getMaxBackupJournals();

        // read last log mark (requires ledgerDirsManager, assigned above)
        lastLogMark.readLog();
        LOG.debug("Last Log Mark : {}", lastLogMark);
    }

    LastLogMark getLastLogMark() {
        return lastLogMark;
    }

    /**
     * Records a <i>LastLogMark</i> in memory.
     *
     * <p>
     * The <i>LastLogMark</i> contains two parts: first one is <i>txnLogId</i>
     * (file id of a journal) and the second one is <i>txnLogPos</i> (offset in
     * a journal). The <i>LastLogMark</i> indicates that those entries before
     * it have been persisted to both index and entry log files.
     * </p>
     *
     * <p>
     * This method is called before flushing entry log files and ledger cache.
     * </p>
     */
    public void markLog() {
        lastLogMark.markLog();
    }

    /**
     * Persists the <i>LastLogMark</i> marked by #markLog() to disk.
     *
     * <p>
     * This action means entries added before <i>LastLogMark</i> whose entry data
     * and index pages were already persisted to disk. It is the time to safely
     * remove journal files created earlier than <i>LastLogMark.txnLogId</i>.
     * </p>
     * <p>
     * If the bookie has crashed before persisting <i>LastLogMark</i> to disk,
     * it still has journal files contains entries for which index pages may not
     * have been persisted. Consequently, when the bookie restarts, it inspects
     * journal files to restore those entries; data isn't lost.
     * </p>
     * <p>
     * This method is called after flushing entry log files and ledger cache
     * successfully, which is to ensure <i>LastLogMark</i> is persisted.
     * If #markLog() has never been called, nothing is written.
     * </p>
     * @see #markLog()
     */
    public void rollLog() throws NoWritableLedgerDirException {
        lastLogMark.rollLog();
    }

    /**
     * Garbage collect older journals.
     *
     * <p>
     * Among the journal files whose id is smaller than the journal id of the
     * mark taken by #markLog(), all but the {@code maxBackupJournals} newest
     * are deleted. Does nothing if no mark has been taken yet.
     * </p>
     */
    public void gcJournals() {
        LastLogMark mark = lastLogMark.getLastMark();
        if (mark == null) {
            LOG.debug("No log mark has been taken yet, skip garbage collecting journals");
            return;
        }
        final long markedLogId = mark.getTxnLogId();
        // list the journals older than the marked journal (ascending order)
        List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(markedLogId));
        // keep maxBackupJournals journal files before marked journal
        if (logs.size() >= maxBackupJournals) {
            int maxIdx = logs.size() - maxBackupJournals;
            for (int i = 0; i < maxIdx; i++) {
                long id = logs.get(i);
                // the filter above guarantees id < markedLogId
                File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
                if (!journalFile.delete()) {
                    LOG.warn("Could not delete old journal file {}", journalFile);
                } else {
                    LOG.info("garbage collected journal " + journalFile.getName());
                }
            }
        }
    }

    /**
     * Scan the journal.
     *
     * <p>
     * Each record is a 4-byte length followed by that many bytes of entry
     * data. Scanning stops at a zero length, or at a record that cannot be
     * read completely (where writing left off).
     * </p>
     *
     * @param journalId
     *          Journal Log Id
     * @param journalPos
     *          Offset to start scanning; values &lt;= 0 scan from the start
     * @param scanner
     *          Scanner to handle entries
     * @throws IOException on read failure, or if a record has a negative length
     */
    public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
            throws IOException {
        JournalChannel recLog;
        if (journalPos <= 0) {
            recLog = new JournalChannel(journalDirectory, journalId);
        } else {
            recLog = new JournalChannel(journalDirectory, journalId, journalPos);
        }
        try {
            // inside the try so the channel is closed even if this fails
            int journalVersion = recLog.getFormatVersion();
            ByteBuffer lenBuff = ByteBuffer.allocate(4);
            ByteBuffer recBuff = ByteBuffer.allocate(64 * 1024);
            while (true) {
                // entry start offset
                long offset = recLog.fc.position();
                // start reading entry
                lenBuff.clear();
                fullRead(recLog, lenBuff);
                if (lenBuff.remaining() != 0) {
                    break;
                }
                lenBuff.flip();
                int len = lenBuff.getInt();
                if (len == 0) {
                    break;
                }
                if (len < 0) {
                    // ByteBuffer.limit() would throw an unchecked exception here;
                    // surface the corruption as an IOException with context instead.
                    throw new IOException("Invalid journal record length " + len
                            + " at offset " + offset + " in journal "
                            + Long.toHexString(journalId));
                }
                recBuff.clear();
                if (recBuff.remaining() < len) {
                    recBuff = ByteBuffer.allocate(len);
                }
                recBuff.limit(len);
                if (fullRead(recLog, recBuff) != len) {
                    // This seems scary, but it just means that this is where we
                    // left off writing
                    break;
                }
                recBuff.flip();
                scanner.process(journalVersion, offset, recBuff);
            }
        } finally {
            recLog.close();
        }
    }

    /**
     * Replay journal files, starting at the last log mark.
     *
     * <p>
     * Journals with id &gt;= the marked log id are scanned in ascending order.
     * The marked journal is scanned from the marked position, and later ones
     * from the beginning.
     * </p>
     *
     * @param scanner
     *          Scanner to process replayed entries.
     * @throws IOException if the marked journal is missing, or a scan fails
     */
    public void replay(JournalScanner scanner) throws IOException {
        final long markedLogId = lastLogMark.getTxnLogId();
        List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
            @Override
            public boolean accept(long journalId) {
                return journalId >= markedLogId;
            }
        });
        // last log mark may be missed due to no sync up before
        // validate filtered log ids only when we have markedLogId
        if (markedLogId > 0) {
            if (logs.isEmpty() || logs.get(0).longValue() != markedLogId) {
                throw new IOException("Recovery log " + markedLogId + " is missing");
            }
        }
        LOG.debug("Try to relay journal logs : {}", logs);
        // TODO: When reading in the journal logs that need to be synced, we
        // should use BufferedChannels instead to minimize the amount of
        // system calls done.
        for (long id : logs) {
            long logPosition = 0L;
            if (id == markedLogId) {
                logPosition = lastLogMark.getTxnLogPosition();
            }
            LOG.info("Replaying journal {} from position {}", id, logPosition);
            scanJournal(id, logPosition, scanner);
        }
    }

    /**
     * Record an add entry operation in journal.
     *
     * <p>
     * The entry must start, at its current position, with the ledger id
     * followed by the entry id (two longs). The buffer's position is not
     * modified; the journal writes the bytes from that position to the limit.
     * </p>
     */
    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
        // absolute reads: do not disturb the caller's position (rewind() would
        // reset it to 0 even when the entry starts at a non-zero position)
        int start = entry.position();
        long ledgerId = entry.getLong(start);
        long entryId = entry.getLong(start + 8);
        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
    }

    /**
     * Get the length of journal entries queue.
     *
     * @return length of journal entry queue.
     */
    public int getJournalQueueLength() {
        return queue.size();
    }

    /**
     * A thread used for persisting journal entries to journal files.
     *
     * <p>
     * Besides persisting journal entries, it also takes responsibility of
     * rolling journal files when a journal file reaches journal file size
     * limitation.
     * </p>
     * <p>
     * Each queued entry is written as a 4-byte length followed by the entry
     * bytes. Pending entries are flushed when the queue is drained, or when
     * more than 512KB has been written since the last flush. After a flush
     * the in-memory log mark is advanced and each pending entry's callback is
     * invoked with {@code BookieException.Code.OK}.
     * </p>
     * <p>
     * During journal rolling, it closes the current journal and opens a new
     * one whose id is the previous id plus one. The first id is one greater
     * than the largest existing journal id, or than the current time in
     * milliseconds if no journal exists. Old journals are garbage collected
     * in SyncThread.
     * </p>
     * @see Bookie#SyncThread
     */
    @Override
    public void run() {
        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
        ByteBuffer lenBuff = ByteBuffer.allocate(4);
        JournalChannel logFile = null;
        try {
            List<Long> journalIds = listJournalIds(journalDirectory, null);
            // Should not use MathUtils.now(), which use System.nanoTime() and
            // could only be used to measure elapsed time.
            // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
            long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
            BufferedChannel bc = null;
            long lastFlushPosition = 0;

            QueueEntry qe = null;
            while (true) {
                // new journal file to write
                if (null == logFile) {
                    logId = logId + 1;
                    logFile = new JournalChannel(journalDirectory, logId);
                    bc = logFile.getBufferedChannel();

                    lastFlushPosition = 0;
                }

                if (qe == null) {
                    if (toFlush.isEmpty()) {
                        // nothing awaiting a flush: block until an entry arrives
                        qe = queue.take();
                    } else {
                        qe = queue.poll();
                        // flush when the queue is drained, or when enough data
                        // (512KB) has accumulated since the last flush
                        if (qe == null || bc.position() > lastFlushPosition + 512 * 1024) {
                            bc.flush(true);
                            lastFlushPosition = bc.position();
                            lastLogMark.setLastLogMark(logId, lastFlushPosition);
                            // entries are acknowledged only after the flush above
                            for (QueueEntry e : toFlush) {
                                e.cb.writeComplete(BookieException.Code.OK,
                                                   e.ledgerId, e.entryId, null, e.ctx);
                            }
                            toFlush.clear();

                            // check whether journal file is over file limit
                            if (bc.position() > maxJournalSize) {
                                logFile.close();
                                logFile = null;
                                // qe (if any) is kept and written to the new file
                                continue;
                            }
                        }
                    }
                }

                if (!running) {
                    LOG.info("Journal Manager is asked to shut down, quit.");
                    break;
                }

                if (qe == null) { // no more queue entry
                    continue;
                }
                lenBuff.clear();
                lenBuff.putInt(qe.entry.remaining());
                lenBuff.flip();
                //
                // we should be doing the following, but then we run out of
                // direct byte buffers
                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
                bc.write(lenBuff);
                bc.write(qe.entry);

                logFile.preAllocIfNeeded();

                toFlush.add(qe);
                qe = null;
            }
            logFile.close();
            logFile = null;
        } catch (IOException ioe) {
            LOG.error("I/O exception in Journal thread!", ioe);
        } catch (InterruptedException ie) {
            // preserve the interrupt status while this thread winds down
            Thread.currentThread().interrupt();
            LOG.warn("Journal exits when shutting down", ie);
        } finally {
            // null-safe: logFile is null if it was already closed above
            IOUtils.close(LOG, logFile);
        }
    }

    /**
     * Shuts down the journal: stops the journal thread and waits for it to exit.
     * Subsequent calls are no-ops.
     */
    public synchronized void shutdown() {
        try {
            if (!running) {
                return;
            }
            running = false;
            this.interrupt();
            this.join();
        } catch (InterruptedException ie) {
            // restore the caller's interrupt status instead of swallowing it
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted during shutting down journal : ", ie);
        }
    }

    /**
     * Read from the channel until {@code bb} is full or no more data is returned.
     *
     * @return number of bytes read; less than the initial remaining space if
     *         the channel ran out of data
     */
    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
        int total = 0;
        while (bb.remaining() > 0) {
            int rc = fc.read(bb);
            if (rc <= 0) {
                return total;
            }
            total += rc;
        }
        return total;
    }
}