blob: a7080ba4f3e672f9242fe355861a344cb9f1a4f4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IteratorUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@code SortedLedgerStorage} is a {@link LedgerStorage} that buffers incoming entries in an
 * {@link EntryMemTable} and delegates persistent storage to a wrapped
 * {@link InterleavedLedgerStorage}.
 *
 * <p>All entries are first added to the memtable and are later flushed to the wrapped
 * {@code InterleavedLedgerStorage}. A background flush is scheduled when the memtable reports that
 * its size limit was reached ({@link #onSizeLimitReached(Checkpoint)}). Flushes also happen from
 * {@link #checkpoint(Checkpoint)} and {@link #flush()}. Reads consult both the memtable and the
 * wrapped storage.
 */
public class SortedLedgerStorage
        implements LedgerStorage, CacheCallback, SkipListFlusher,
        CompactableLedgerStorage, DefaultEntryLogger.EntryLogListener {
    private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);

    // Created in setCheckpointSource(); its concrete type depends on the configuration.
    EntryMemTable memTable;
    // Single-threaded executor used for background memtable flushes (see onSizeLimitReached).
    private ScheduledExecutorService scheduler;
    private StateManager stateManager;
    private ServerConfiguration conf;
    private StatsLogger statsLogger;
    // The storage that every memtable flush and most metadata operations are delegated to.
    private final InterleavedLedgerStorage interleavedLedgerStorage;

    /**
     * Creates a sorted ledger storage backed by a fresh {@link InterleavedLedgerStorage}.
     */
    public SortedLedgerStorage() {
        this(new InterleavedLedgerStorage());
    }

    /**
     * Creates a sorted ledger storage backed by the given interleaved storage.
     *
     * @param ils the storage that memtable contents are flushed to and operations are delegated to
     */
    @VisibleForTesting
    protected SortedLedgerStorage(InterleavedLedgerStorage ils) {
        interleavedLedgerStorage = ils;
    }

    /**
     * Initializes the wrapped storage and creates the background flush scheduler.
     *
     * <p>This storage registers itself as the wrapped storage's entry log listener. That lets it
     * control how entry log rotations relate to checkpoints.
     */
    @Override
    public void initialize(ServerConfiguration conf,
                           LedgerManager ledgerManager,
                           LedgerDirsManager ledgerDirsManager,
                           LedgerDirsManager indexDirsManager,
                           StatsLogger statsLogger,
                           ByteBufAllocator allocator)
            throws IOException {
        this.conf = conf;
        this.statsLogger = statsLogger;
        interleavedLedgerStorage.initializeWithEntryLogListener(
                conf,
                ledgerManager,
                ledgerDirsManager,
                indexDirsManager,
                // uses sorted ledger storage's own entry log listener
                // since it manages entry log rotations and checkpoints.
                this,
                statsLogger,
                allocator);
        this.scheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                        .setNameFormat("SortedLedgerStorage-%d")
                        .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
    }

    @Override
    public void setStateManager(StateManager stateManager) {
        interleavedLedgerStorage.setStateManager(stateManager);
        this.stateManager = stateManager;
    }

    /**
     * Passes the checkpoint source to the wrapped storage and creates the memtable.
     *
     * <p>Reads {@code conf}, so it must be called after
     * {@link #initialize(ServerConfiguration, LedgerManager, LedgerDirsManager, LedgerDirsManager,
     * StatsLogger, ByteBufAllocator)}.
     */
    @Override
    public void setCheckpointSource(CheckpointSource checkpointSource) {
        interleavedLedgerStorage.setCheckpointSource(checkpointSource);
        if (conf.isEntryLogPerLedgerEnabled()) {
            this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
        } else {
            this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
        }
    }

    @Override
    public void setCheckpointer(Checkpointer checkpointer) {
        interleavedLedgerStorage.setCheckpointer(checkpointer);
    }

    @VisibleForTesting
    ScheduledExecutorService getScheduler() {
        return scheduler;
    }

    /**
     * Flushes any buffered state and then starts the wrapped storage.
     *
     * <p>A flush failure is logged and does not prevent startup.
     */
    @Override
    public void start() {
        try {
            flush();
        } catch (IOException e) {
            LOG.error("Exception thrown while flushing ledger cache.", e);
        }
        interleavedLedgerStorage.start();
    }

    /**
     * Stops the background flush scheduler, closes the memtable and shuts down the wrapped storage.
     *
     * <p>The method waits up to 3 seconds for scheduled flush jobs, then forces the scheduler down.
     * If the thread is interrupted while waiting, the remaining cleanup still runs. The
     * {@link InterruptedException} is re-thrown afterwards.
     *
     * @throws InterruptedException if interrupted while waiting for the scheduler, or if thrown by
     *         the wrapped storage's shutdown
     */
    @Override
    public void shutdown() throws InterruptedException {
        InterruptedException interruption = null;
        // Wait for any jobs currently scheduled to be completed and then shut down.
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException ie) {
            // Do not skip releasing the memtable and the wrapped storage; re-throw at the end.
            scheduler.shutdownNow();
            interruption = ie;
        }
        try {
            memTable.close();
        } catch (Exception e) {
            LOG.error("Error while closing the memtable", e);
        }
        interleavedLedgerStorage.shutdown();
        if (interruption != null) {
            throw interruption;
        }
    }

    /**
     * Reports whether the ledger is known to either the wrapped storage or the memtable.
     */
    @Override
    public boolean ledgerExists(long ledgerId) throws IOException {
        // The wrapped storage is checked first: the skip-list lookup is O(logN) while the
        // ledgerCache lookup is O(1).
        if (interleavedLedgerStorage.ledgerExists(ledgerId)) {
            return true;
        }
        if (memTable.getLastEntry(ledgerId) != null) {
            return true;
        }
        // The memtable may have been flushed between the two checks above, so ask again.
        return interleavedLedgerStorage.ledgerExists(ledgerId);
    }

    /**
     * Not supported by this storage.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean entryExists(long ledgerId, long entryId) throws IOException {
        // Could be implemented like ledgerExists (wrapped storage + memtable), but it is untested.
        throw new UnsupportedOperationException("Not supported for SortedLedgerStorage");
    }

    @Override
    public boolean setFenced(long ledgerId) throws IOException {
        return interleavedLedgerStorage.setFenced(ledgerId);
    }

    @Override
    public boolean isFenced(long ledgerId) throws IOException {
        return interleavedLedgerStorage.isFenced(ledgerId);
    }

    @Override
    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
        interleavedLedgerStorage.setMasterKey(ledgerId, masterKey);
    }

    @Override
    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
        return interleavedLedgerStorage.readMasterKey(ledgerId);
    }

    /**
     * Buffers an entry in the memtable and records its last-add-confirmed value.
     *
     * <p>The entry header is read without moving the reader index. The ledger id is at offset 0,
     * the entry id at offset 8 and the LAC at offset 16, relative to {@code readerIndex()}.
     *
     * @param entry the serialized entry
     * @return the entry id read from the header
     */
    @Override
    public long addEntry(ByteBuf entry) throws IOException {
        long ledgerId = entry.getLong(entry.readerIndex() + 0);
        long entryId = entry.getLong(entry.readerIndex() + 8);
        long lac = entry.getLong(entry.readerIndex() + 16);
        memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
        interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac);
        return entryId;
    }

    /**
     * Reads the last entry of a ledger.
     *
     * @param ledgerId the ledger to look up
     * @return the newest entry buffered in the memtable for {@code ledgerId}, or, if the memtable
     *         holds none, the wrapped storage's result for {@link BookieProtocol#LAST_ADD_CONFIRMED}
     * @throws IOException if the wrapped storage lookup fails
     */
    private ByteBuf getLastEntryId(long ledgerId) throws IOException {
        EntryKeyValue kv = memTable.getLastEntry(ledgerId);
        if (null != kv) {
            return kv.getValueAsByteBuffer();
        }
        // If it doesn't exist in the skip list, then fallback to the ledger cache+index.
        return interleavedLedgerStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
    }

    /**
     * Reads an entry from the wrapped storage, falling back to the memtable.
     *
     * <p>{@link BookieProtocol#LAST_ADD_CONFIRMED} is resolved through the memtable first. If an
     * entry is in neither place, the wrapped storage's exception (e.g. {@code Bookie.NoEntryException})
     * propagates.
     */
    @Override
    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException {
        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
            return getLastEntryId(ledgerId);
        }
        ByteBuf buffToRet;
        try {
            buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
        } catch (Bookie.NoEntryException nee) {
            EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
            if (null == kv) {
                // The entry might have been flushed since we last checked, so query the ledger cache again.
                // If the entry truly doesn't exist, then this will throw a NoEntryException
                buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
            } else {
                buffToRet = kv.getValueAsByteBuffer();
            }
        }
        // buffToRet will not be null when we reach here.
        return buffToRet;
    }

    @Override
    public long getLastAddConfirmed(long ledgerId) throws IOException {
        return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
    }

    @Override
    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
                                                 long previousLAC,
                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
            throws IOException {
        return interleavedLedgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
    }

    @Override
    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
            throws IOException {
        interleavedLedgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
    }

    /**
     * Flushes the memtable up to {@code checkpoint} and then checkpoints the wrapped storage.
     *
     * <p>The number of bytes flushed from the memtable is passed to the entry logger before the
     * wrapped storage's checkpoint runs.
     */
    @Override
    public void checkpoint(final Checkpoint checkpoint) throws IOException {
        long numBytesFlushed = memTable.flush(this, checkpoint);
        interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
        interleavedLedgerStorage.checkpoint(checkpoint);
    }

    @Override
    public void deleteLedger(long ledgerId) throws IOException {
        interleavedLedgerStorage.deleteLedger(ledgerId);
    }

    @Override
    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
        interleavedLedgerStorage.registerLedgerDeletionListener(listener);
    }

    @Override
    public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
        interleavedLedgerStorage.setExplicitLac(ledgerId, lac);
    }

    @Override
    public ByteBuf getExplicitLac(long ledgerId) {
        return interleavedLedgerStorage.getExplicitLac(ledgerId);
    }

    /**
     * {@link SkipListFlusher} callback: writes one memtable entry to the wrapped storage.
     *
     * <p>The trailing {@code false} is passed straight to {@code processEntry}. Its meaning is
     * defined by {@link InterleavedLedgerStorage}.
     */
    @Override
    public void process(long ledgerId, long entryId,
                        ByteBuf buffer) throws IOException {
        interleavedLedgerStorage.processEntry(ledgerId, entryId, buffer, false);
    }

    /**
     * Flushes the whole memtable ({@link Checkpoint#MAX}) and then flushes the wrapped storage.
     */
    @Override
    public void flush() throws IOException {
        memTable.flush(this, Checkpoint.MAX);
        interleavedLedgerStorage.flush();
    }

    // CacheCallback functions.

    /**
     * Schedules a background memtable flush after the memtable reports its size limit was reached.
     *
     * <p>The checkpoint {@code cp} was taken from the snapshot memtable. The entries added before
     * it will have been written to the entry logger by the time the flush completes, so starting a
     * checkpoint at {@code cp} afterwards is safe. Entry log rolling is coordinated by this class
     * (via {@code prepareEntryMemTableFlush}/{@code commitEntryMemTableFlush}). This avoids spreading
     * one memtable flush across several entry log files, and it keeps the add path from being tied
     * to checkpointing.
     *
     * <p>The only exception to the size limitation is a file that grows beyond the 2GB hard limit.
     * Such a log is rolled forcibly, which may cause a slight performance impact.
     *
     * <p>If the flush fails, the bookie is transitioned to read-only mode.
     */
    @Override
    public void onSizeLimitReached(final Checkpoint cp) throws IOException {
        LOG.info("Reached size {}", cp);
        scheduler.execute(() -> {
            try {
                LOG.info("Started flushing mem table.");
                interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
                memTable.flush(this);
                if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
                    interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
                }
            } catch (Exception e) {
                stateManager.transitionToReadOnlyMode();
                LOG.error("Exception thrown while flushing skip list cache.", e);
            }
        });
    }

    /**
     * Intentionally a no-op; this overrides the interleaved storage's rotation behavior.
     */
    @Override
    public void onRotateEntryLog() {
        // No checkpoint is triggered on entry log rotation, because rotation can also be caused by
        // compaction. In a sorted ledger storage, a checkpoint should only happen after the data
        // has been flushed to the entry log file.
    }

    // NOTE(review): unchecked cast; assumes the injected StateManager is a BookieStateManager.
    BookieStateManager getStateManager(){
        return (BookieStateManager) stateManager;
    }

    public DefaultEntryLogger getEntryLogger() {
        return interleavedLedgerStorage.getEntryLogger();
    }

    @Override
    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
        return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
    }

    @Override
    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
        interleavedLedgerStorage.updateEntriesLocations(locations);
    }

    @Override
    public void flushEntriesLocationsIndex() throws IOException {
        interleavedLedgerStorage.flushEntriesLocationsIndex();
    }

    @Override
    public LedgerStorage getUnderlyingLedgerStorage() {
        return interleavedLedgerStorage;
    }

    @Override
    public void forceGC() {
        interleavedLedgerStorage.forceGC();
    }

    @Override
    public void forceGC(Boolean forceMajor, Boolean forceMinor) {
        interleavedLedgerStorage.forceGC(forceMajor, forceMinor);
    }

    @Override
    public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
        return interleavedLedgerStorage.localConsistencyCheck(rateLimiter);
    }

    @Override
    public boolean isInForceGC() {
        return interleavedLedgerStorage.isInForceGC();
    }

    @Override
    public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
        return interleavedLedgerStorage.getGarbageCollectionStatus();
    }

    /**
     * Returns the entry ids of a ledger, merging the memtable's ids with the wrapped storage's ids.
     */
    @Override
    public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
        PrimitiveIterator.OfLong entriesInMemtableItr = memTable.getListOfEntriesOfLedger(ledgerId);
        PrimitiveIterator.OfLong entriesFromILSItr = interleavedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
        return IteratorUtility.mergePrimitiveLongIterator(entriesInMemtableItr, entriesFromILSItr);
    }

    @Override
    public void setLimboState(long ledgerId) throws IOException {
        throw new UnsupportedOperationException(
                "Limbo state only supported for DbLedgerStorage");
    }

    @Override
    public boolean hasLimboState(long ledgerId) throws IOException {
        throw new UnsupportedOperationException(
                "Limbo state only supported for DbLedgerStorage");
    }

    @Override
    public void clearLimboState(long ledgerId) throws IOException {
        throw new UnsupportedOperationException(
                "Limbo state only supported for DbLedgerStorage");
    }

    /**
     * Storage state flags are not tracked here; always returns an empty set.
     */
    @Override
    public EnumSet<StorageState> getStorageStateFlags() throws IOException {
        return EnumSet.noneOf(StorageState.class);
    }

    @Override
    public void setStorageStateFlag(StorageState flags) throws IOException {
        throw new UnsupportedOperationException(
                "Storage state flags only supported for DbLedgerStorage");
    }

    @Override
    public void clearStorageStateFlag(StorageState flags) throws IOException {
        throw new UnsupportedOperationException(
                "Storage state flags only supported for DbLedgerStorage");
    }
}