/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie.storage.ldb;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
//CHECKSTYLE.OFF: IllegalImport
import io.netty.util.internal.PlatformDependent;
//CHECKSTYLE.ON: IllegalImport
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.GarbageCollectionStatus;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerCache;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.commons.lang3.StringUtils;
/**
* Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
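*
* <p>A minimal configuration sketch (assuming this implementation is selected through the standard
* {@code ledgerStorageClass} setting; the cache-size key is the constant defined below):
* <pre>{@code
* ServerConfiguration conf = new ServerConfiguration();
* conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
* conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 64);
* }</pre>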
*/
@Slf4j
public class DbLedgerStorage implements LedgerStorage {
public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
private static final int MB = 1024 * 1024;
private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = (long) (0.25 * PlatformDependent.maxDirectMemory())
/ MB;
private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = (long) (0.25 * PlatformDependent.maxDirectMemory())
/ MB;
private int numberOfDirs;
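// One SingleDirectoryDbLedgerStorage instance per ledger directory; a given ledger is always served
// by the same instance, chosen by hashing the ledger id (see getLedgerSorage)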
private List<SingleDirectoryDbLedgerStorage> ledgerStorageList;
// Keep a single Bookie GC thread so that compactions from multiple individual directories are serialized
private ScheduledExecutorService gcExecutor;
private DbLedgerStorageStats stats;
protected ByteBufAllocator allocator;
@Override
public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
Checkpointer checkpointer, StatsLogger statsLogger, ByteBufAllocator allocator) throws IOException {
long writeCacheMaxSize = getLongVariableOrDefault(conf, WRITE_CACHE_MAX_SIZE_MB,
DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB,
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
this.allocator = allocator;
this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
log.info("Started Db Ledger Storage");
log.info(" - Number of directories: {}", numberOfDirs);
log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
if (readCacheMaxSize + writeCacheMaxSize > PlatformDependent.maxDirectMemory()) {
throw new IOException("Read and write cache sizes exceed the configured max direct memory size");
}
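// Split the overall cache budgets evenly across the ledger directories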
long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollector"));
ledgerStorageList = Lists.newArrayList();
for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
// Create a ledger dirs manager for the single directory
File[] dirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgerDirsManager
dirs[0] = ledgerDir.getParentFile();
LedgerDirsManager ldm = new LedgerDirsManager(conf, dirs, ledgerDirsManager.getDiskChecker(), statsLogger);
ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, indexDirsManager,
stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
}
this.stats = new DbLedgerStorageStats(
statsLogger,
() -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum(),
() -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum(),
() -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum(),
() -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum()
);
}
@VisibleForTesting
protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
throws IOException {
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
readCacheSize);
}
@Override
public void start() {
ledgerStorageList.forEach(LedgerStorage::start);
}
@Override
public void shutdown() throws InterruptedException {
for (LedgerStorage ls : ledgerStorageList) {
ls.shutdown();
}
}
@Override
public boolean ledgerExists(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).ledgerExists(ledgerId);
}
@Override
public boolean setFenced(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).setFenced(ledgerId);
}
@Override
public boolean isFenced(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).isFenced(ledgerId);
}
@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
getLedgerSorage(ledgerId).setMasterKey(ledgerId, masterKey);
}
@Override
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
return getLedgerSorage(ledgerId).readMasterKey(ledgerId);
}
@Override
public long addEntry(ByteBuf entry) throws IOException, BookieException {
long ledgerId = entry.getLong(entry.readerIndex());
return getLedgerSorage(ledgerId).addEntry(entry);
}
@Override
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
return getLedgerSorage(ledgerId).getEntry(ledgerId, entryId);
}
@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).getLastAddConfirmed(ledgerId);
}
@Override
public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
return getLedgerSorage(ledgerId).waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
@Override
public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
getLedgerSorage(ledgerId).cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
}
@Override
public void flush() throws IOException {
for (LedgerStorage ls : ledgerStorageList) {
ls.flush();
}
}
@Override
public void checkpoint(Checkpoint checkpoint) throws IOException {
for (LedgerStorage ls : ledgerStorageList) {
ls.checkpoint(checkpoint);
}
}
@Override
public void deleteLedger(long ledgerId) throws IOException {
getLedgerSorage(ledgerId).deleteLedger(ledgerId);
}
@Override
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
ledgerStorageList.forEach(ls -> ls.registerLedgerDeletionListener(listener));
}
@Override
public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
getLedgerSorage(ledgerId).setExplicitLac(ledgerId, lac);
}
@Override
public ByteBuf getExplicitLac(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).getExplicitLac(ledgerId);
}
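/**
* Add the given ledger (master key, fenced flag and the entry locations taken from the provided
* index pages) to the index of the storage instance that owns the ledger.
*/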
public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
LedgerCache.PageEntriesIterable pages) throws Exception {
return getLedgerSorage(ledgerId).addLedgerToIndex(ledgerId, isFenced, masterKey, pages);
}
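/**
* Return the id of the last entry recorded in the entry location index for the given ledger.
*/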
public long getLastEntryInLedger(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).getEntryLocationIndex().getLastEntryInLedger(ledgerId);
}
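/**
* Return the location of the given entry as stored in the entry location index, packed as
* {@code (entryLogId << 32) | positionInEntryLog}.
*/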
public long getLocation(long ledgerId, long entryId) throws IOException {
return getLedgerSorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId);
}
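// Map a ledger to one of the per-directory storage instances (ledger id modulo the number of dirs)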
private SingleDirectoryDbLedgerStorage getLedgerSorage(long ledgerId) {
return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
}
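/**
* Return the active ledgers in the given range, aggregated over all the per-directory storage
* instances.
*/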
public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
List<Iterable<Long>> listIt = new ArrayList<>(numberOfDirs);
for (SingleDirectoryDbLedgerStorage ls : ledgerStorageList) {
listIt.add(ls.getActiveLedgersInRange(firstLedgerId, lastLedgerId));
}
return Iterables.concat(listIt);
}
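/**
* Return the content of the last entry added to the given ledger.
*/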
public ByteBuf getLastEntry(long ledgerId) throws IOException {
return getLedgerSorage(ledgerId).getLastEntry(ledgerId);
}
@VisibleForTesting
boolean isFlushRequired() {
return ledgerStorageList.stream().anyMatch(SingleDirectoryDbLedgerStorage::isFlushRequired);
}
@VisibleForTesting
List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
return ledgerStorageList;
}
/**
* Reads the ledger index entries for the given ledger and invokes the processor with the
* entry-log id and position of each entry.
*
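* <p>A minimal usage sketch, assuming {@code LedgerLoggerProcessor} is a single-method callback
* that receives the entry id, entry-log id and position (matching the {@code process(...)} call
* in the method body):
* <pre>{@code
* DbLedgerStorage.readLedgerIndexEntries(ledgerId, serverConf,
*         (entryId, logId, pos) -> System.out.println(entryId + " -> " + logId + "@" + pos));
* }</pre>
*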
* @param ledgerId the id of the ledger whose index entries are read
* @param serverConf the server configuration used to locate the ledger directories
* @param processor the callback invoked with the entry id, entry-log id and position of each entry
* @throws IOException if the entry location index cannot be opened or read
*/
public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf,
LedgerLoggerProcessor processor) throws IOException {
checkNotNull(serverConf, "ServerConfiguration can't be null");
checkNotNull(processor, "LedgerLogger info processor can't be null");
LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
int dirIndex = MathUtils.signSafeMod(ledgerId, ledgerDirs.size());
String ledgerBasePath = ledgerDirs.get(dirIndex).toString();
EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
(path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
ledgerBasePath, NullStatsLogger.INSTANCE);
try {
long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) {
long offset = entryLocationIndex.getLocation(ledgerId, currentEntry);
if (offset <= 0) {
// entry not found in this bookie
continue;
}
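// the location is packed as (entryLogId << 32) | position within that entry log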
long entryLogId = offset >> 32L;
long position = offset & 0xffffffffL;
processor.process(currentEntry, entryLogId, position);
}
} finally {
entryLocationIndex.close();
}
}
@Override
public void forceGC() {
ledgerStorageList.forEach(SingleDirectoryDbLedgerStorage::forceGC);
}
@Override
public boolean isInForceGC() {
return ledgerStorageList.stream().anyMatch(SingleDirectoryDbLedgerStorage::isInForceGC);
}
@Override
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return ledgerStorageList.stream()
.map(single -> single.getGarbageCollectionStatus().get(0)).collect(Collectors.toList());
}
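/**
* Read a long configuration value, falling back to the default when the property is missing or
* set to an empty string.
*/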
static long getLongVariableOrDefault(ServerConfiguration conf, String keyName, long defaultValue) {
Object obj = conf.getProperty(keyName);
if (obj instanceof Number) {
return ((Number) obj).longValue();
} else if (obj == null) {
return defaultValue;
} else if (StringUtils.isEmpty(conf.getString(keyName))) {
return defaultValue;
} else {
return conf.getLong(keyName);
}
}
@Override
public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
// check Issue #2078
throw new UnsupportedOperationException(
"getListOfEntriesOfLedger method is currently unsupported for DbLedgerStorage");
}
}