/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGGER_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGES_SCANNED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.Getter;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.DefaultEntryLogger.EntryLogListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Interleaved ledger storage.
*
* <p>This ledger storage implementation interleaves entries from all ledgers
* into shared entry log files and maintains an index file for each ledger.
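*
* <p>A minimal initialization sketch. This is hedged: the configuration,
* managers, stats logger, and checkpoint wiring ({@code conf},
* {@code ledgerManager}, etc.) are assumed to be supplied by the surrounding
* bookie runtime, which this class does not create itself:
* <pre>{@code
* InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
* storage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
*         statsLogger, ByteBufAllocator.DEFAULT);
* storage.setCheckpointSource(checkpointSource);
* storage.setCheckpointer(checkpointer);
* storage.start();
* }</pre>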
*/
@StatsDoc(
name = BOOKIE_SCOPE,
category = CATEGORY_SERVER,
help = "Bookie related stats"
)
public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener {
private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
DefaultEntryLogger entryLogger;
@Getter
LedgerCache ledgerCache;
protected CheckpointSource checkpointSource = CheckpointSource.DEFAULT;
protected Checkpointer checkpointer = Checkpointer.NULL;
private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners =
Lists.newCopyOnWriteArrayList();
// A sorted map to store all active ledger ids
protected final SnapshotMap<Long, Boolean> activeLedgers;
// This is the thread that garbage collects the entry logs that do not
// contain any active ledgers, and compacts the entry logs that have a
// low remaining percentage of live data, to reclaim disk space.
GarbageCollectorThread gcThread;
// this indicates that a write has happened since the last flush
private final AtomicBoolean somethingWritten = new AtomicBoolean(false);
// Expose Stats
@StatsDoc(
name = STORAGE_GET_OFFSET,
help = "Operation stats of getting offset from ledger cache",
parent = BOOKIE_READ_ENTRY
)
private OpStatsLogger getOffsetStats;
@StatsDoc(
name = STORAGE_GET_ENTRY,
help = "Operation stats of getting entry from entry logger",
parent = BOOKIE_READ_ENTRY,
happensAfter = STORAGE_GET_OFFSET
)
private OpStatsLogger getEntryStats;
private OpStatsLogger pageScanStats;
private Counter retryCounter;
public InterleavedLedgerStorage() {
activeLedgers = new SnapshotMap<>();
}
@Override
public void initialize(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
StatsLogger statsLogger,
ByteBufAllocator allocator)
throws IOException {
initializeWithEntryLogListener(
conf,
ledgerManager,
ledgerDirsManager,
indexDirsManager,
this,
statsLogger,
allocator);
}
void initializeWithEntryLogListener(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
EntryLogListener entryLogListener,
StatsLogger statsLogger,
ByteBufAllocator allocator) throws IOException {
initializeWithEntryLogger(
conf,
ledgerManager,
ledgerDirsManager,
indexDirsManager,
new DefaultEntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
allocator),
statsLogger);
}
@Override
public void setStateManager(StateManager stateManager) {}
@Override
public void setCheckpointSource(CheckpointSource checkpointSource) {
this.checkpointSource = checkpointSource;
}
@Override
public void setCheckpointer(Checkpointer checkpointer) {
this.checkpointer = checkpointer;
}
public void initializeWithEntryLogger(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
EntryLogger entryLogger,
StatsLogger statsLogger) throws IOException {
checkNotNull(checkpointSource, "invalid null checkpoint source");
checkNotNull(checkpointer, "invalid null checkpointer");
this.entryLogger = (DefaultEntryLogger) entryLogger;
this.entryLogger.addListener(this);
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager,
this, entryLogger, statsLogger.scope("gc"));
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
// Expose Stats
getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
pageScanStats = statsLogger.getOpStatsLogger(STORAGE_SCRUB_PAGES_SCANNED);
retryCounter = statsLogger.getCounter(STORAGE_SCRUB_PAGE_RETRIES);
}
private LedgerDirsListener getLedgerDirsListener() {
return new LedgerDirsListener() {
@Override
public void diskAlmostFull(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
}
}
@Override
public void diskFull(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
gcThread.suspendMinorGC();
}
}
@Override
public void allDisksFull(boolean highPriorityWritesAllowed) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
gcThread.suspendMinorGC();
}
}
@Override
public void diskWritable(File disk) {
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace) {
// disable force gc.
gcThread.disableForceGC();
} else {
// resume normal compaction.
gcThread.resumeMajorGC();
gcThread.resumeMinorGC();
}
}
@Override
public void diskJustWritable(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace) {
// if a disk is just writable, we still need force gc.
gcThread.enableForceGC();
} else {
// still under warn threshold, only resume minor compaction.
gcThread.resumeMinorGC();
}
}
};
}
@Override
public void forceGC() {
gcThread.enableForceGC();
}
@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
gcThread.enableForceGC(forceMajor, forceMinor);
}
@Override
public boolean isInForceGC() {
return gcThread.isInForceGC();
}
@Override
public void start() {
gcThread.start();
}
@Override
public void shutdown() throws InterruptedException {
// Shut down the GC thread first: it depends on the ZooKeeper client,
// and compaction writes entries back to the entry log files.
LOG.info("Shutting down InterleavedLedgerStorage");
LOG.info("Shutting down GC thread");
gcThread.shutdown();
LOG.info("Shutting down entry logger");
entryLogger.close();
try {
ledgerCache.close();
} catch (IOException e) {
LOG.error("Error while closing the ledger cache", e);
}
LOG.info("Complete shutting down Ledger Storage");
}
@Override
public boolean setFenced(long ledgerId) throws IOException {
return ledgerCache.setFenced(ledgerId);
}
@Override
public boolean isFenced(long ledgerId) throws IOException {
return ledgerCache.isFenced(ledgerId);
}
@Override
public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
ledgerCache.setExplicitLac(ledgerId, lac);
}
@Override
public ByteBuf getExplicitLac(long ledgerId) {
return ledgerCache.getExplicitLac(ledgerId);
}
@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
ledgerCache.setMasterKey(ledgerId, masterKey);
}
@Override
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
return ledgerCache.readMasterKey(ledgerId);
}
@Override
public boolean ledgerExists(long ledgerId) throws IOException {
return ledgerCache.ledgerExists(ledgerId);
}
@Override
public boolean entryExists(long ledgerId, long entryId) throws IOException {
// The implementation should be as simple as the commented line below,
// but it needs testing first:
// return ledgerCache.getEntryOffset(ledgerId, entryId) > 0;
throw new UnsupportedOperationException("entry exists not supported");
}
@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
Long lac = ledgerCache.getLastAddConfirmed(ledgerId);
if (lac == null) {
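// LAC not cached: fall back to reading the last entry from storage
// and decoding the LAC field it carries.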
ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
if (null == bb) {
return BookieProtocol.INVALID_ENTRY_ID;
} else {
try {
bb.skipBytes(2 * Long.BYTES); // skip ledger & entry id
lac = bb.readLong();
lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
} finally {
bb.release();
}
}
}
return lac;
}
@Override
public boolean waitForLastAddConfirmedUpdate(long ledgerId,
long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
@Override
public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
ledgerCache.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
}
@Override
public long addEntry(ByteBuf entry) throws IOException {
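// The entry buffer begins with three 8-byte fields, in order:
// ledger id, entry id, and the last-add-confirmed (LAC) value.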
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
long lac = entry.getLong(entry.readerIndex() + 16);
processEntry(ledgerId, entryId, entry);
ledgerCache.updateLastAddConfirmed(ledgerId, lac);
return entryId;
}
@Override
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
long offset;
/*
* If entryId is BookieProtocol.LAST_ADD_CONFIRMED, return the last written entry.
*/
if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
entryId = ledgerCache.getLastEntry(ledgerId);
}
// Get Offset
long startTimeNanos = MathUtils.nowInNano();
boolean success = false;
try {
offset = ledgerCache.getEntryOffset(ledgerId, entryId);
if (offset == 0) {
throw new Bookie.NoEntryException(ledgerId, entryId);
}
success = true;
} finally {
if (success) {
getOffsetStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
getOffsetStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
}
// Get Entry
startTimeNanos = MathUtils.nowInNano();
success = false;
try {
ByteBuf retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
success = true;
return retBytes;
} finally {
if (success) {
getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
getEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
}
}
private void flushOrCheckpoint(boolean isCheckpointFlush)
throws IOException {
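// Attempt both the ledger cache flush and the entry log flush even if
// the first one fails, so one failure doesn't leave the other side
// unflushed; any failure is surfaced at the end.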
boolean flushFailed = false;
try {
ledgerCache.flushLedger(true);
} catch (LedgerDirsManager.NoWritableLedgerDirException e) {
throw e;
} catch (IOException ioe) {
LOG.error("Exception flushing Ledger cache", ioe);
flushFailed = true;
}
try {
// if it is just a checkpoint flush, we just flush rotated entry log files
// in entry logger.
if (isCheckpointFlush) {
entryLogger.checkpoint();
} else {
entryLogger.flush();
}
} catch (LedgerDirsManager.NoWritableLedgerDirException e) {
throw e;
} catch (IOException ioe) {
LOG.error("Exception flushing Ledger", ioe);
flushFailed = true;
}
if (flushFailed) {
throw new IOException("Flushing to storage failed, check logs");
}
}
@Override
public void checkpoint(Checkpoint checkpoint) throws IOException {
// We don't need to check somethingWritten, since a checkpoint is
// scheduled when an entry log file is rotated. We also cannot set
// somethingWritten to false after the checkpoint, because the
// current entry log file isn't flushed yet.
flushOrCheckpoint(true);
}
@Override
public synchronized void flush() throws IOException {
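// Atomically consume the dirty flag: flush only if something has been
// written since the last flush.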
if (!somethingWritten.compareAndSet(true, false)) {
return;
}
flushOrCheckpoint(false);
}
@Override
public void deleteLedger(long ledgerId) throws IOException {
activeLedgers.remove(ledgerId);
ledgerCache.deleteLedger(ledgerId);
for (LedgerDeletionListener listener : ledgerDeletionListeners) {
listener.ledgerDeleted(ledgerId);
}
}
@Override
public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
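// The range is inclusive of firstLedgerId and exclusive of
// lastLedgerId, matching the subMap bounds below.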
NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
.subMap(firstLedgerId, true, lastLedgerId, false);
return subBkActiveLedgers.keySet();
}
@Override
public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
for (EntryLocation l : locations) {
try {
ledgerCache.putEntryOffset(l.ledger, l.entry, l.location);
} catch (NoLedgerException e) {
// Ledger was already deleted, we can skip it in the compaction
if (LOG.isDebugEnabled()) {
LOG.debug("Compaction failed for deleted ledger ledger: {} entry: {}", l.ledger, l.entry);
}
}
}
}
@Override
public void flushEntriesLocationsIndex() throws IOException {
ledgerCache.flushLedger(true);
}
public DefaultEntryLogger getEntryLogger() {
return entryLogger;
}
@Override
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
ledgerDeletionListeners.add(listener);
}
protected void processEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
processEntry(ledgerId, entryId, entry, true);
}
protected void processEntry(long ledgerId, long entryId, ByteBuf entry, boolean rollLog)
throws IOException {
/*
* Touch dirty flag
*/
somethingWritten.set(true);
/*
* Log the entry
*/
long pos = entryLogger.addEntry(ledgerId, entry, rollLog);
/*
* Set offset of entry id to be the current ledger position
*/
ledgerCache.putEntryOffset(ledgerId, entryId, pos);
}
@Override
public void onRotateEntryLog() {
// For interleaved ledger storage, we request a checkpoint when rotating an entry log file.
// The checkpoint represents the point at which all entries added before it are already
// in ledger storage and ready to be synced to disk.
// TODO: we could consider removing checkpointSource and checkpointSource#newCheckpoint
// later, if we provide some kind of LSN (Log/Journal Sequence Number)
// mechanism when adding entries. {@link https://github.com/apache/bookkeeper/issues/279}
Checkpoint checkpoint = checkpointSource.newCheckpoint();
checkpointer.startCheckpoint(checkpoint);
}
/**
* Return an iterable over the index entries for the given ledger.
* @param ledgerId ledger to scan
* @return iterable over the ledger's index pages
*/
public LedgerCache.PageEntriesIterable getIndexEntries(long ledgerId) throws IOException {
return ledgerCache.listEntries(ledgerId);
}
/**
* Read implementation metadata for the index file of a ledger.
* @param ledgerId ledger whose index metadata to read
* @return implementation metadata
* @throws IOException if the index metadata cannot be read
*/
public LedgerCache.LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
return ledgerCache.readLedgerIndexMetadata(ledgerId);
}
@Override
@SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE")
public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
long checkStart = MathUtils.nowInNano();
LOG.info("Starting localConsistencyCheck");
long checkedLedgers = 0;
long checkedPages = 0;
final MutableLong checkedEntries = new MutableLong(0);
final MutableLong pageRetries = new MutableLong(0);
NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
final List<DetectedInconsistency> errors = new ArrayList<>();
for (Long ledger : bkActiveLedgersSnapshot.keySet()) {
try (LedgerCache.PageEntriesIterable pages = ledgerCache.listEntries(ledger)) {
for (LedgerCache.PageEntries page : pages) {
@Cleanup LedgerEntryPage lep = page.getLEP();
MutableBoolean retry = new MutableBoolean(false);
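// Scan each entry on the page; if the page is concurrently
// rewritten (version change) mid-scan, retry the page unless
// the ledger was deleted in the meantime.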
do {
retry.setValue(false);
int version = lep.getVersion();
MutableBoolean success = new MutableBoolean(true);
long start = MathUtils.nowInNano();
lep.getEntries((entry, offset) -> {
rateLimiter.ifPresent(RateLimiter::acquire);
try {
entryLogger.checkEntry(ledger, entry, offset);
checkedEntries.increment();
} catch (DefaultEntryLogger.EntryLookupException e) {
if (version != lep.getVersion()) {
pageRetries.increment();
if (lep.isDeleted()) {
LOG.debug("localConsistencyCheck: ledger {} deleted", ledger);
} else {
LOG.debug("localConsistencyCheck: concurrent modification, retrying");
retry.setValue(true);
retryCounter.inc();
}
return false;
} else {
errors.add(new DetectedInconsistency(ledger, entry, e));
LOG.error("Got error: ", e);
}
success.setValue(false);
}
return true;
});
if (success.booleanValue()) {
pageScanStats.registerSuccessfulEvent(
MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS);
} else {
pageScanStats.registerFailedEvent(
MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS);
}
} while (retry.booleanValue());
checkedPages++;
}
} catch (NoLedgerException | FileInfo.FileInfoDeletedException e) {
if (activeLedgers.containsKey(ledger)) {
LOG.error("Cannot find ledger {}, should exist, exception is ", ledger, e);
errors.add(new DetectedInconsistency(ledger, -1, e));
} else {
LOG.debug("ledger {} deleted since snapshot taken", ledger);
}
} catch (Exception e) {
throw new IOException("Got other exception in localConsistencyCheck", e);
}
checkedLedgers++;
}
LOG.info(
"Finished localConsistencyCheck, took {}s to scan {} ledgers, {} pages, "
+ "{} entries with {} retries, {} errors",
TimeUnit.NANOSECONDS.toSeconds(MathUtils.elapsedNanos(checkStart)),
checkedLedgers,
checkedPages,
checkedEntries.longValue(),
pageRetries.longValue(),
errors.size());
return errors;
}
@Override
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return Collections.singletonList(gcThread.getGarbageCollectionStatus());
}
@Override
public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
return ledgerCache.getEntriesIterator(ledgerId);
}
@Override
public void setLimboState(long ledgerId) throws IOException {
throw new UnsupportedOperationException(
"Limbo state only supported for DbLedgerStorage");
}
@Override
public boolean hasLimboState(long ledgerId) throws IOException {
throw new UnsupportedOperationException(
"Limbo state only supported for DbLedgerStorage");
}
@Override
public void clearLimboState(long ledgerId) throws IOException {
throw new UnsupportedOperationException(
"Limbo state only supported for DbLedgerStorage");
}
@Override
public EnumSet<StorageState> getStorageStateFlags() throws IOException {
return EnumSet.noneOf(StorageState.class);
}
@Override
public void setStorageStateFlag(StorageState flags) throws IOException {
throw new UnsupportedOperationException(
"Storage state only flags supported for DbLedgerStorage");
}
@Override
public void clearStorageStateFlag(StorageState flags) throws IOException {
throw new UnsupportedOperationException(
"Storage state flags only supported for DbLedgerStorage");
}
}