/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This is the garbage collector thread that runs in the background to
 * remove any entry log files that no longer contain any active ledgers.
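 * <p>
 * A minimal wiring sketch (illustrative; in practice the bookie constructs
 * and owns this thread):
 * <pre>{@code
 * GarbageCollectorThread gcThread = new GarbageCollectorThread(
 *         conf, ledgerCache, entryLogger, activeLedgers, ledgerManager);
 * gcThread.start();    // gc and compaction run every gcWaitTime milliseconds
 * // ... on bookie shutdown:
 * gcThread.shutdown(); // stops the loop and joins the thread
 * }</pre>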
*/
public class GarbageCollectorThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
private static final int SECOND = 1000;
// Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<Long, EntryLogMetadata>();
// This is how often we want to run the Garbage Collector Thread (in milliseconds).
final long gcWaitTime;
// Compaction parameters
boolean enableMinorCompaction = false;
final double minorCompactionThreshold;
final long minorCompactionInterval;
boolean enableMajorCompaction = false;
final double majorCompactionThreshold;
final long majorCompactionInterval;
long lastMinorCompactionTime;
long lastMajorCompactionTime;
final int maxOutstandingRequests;
final int compactionRate;
final CompactionScannerFactory scannerFactory;
// Entry Logger Handle
final EntryLogger entryLogger;
// Ledger Cache Handle
final LedgerCache ledgerCache;
final SnapshotMap<Long, Boolean> activeLedgers;
    // flag to ensure the gc thread will not be interrupted during compaction,
    // to reduce the risk of corrupting the entry log
final AtomicBoolean compacting = new AtomicBoolean(false);
volatile boolean running = true;
    // track the last successfully scanned log id
long scannedLogId = 0;
final GarbageCollector garbageCollector;
final GarbageCleaner garbageCleaner;
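    /**
     * Location of a live entry after compaction has re-added it to the
     * current entry log: (ledger, entry) plus the new offset that will be
     * written to the ledger index once the entry log data is flushed.
     */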
private static class Offset {
final long ledger;
final long entry;
final long offset;
Offset(long ledger, long entry, long offset) {
this.ledger = ledger;
this.entry = entry;
this.offset = offset;
}
}
/**
     * A scanner factory used during compaction. The scanners it creates copy
     * the live entries of an entry log into the current entry log, buffering
     * the new offsets until the entry log data has been flushed to disk.
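     * <p>
     * Flow sketch, as driven by compactEntryLog and doCompactEntryLogs:
     * <pre>{@code
     * EntryLogScanner scanner = scannerFactory.newScanner(meta);
     * entryLogger.scanEntryLog(meta.entryLogId, scanner); // re-adds live entries
     * scannerFactory.flush(); // waits for entry log flush, then updates the index
     * }</pre>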
*/
class CompactionScannerFactory implements EntryLogger.EntryLogListener {
List<Offset> offsets = new ArrayList<Offset>();
EntryLogScanner newScanner(final EntryLogMetadata meta) {
final RateLimiter rateLimiter = RateLimiter.create(compactionRate);
return new EntryLogScanner() {
@Override
public boolean accept(long ledgerId) {
return meta.containsLedger(ledgerId);
}
@Override
public void process(final long ledgerId, long offset, ByteBuffer entry)
throws IOException {
rateLimiter.acquire();
synchronized (CompactionScannerFactory.this) {
if (offsets.size() > maxOutstandingRequests) {
waitEntrylogFlushed();
}
entry.getLong(); // discard ledger id, we already have it
long entryId = entry.getLong();
entry.rewind();
long newoffset = entryLogger.addEntry(ledgerId, entry);
flushed.set(false);
offsets.add(new Offset(ledgerId, entryId, newoffset));
}
}
};
}
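        // 'flushed' is set by onEntryLogFlushed() when the entry logger flushes;
        // buffered offsets may only be written to the ledger index after the
        // corresponding entry log data is durable on disk.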
AtomicBoolean flushed = new AtomicBoolean(false);
Object flushLock = new Object();
@Override
public void onEntryLogFlushed() {
synchronized (flushLock) {
flushed.set(true);
flushLock.notifyAll();
}
}
        private synchronized void waitEntrylogFlushed() throws IOException {
try {
synchronized (flushLock) {
while (!flushed.get()
&& entryLogger.isFlushRequired()
&& running) {
flushLock.wait(1000);
}
if (!flushed.get()
&& entryLogger.isFlushRequired()
&& !running) {
throw new IOException("Shutdown before flushed");
}
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for flush", ie);
}
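            // the entry log has been flushed, so the re-added entries are durable;
            // it is now safe to point the ledger index at the new offsets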
for (Offset o : offsets) {
ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
}
offsets.clear();
}
synchronized void flush() throws IOException {
waitEntrylogFlushed();
ledgerCache.flushLedger(true);
}
}
/**
* Create a garbage collector thread.
*
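     * <p>
     * A configuration sketch (setter names assumed to mirror the getters used
     * below; intervals are in seconds, thresholds are usage fractions):
     * <pre>{@code
     * ServerConfiguration conf = new ServerConfiguration();
     * conf.setGcWaitTime(1000);               // gc loop wakes up every second
     * conf.setMinorCompactionThreshold(0.2);  // compact logs below 20% usage
     * conf.setMinorCompactionInterval(3600);  // hourly minor compaction
     * conf.setMajorCompactionThreshold(0.8);
     * conf.setMajorCompactionInterval(86400); // daily major compaction
     * }</pre>
     *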
     * @param conf
     *          Server configuration object.
     * @param ledgerCache
     *          Ledger cache used to update and delete ledger index entries.
     * @param entryLogger
     *          Entry logger that owns the entry log files.
     * @param activeLedgers
     *          Snapshot map of currently active ledgers.
     * @param ledgerManager
     *          Ledger manager used to determine which ledgers still exist.
* @throws IOException
*/
public GarbageCollectorThread(ServerConfiguration conf,
final LedgerCache ledgerCache,
EntryLogger entryLogger,
SnapshotMap<Long, Boolean> activeLedgers,
LedgerManager ledgerManager)
throws IOException {
super("GarbageCollectorThread");
this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
this.activeLedgers = activeLedgers;
this.gcWaitTime = conf.getGcWaitTime();
this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests();
this.compactionRate = conf.getCompactionRate();
this.scannerFactory = new CompactionScannerFactory();
entryLogger.addListener(this.scannerFactory);
this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
@Override
public void clean(long ledgerId) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("delete ledger : " + ledgerId);
}
ledgerCache.deleteLedger(ledgerId);
} catch (IOException e) {
LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
}
}
};
this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
// compaction parameters
minorCompactionThreshold = conf.getMinorCompactionThreshold();
minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
majorCompactionThreshold = conf.getMajorCompactionThreshold();
majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
if (minorCompactionThreshold > 1.0f) {
throw new IOException("Invalid minor compaction threshold "
+ minorCompactionThreshold);
}
if (minorCompactionInterval <= gcWaitTime) {
throw new IOException("Too short minor compaction interval : "
+ minorCompactionInterval);
}
enableMinorCompaction = true;
}
if (majorCompactionInterval > 0 && majorCompactionThreshold > 0) {
if (majorCompactionThreshold > 1.0f) {
throw new IOException("Invalid major compaction threshold "
+ majorCompactionThreshold);
}
if (majorCompactionInterval <= gcWaitTime) {
throw new IOException("Too short major compaction interval : "
+ majorCompactionInterval);
}
enableMajorCompaction = true;
}
if (enableMinorCompaction && enableMajorCompaction) {
if (minorCompactionInterval >= majorCompactionInterval ||
minorCompactionThreshold >= majorCompactionThreshold) {
throw new IOException("Invalid minor/major compaction settings : minor ("
+ minorCompactionThreshold + ", " + minorCompactionInterval
+ "), major (" + majorCompactionThreshold + ", "
+ majorCompactionInterval + ")");
}
}
LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
+ minorCompactionThreshold + ", interval=" + minorCompactionInterval);
LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
+ majorCompactionThreshold + ", interval=" + majorCompactionInterval);
lastMinorCompactionTime = lastMajorCompactionTime = MathUtils.now();
}
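    /**
     * Main gc loop. Once per gcWaitTime milliseconds it refreshes the entry
     * log metadata, deletes the indexes of removed ledgers, deletes entry
     * logs that hold no live data, and then runs major or minor compaction
     * if the corresponding interval has elapsed.
     */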
@Override
public void run() {
while (running) {
synchronized (this) {
try {
wait(gcWaitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
}
}
            // Extract all of the ledger IDs that comprise all of the entry logs
            // (except for the current one, which is still being written to).
entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
// gc inactive/deleted ledgers
doGcLedgers();
// gc entry logs
doGcEntryLogs();
long curTime = MathUtils.now();
if (enableMajorCompaction &&
curTime - lastMajorCompactionTime > majorCompactionInterval) {
// enter major compaction
LOG.info("Enter major compaction");
doCompactEntryLogs(majorCompactionThreshold);
lastMajorCompactionTime = MathUtils.now();
// also move minor compaction time
lastMinorCompactionTime = lastMajorCompactionTime;
continue;
}
if (enableMinorCompaction &&
curTime - lastMinorCompactionTime > minorCompactionInterval) {
// enter minor compaction
LOG.info("Enter minor compaction");
doCompactEntryLogs(minorCompactionThreshold);
lastMinorCompactionTime = MathUtils.now();
}
}
}
/**
     * Garbage collect the index files of ledgers that have been deleted.
*/
private void doGcLedgers() {
garbageCollector.gc(garbageCleaner);
}
/**
     * Garbage collect entry log files that are no longer associated with any active ledgers.
*/
private void doGcEntryLogs() {
// Loop through all of the entry logs and remove the non-active ledgers.
for (Long entryLogId : entryLogMetaMap.keySet()) {
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
// Remove the entry log ledger from the set if it isn't active.
if (!activeLedgers.containsKey(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
}
}
if (meta.isEmpty()) {
// This means the entry log is not associated with any active ledgers anymore.
// We can remove this entry log file now.
LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
removeEntryLog(entryLogId);
}
}
}
/**
* Compact entry logs if necessary.
*
     * <p>
     * Candidate entry logs are compacted in order of decreasing unused space,
     * i.e. the logs containing the most garbage are compacted first. Entry
     * log files whose remaining-size percentage (usage) reaches the threshold
     * are not compacted.
     * </p>
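     * <p>
     * Worked example (illustrative numbers): with a threshold of 0.8, an
     * entry log with totalSize = 100MB and remainingSize = 20MB has usage
     * 0.2 and is compacted, while one with remainingSize = 90MB has usage
     * 0.9 and is skipped, ending the scan.
     * </p>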
*/
private void doCompactEntryLogs(double threshold) {
LOG.info("Do compaction to compact those files lower than " + threshold);
// sort the ledger meta by occupied unused space
Comparator<EntryLogMetadata> sizeComparator = new Comparator<EntryLogMetadata>() {
@Override
public int compare(EntryLogMetadata m1, EntryLogMetadata m2) {
long unusedSize1 = m1.totalSize - m1.remainingSize;
long unusedSize2 = m2.totalSize - m2.remainingSize;
if (unusedSize1 > unusedSize2) {
return -1;
} else if (unusedSize1 < unusedSize2) {
return 1;
} else {
return 0;
}
}
};
List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
logsToCompact.addAll(entryLogMetaMap.values());
Collections.sort(logsToCompact, sizeComparator);
List<Long> toRemove = new ArrayList<Long>();
for (EntryLogMetadata meta : logsToCompact) {
if (meta.getUsage() >= threshold) {
break;
}
LOG.debug("Compacting entry log {} below threshold {}.", meta.entryLogId, threshold);
try {
compactEntryLog(scannerFactory, meta);
toRemove.add(meta.entryLogId);
} catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
break;
} catch (IOException ioe) {
                // if compacting the entry log throws an IOException, we don't want to
                // remove that entry log. However, if some entries from that log have
                // already been re-added to the current entry log and their offsets
                // updated, it is still safe to flush those offsets.
LOG.error("Error compacting entry log. Log won't be deleted", ioe);
}
if (!running) { // if gc thread is not running, stop compaction
return;
}
}
try {
// compaction finished, flush any outstanding offsets
scannerFactory.flush();
} catch (IOException ioe) {
LOG.error("Cannot flush compacted entries, skip removal", ioe);
return;
}
        // offsets have been flushed, so it's now safe to remove the old entry logs
for (Long l : toRemove) {
removeEntryLog(l);
}
}
/**
* Shutdown the garbage collector thread.
*
     * @throws InterruptedException if interrupted while waiting for the gc thread to stop.
*/
public void shutdown() throws InterruptedException {
this.running = false;
if (compacting.compareAndSet(false, true)) {
            // if setting the compacting flag succeeded, the gc thread is not
            // compacting now, so it is safe to interrupt it
this.interrupt();
}
this.join();
}
/**
* Remove entry log.
*
* @param entryLogId
* Entry Log File Id
*/
private void removeEntryLog(long entryLogId) {
        // only discard the in-memory metadata if the entry log file was removed successfully
if (entryLogger.removeEntryLog(entryLogId)) {
entryLogMetaMap.remove(entryLogId);
}
}
    /**
     * Compact an entry log by rewriting its live entries into the current
     * entry log.
     *
     * @param scannerFactory
     *          Factory that creates the compaction scanner.
     * @param entryLogMeta
     *          Metadata of the entry log to compact.
     */
protected void compactEntryLog(CompactionScannerFactory scannerFactory,
EntryLogMetadata entryLogMeta) throws IOException {
        // Similar to the sync thread: try to set the compacting flag so the
        // thread is not interrupted by shutdown during compaction. Otherwise
        // it could receive a ClosedByInterruptException, which may leave the
        // index file and entry logger closed and corrupted.
if (!compacting.compareAndSet(false, true)) {
            // setting the compacting flag failed, meaning it is already true:
            // another thread wants to interrupt the gc thread to exit
return;
}
LOG.info("Compacting entry log : {}", entryLogMeta.entryLogId);
try {
entryLogger.scanEntryLog(entryLogMeta.entryLogId,
scannerFactory.newScanner(entryLogMeta));
} finally {
// clear compacting flag
compacting.set(false);
}
}
/**
     * Records the total size, remaining size and the set of ledgers that comprise an entry log.
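     * <p>
     * A minimal illustrative sequence (values made up):
     * <pre>{@code
     * EntryLogMetadata meta = new EntryLogMetadata(0L);
     * meta.addLedgerSize(1L, 1000L); // totalSize = remainingSize = 1000
     * meta.addLedgerSize(2L, 3000L); // totalSize = remainingSize = 4000
     * meta.removeLedger(2L);         // remainingSize drops back to 1000
     * double usage = meta.getUsage(); // 1000.0 / 4000 = 0.25
     * }</pre>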
*/
static class EntryLogMetadata {
long entryLogId;
long totalSize;
long remainingSize;
ConcurrentHashMap<Long, Long> ledgersMap;
public EntryLogMetadata(long logId) {
this.entryLogId = logId;
totalSize = remainingSize = 0;
ledgersMap = new ConcurrentHashMap<Long, Long>();
}
public void addLedgerSize(long ledgerId, long size) {
totalSize += size;
remainingSize += size;
Long ledgerSize = ledgersMap.get(ledgerId);
if (null == ledgerSize) {
ledgerSize = 0L;
}
ledgerSize += size;
ledgersMap.put(ledgerId, ledgerSize);
}
public void removeLedger(long ledgerId) {
Long size = ledgersMap.remove(ledgerId);
if (null == size) {
return;
}
remainingSize -= size;
}
public boolean containsLedger(long ledgerId) {
return ledgersMap.containsKey(ledgerId);
}
public double getUsage() {
if (totalSize == 0L) {
return 0.0f;
}
return (double)remainingSize / totalSize;
}
public boolean isEmpty() {
return ledgersMap.isEmpty();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ")
.append(remainingSize).append(", ledgersMap = ").append(ledgersMap).append(" }");
return sb.toString();
}
}
/**
* A scanner used to extract entry log meta from entry log files.
*/
static class ExtractionScanner implements EntryLogScanner {
EntryLogMetadata meta;
public ExtractionScanner(EntryLogMetadata meta) {
this.meta = meta;
}
@Override
public boolean accept(long ledgerId) {
return true;
}
@Override
public void process(long ledgerId, long offset, ByteBuffer entry) {
            // add the entry's size to its ledger's total in the entry log meta;
            // the extra 4 bytes account for the entry size header that precedes
            // each entry in the log file
meta.addLedgerSize(ledgerId, entry.limit() + 4);
}
}
/**
     * Reads through all of the entry logs that have not yet been scanned and
     * finds the set of ledger IDs (and per-ledger sizes) that make up each
     * entry log file.
     *
     * @param entryLogMetaMap
     *          Map from entry log id to existing entry log metadata.
     * @return the updated map, including metadata for newly scanned entry logs.
*/
protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
        // Extract metadata for every entry log except the current one.
        // Entry log IDs are just long values that start at 0 and increment by 1
        // when a log fills up and we roll to a new one.
long curLogId = entryLogger.getCurrentLogId();
boolean hasExceptionWhenScan = false;
for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
// Comb the current entry log file if it has not already been extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
}
// check whether log file exists or not
// if it doesn't exist, this log file might have been garbage collected.
if (!entryLogger.logExists(entryLogId)) {
continue;
}
LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);
try {
// Read through the entry log file and extract the entry log meta
EntryLogMetadata entryLogMeta = extractMetaFromEntryLog(entryLogger, entryLogId);
entryLogMetaMap.put(entryLogId, entryLogMeta);
} catch (IOException e) {
hasExceptionWhenScan = true;
LOG.warn("Premature exception when processing " + entryLogId +
" recovery will take care of the problem", e);
}
            // If the scan failed on some entry log, do not advance 'scannedLogId';
            // that log will be scanned again on the next gc run. While every scan
            // has succeeded, advance 'scannedLogId' so those logs are not re-scanned.
if (!hasExceptionWhenScan) {
++scannedLogId;
}
}
return entryLogMetaMap;
}
static EntryLogMetadata extractMetaFromEntryLog(EntryLogger entryLogger, long entryLogId)
throws IOException {
EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId);
ExtractionScanner scanner = new ExtractionScanner(entryLogMeta);
// Read through the entry log file and extract the entry log meta
entryLogger.scanEntryLog(entryLogId, scanner);
LOG.debug("Retrieved entry log meta data entryLogId: {}, meta: {}",
entryLogId, entryLogMeta);
return entryLogMeta;
}
}