blob: 2dfa1a14958600bb3a30127ecb215e52d2ba4675 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_MAX;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_USED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PrimitiveIterator.OfLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.BookieStats;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements a bookie.
*/
public class BookieImpl extends BookieCriticalThread implements Bookie {
// NOTE(review): the logger is keyed to Bookie.class rather than BookieImpl.class —
// presumably intentional so log configuration targets the interface name; confirm.
private static final Logger LOG = LoggerFactory.getLogger(Bookie.class);
// "current" subdirectory of every configured journal directory, one entry per journal.
final List<File> journalDirectories;
final ServerConfiguration conf;
// Drives checkpoints/flushes of the ledger storage (see constructor for variants).
final SyncThread syncThread;
final LedgerManagerFactory ledgerManagerFactory;
final LedgerManager ledgerManager;
final LedgerStorage ledgerStorage;
// One Journal instance per journal directory.
final List<Journal> journals;
final HandleFactory handles;
final boolean entryLogPerLedgerEnabled;
// Special negative entry ids marking journal meta-entries (master key, fence,
// force, explicit LAC). Journal replay in readJournal() dispatches on these.
public static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
public static final long METAENTRY_ID_FENCE_KEY = -0x2000;
public static final long METAENTRY_ID_FORCE_LEDGER = -0x4000;
static final long METAENTRY_ID_LEDGER_EXPLICITLAC = -0x8000;
private final LedgerDirsManager ledgerDirsManager;
protected final Supplier<BookieServiceInfo> bookieServiceInfoProvider;
// May be the very same instance as ledgerDirsManager when no separate index
// directories are configured (identity-compared elsewhere in this class).
private final LedgerDirsManager indexDirsManager;
LedgerDirsMonitor dirsMonitor;
// Registration Manager for managing registration
protected final MetadataBookieDriver metadataDriver;
private int exitCode = ExitCode.OK;
// Cache of ledgerId -> master key, populated during journal replay.
private final ConcurrentLongHashMap<byte[]> masterKeyCache = new ConcurrentLongHashMap<>();
protected StateManager stateManager;
// Expose Stats
final StatsLogger statsLogger;
private final BookieStats bookieStats;
private final ByteBufAllocator allocator;
// Whether entry payloads are written to the journal (conf.getJournalWriteData()).
private final boolean writeDataToJournal;
@StatsDoc(
name = JOURNAL_MEMORY_MAX,
help = "The max amount of memory in bytes that can be used by the bookie journal"
)
private final Gauge<Long> journalMemoryMaxStats;
@StatsDoc(
name = JOURNAL_MEMORY_USED,
help = "The actual amount of memory in bytes currently used by the bookie journal"
)
private final Gauge<Long> journalMemoryUsedStats;
/**
 * No-op {@link WriteCallback}: logs the completion at debug level and
 * otherwise discards the result.
 */
static class NopWriteCallback implements WriteCallback {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieId addr, Object ctx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
entryId, ledgerId, addr, rc);
}
}
}
/**
 * Ensure {@code dir} exists with the current (v3+) directory layout.
 *
 * <p>If the directory is missing, the parent is first inspected for traces of a
 * pre-v3 layout (a legacy VERSION file, or leftover {@code .txn}/{@code .idx}/
 * {@code .log} files); if found, startup must fail so the old installation is
 * upgraded rather than silently overwritten. Otherwise the directory (including
 * any missing parents) is created.
 *
 * @param dir the "current" directory to check/create
 * @throws IOException if an old layout is detected or the directory cannot be created
 */
public static void checkDirectoryStructure(File dir) throws IOException {
    if (!dir.exists()) {
        File parent = dir.getParentFile();
        File preV3versionFile = new File(dir.getParent(),
                BookKeeperConstants.VERSION_FILENAME);
        final AtomicBoolean oldDataExists = new AtomicBoolean(false);
        // Guard against a null parent (dir is a filesystem root): the original
        // code would NPE here. The filter is used only for its side effect of
        // flagging old data files, so it accepts everything.
        if (parent != null) {
            parent.list((d, name) -> {
                if (name.endsWith(".txn") || name.endsWith(".idx") || name.endsWith(".log")) {
                    oldDataExists.set(true);
                }
                return true;
            });
        }
        if (preV3versionFile.exists() || oldDataExists.get()) {
            String err = "Directory layout version is less than 3, upgrade needed";
            LOG.error(err);
            throw new IOException(err);
        }
        if (!dir.mkdirs()) {
            String err = "Unable to create directory " + dir;
            LOG.error(err);
            throw new IOException(err);
        }
    }
}
/**
 * Check that the environment for the bookie is correct.
 * This means that the configuration has stayed the same as the
 * first run and the filesystem structure is up to date.
 */
private void checkEnvironment(MetadataBookieDriver metadataDriver)
        throws BookieException, IOException {
    List<File> allLedgerDirs = new ArrayList<>(ledgerDirsManager.getAllLedgerDirs());
    if (indexDirsManager != ledgerDirsManager) {
        allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
    }
    if (metadataDriver == null) {
        // Testing-only path: no metadata store available, so just make sure
        // the local directory layout is valid.
        for (File journalDir : journalDirectories) {
            checkDirectoryStructure(journalDir);
        }
        for (File ledgerDir : allLedgerDirs) {
            checkDirectoryStructure(ledgerDir);
        }
        return;
    }
    // Full validation: cookies against the metadata store, then disk-partition
    // duplication checks for ledger and journal directories.
    checkEnvironmentWithStorageExpansion(conf, metadataDriver, journalDirectories, allLedgerDirs);
    checkIfDirsOnSameDiskPartition(allLedgerDirs);
    checkIfDirsOnSameDiskPartition(journalDirectories);
}
/**
 * Checks if multiple directories are in same diskpartition/filesystem/device.
 * If ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION config parameter is not enabled, and
 * if it is found that there are multiple directories in the same DiskPartition then
 * it will throw DiskPartitionDuplicationException.
 *
 * @param dirs dirs to validate
 * @throws DiskPartitionDuplicationException if duplication is found and not allowed,
 *         or if the FileStore of a directory cannot be determined
 */
private void checkIfDirsOnSameDiskPartition(List<File> dirs) throws DiskPartitionDuplicationException {
    boolean allowDiskPartitionDuplication = conf.isAllowMultipleDirsUnderSameDiskPartition();
    final MutableBoolean isDuplicationFoundAndNotAllowed = new MutableBoolean(false);
    Map<FileStore, List<File>> fileStoreDirsMap = new HashMap<>();
    for (File dir : dirs) {
        FileStore fileStore;
        try {
            fileStore = Files.getFileStore(dir.toPath());
        } catch (IOException e) {
            // Pass the exception to the logger so the root cause appears in the
            // log as well, not only in the wrapped exception below.
            LOG.error("Got IOException while trying to FileStore of {}", dir, e);
            throw new BookieException.DiskPartitionDuplicationException(e);
        }
        // Group directories by the FileStore (disk partition) they live on.
        fileStoreDirsMap.computeIfAbsent(fileStore, fs -> new ArrayList<>()).add(dir);
    }
    fileStoreDirsMap.forEach((fileStore, dirsList) -> {
        if (dirsList.size() > 1) {
            if (allowDiskPartitionDuplication) {
                LOG.warn("Dirs: {} are in same DiskPartition/FileSystem: {}", dirsList, fileStore);
            } else {
                LOG.error("Dirs: {} are in same DiskPartition/FileSystem: {}", dirsList, fileStore);
                isDuplicationFoundAndNotAllowed.setValue(true);
            }
        }
    });
    if (isDuplicationFoundAndNotAllowed.getValue()) {
        throw new BookieException.DiskPartitionDuplicationException();
    }
}
/**
 * Enumerate every bookie id this configuration could have registered under
 * (ip-based, hostname-based, advertised address, explicit bookie id), so that
 * cookie validation does not mistake a re-configured bookie for a new one.
 */
static List<BookieId> possibleBookieIds(ServerConfiguration conf)
        throws BookieException {
    List<BookieId> ids = new ArrayList<>(4);
    // Loopback is allowed on purpose while enumerating candidates: a bookie
    // actually listening on a loopback address fails later, at bind time.
    try {
        // candidate 1: ip-address based id
        ids.add(getBookieAddress(
            new ServerConfiguration(conf)
                .setUseHostNameAsBookieID(false)
                .setAdvertisedAddress(null)
                .setAllowLoopback(true)
        ).toBookieId());
        // candidate 2: hostname based id
        ids.add(getBookieAddress(
            new ServerConfiguration(conf)
                .setUseHostNameAsBookieID(true)
                .setAdvertisedAddress(null)
                .setAllowLoopback(true)
        ).toBookieId());
        // candidate 3: advertised address, when configured
        if (conf.getAdvertisedAddress() != null) {
            ids.add(getBookieAddress(conf).toBookieId());
        }
        // candidate 4: explicitly configured bookie id
        if (conf.getBookieId() != null) {
            ids.add(BookieId.parse(conf.getBookieId()));
        }
    } catch (UnknownHostException e) {
        throw new UnknownBookieIdException(e);
    }
    return ids;
}
/**
 * Read this bookie's cookie from the registration manager under each candidate
 * bookie id and verify it against the locally built master cookie.
 *
 * <p>Note: every candidate is tried even after a cookie is found, so the value
 * returned is the cookie of the LAST address that had one; addresses without a
 * cookie are simply skipped. A verification failure on any found cookie
 * propagates immediately as a BookieException.
 *
 * @param masterCookie cookie built from the current configuration
 * @param rm registration manager to read from
 * @param addresses candidate bookie ids to look under
 * @param allowExpansion if true, the master cookie only needs to be a superset
 *        of the stored one (storage expansion); otherwise they must match
 * @return the last cookie found, or null when no address had a cookie
 */
static Versioned<Cookie> readAndVerifyCookieFromRegistrationManager(
Cookie masterCookie, RegistrationManager rm,
List<BookieId> addresses, boolean allowExpansion)
throws BookieException {
Versioned<Cookie> rmCookie = null;
for (BookieId address : addresses) {
try {
rmCookie = Cookie.readFromRegistrationManager(rm, address);
// If allowStorageExpansion option is set, we should
// make sure that the new set of ledger/index dirs
// is a super set of the old; else, we fail the cookie check
if (allowExpansion) {
masterCookie.verifyIsSuperSet(rmCookie.getValue());
} else {
masterCookie.verify(rmCookie.getValue());
}
} catch (CookieNotFoundException e) {
// No cookie under this candidate id: not an error, try the next one.
continue;
}
}
return rmCookie;
}
/**
 * Validate the cookie stored in each directory against the master cookie.
 *
 * @return pair of (directories with no cookie file, cookies that were found)
 * @throws InvalidCookieException when a found cookie fails verification
 * @throws IOException on directory-structure or cookie read errors
 */
private static Pair<List<File>, List<Cookie>> verifyAndGetMissingDirs(
        Cookie masterCookie, boolean allowExpansion, List<File> dirs)
        throws InvalidCookieException, IOException {
    List<File> dirsWithoutCookie = new ArrayList<>();
    List<Cookie> foundCookies = new ArrayList<>();
    for (File dir : dirs) {
        checkDirectoryStructure(dir);
        try {
            Cookie storedCookie = Cookie.readFromDirectory(dir);
            // With storage expansion the master cookie only has to cover the
            // stored one; otherwise the two must match exactly.
            if (allowExpansion) {
                masterCookie.verifyIsSuperSet(storedCookie);
            } else {
                masterCookie.verify(storedCookie);
            }
            foundCookies.add(storedCookie);
        } catch (FileNotFoundException fnf) {
            dirsWithoutCookie.add(dir);
        }
    }
    return Pair.of(dirsWithoutCookie, foundCookies);
}
/**
 * Write the master cookie into every journal and ledger directory and then
 * record it in the registration manager (used for new environments and for
 * storage expansion, where some directories are missing cookies).
 */
private static void stampNewCookie(ServerConfiguration conf,
                                   Cookie masterCookie,
                                   RegistrationManager rm,
                                   Version version,
                                   List<File> journalDirectories,
                                   List<File> allLedgerDirs)
        throws BookieException, IOException {
    LOG.info("Stamping new cookies on all dirs {} {}",
        journalDirectories, allLedgerDirs);
    for (File dir : journalDirectories) {
        masterCookie.writeToDirectory(dir);
    }
    for (File dir : allLedgerDirs) {
        masterCookie.writeToDirectory(dir);
    }
    // The registration manager is updated only after all directories were stamped.
    masterCookie.writeToRegistrationManager(rm, conf, version);
}
/**
 * Validate the bookie's environment against the metadata store and the local
 * journal/ledger directories, handling three cases: a brand-new environment
 * (stamp fresh cookies everywhere), storage expansion (stamp cookies into the
 * newly added directories), and corruption (cookies missing where they must
 * exist — fail startup).
 *
 * @param conf server configuration the master cookie is derived from
 * @param metadataDriver source of the registration manager ("source of truth")
 * @param journalDirectories journal "current" directories to validate
 * @param allLedgerDirs ledger plus index "current" directories to validate
 * @throws BookieException when cookie validation fails or disk access errors occur
 */
public static void checkEnvironmentWithStorageExpansion(
ServerConfiguration conf,
MetadataBookieDriver metadataDriver,
List<File> journalDirectories,
List<File> allLedgerDirs) throws BookieException {
RegistrationManager rm = metadataDriver.getRegistrationManager();
try {
// 1. retrieve the instance id
String instanceId = rm.getClusterInstanceId();
// 2. build the master cookie from the configuration
Cookie.Builder builder = Cookie.generateCookie(conf);
if (null != instanceId) {
builder.setInstanceId(instanceId);
}
Cookie masterCookie = builder.build();
boolean allowExpansion = conf.getAllowStorageExpansion();
// 3. read the cookie from registration manager. it is the `source-of-truth` of a given bookie.
// if it doesn't exist in registration manager, this bookie is a new bookie, otherwise it is
// an old bookie.
List<BookieId> possibleBookieIds = possibleBookieIds(conf);
final Versioned<Cookie> rmCookie = readAndVerifyCookieFromRegistrationManager(
masterCookie, rm, possibleBookieIds, allowExpansion);
// 4. check if the cookie appear in all the directories.
List<File> missedCookieDirs = new ArrayList<>();
List<Cookie> existingCookies = Lists.newArrayList();
if (null != rmCookie) {
existingCookies.add(rmCookie.getValue());
}
// 4.1 verify the cookies in journal directories
Pair<List<File>, List<Cookie>> journalResult =
verifyAndGetMissingDirs(masterCookie,
allowExpansion, journalDirectories);
missedCookieDirs.addAll(journalResult.getLeft());
existingCookies.addAll(journalResult.getRight());
// 4.2. verify the cookies in ledger directories
Pair<List<File>, List<Cookie>> ledgerResult =
verifyAndGetMissingDirs(masterCookie,
allowExpansion, allLedgerDirs);
missedCookieDirs.addAll(ledgerResult.getLeft());
existingCookies.addAll(ledgerResult.getRight());
// 5. if there are directories missing cookies,
// this is either a:
// - new environment
// - a directory is being added
// - a directory has been corrupted/wiped, which is an error
if (!missedCookieDirs.isEmpty()) {
if (rmCookie == null) {
// 5.1 new environment: all directories should be empty
verifyDirsForNewEnvironment(missedCookieDirs);
stampNewCookie(conf, masterCookie, rm, Version.NEW,
journalDirectories, allLedgerDirs);
} else if (allowExpansion) {
// 5.2 storage is expanding
Set<File> knownDirs = getKnownDirs(existingCookies);
verifyDirsForStorageExpansion(missedCookieDirs, knownDirs);
stampNewCookie(conf, masterCookie,
rm, rmCookie.getVersion(),
journalDirectories, allLedgerDirs);
} else {
// 5.3 Cookie-less directories and
// we can't do anything with them
LOG.error("There are directories without a cookie,"
+ " and this is neither a new environment,"
+ " nor is storage expansion enabled. "
+ "Empty directories are {}", missedCookieDirs);
throw new InvalidCookieException();
}
} else {
if (rmCookie == null) {
// No corresponding cookie found in registration manager. The bookie should fail to come up.
LOG.error("Cookie for this bookie is not stored in metadata store. Bookie failing to come up");
throw new InvalidCookieException();
}
}
} catch (IOException ioe) {
LOG.error("Error accessing cookie on disks", ioe);
throw new BookieException.InvalidCookieException(ioe);
}
}
/**
 * For a brand-new environment, every directory that is missing a cookie must be
 * empty; otherwise its contents are of unknown provenance and startup must fail.
 *
 * @param missedCookieDirs directories that had no cookie file
 * @throws InvalidCookieException if any of those directories is non-empty
 */
private static void verifyDirsForNewEnvironment(List<File> missedCookieDirs)
        throws InvalidCookieException {
    List<File> nonEmptyDirs = new ArrayList<>();
    for (File dir : missedCookieDirs) {
        // dir.list() is null when dir is unreadable or not a directory.
        String[] content = dir.list();
        if (content != null && content.length != 0) {
            nonEmptyDirs.add(dir);
        }
    }
    if (!nonEmptyDirs.isEmpty()) {
        // Parameterized logging instead of string concatenation.
        LOG.error("Not all the new directories are empty. New directories that are not empty are: {}",
                nonEmptyDirs);
        throw new InvalidCookieException();
    }
}
/**
 * Collect the set of ledger directories recorded in the given cookies.
 */
private static Set<File> getKnownDirs(List<Cookie> cookies) {
    return cookies.stream()
            .map(Cookie::getLedgerDirPathsFromCookie)
            .flatMap(Arrays::stream)
            .map(File::new)
            .collect(Collectors.toSet());
}
/**
 * For storage expansion, a directory missing its cookie is acceptable only if it
 * is genuinely new — i.e. not under a ledger dir recorded in an existing cookie —
 * and empty. Anything else means lost data or unknown contents, so startup fails.
 *
 * @param missedCookieDirs directories that had no cookie file
 * @param existingLedgerDirs ledger dirs already recorded in existing cookies
 * @throws InvalidCookieException if a known dir lost its cookie or a new dir is non-empty
 */
private static void verifyDirsForStorageExpansion(
        List<File> missedCookieDirs,
        Set<File> existingLedgerDirs) throws InvalidCookieException {
    List<File> dirsMissingData = new ArrayList<>();
    List<File> nonEmptyDirs = new ArrayList<>();
    for (File dir : missedCookieDirs) {
        if (existingLedgerDirs.contains(dir.getParentFile())) {
            // if one of the existing ledger dirs doesn't have cookie,
            // let us not proceed further
            dirsMissingData.add(dir);
            continue;
        }
        String[] content = dir.list();
        if (content != null && content.length != 0) {
            nonEmptyDirs.add(dir);
        }
    }
    if (!dirsMissingData.isEmpty() || !nonEmptyDirs.isEmpty()) {
        // Parameterized logging instead of string concatenation.
        LOG.error("Either not all local directories have cookies or directories being added "
                + " newly are not empty. Directories missing cookie file are: {}"
                + " New directories that are not empty are: {}",
                dirsMissingData, nonEmptyDirs);
        throw new InvalidCookieException();
    }
}
/**
 * Resolve this bookie's id: the explicitly configured bookie id when present,
 * otherwise the id derived from the resolved bookie network address.
 */
public static BookieId getBookieId(ServerConfiguration conf) throws UnknownHostException {
    String customBookieId = conf.getBookieId();
    return (customBookieId != null)
            ? BookieId.parse(customBookieId)
            : getBookieAddress(conf).toBookieId();
}
/**
 * Return the configured address of the bookie.
 *
 * <p>Resolution order: an explicit advertised address wins; otherwise the
 * default host of the configured listening interface is resolved, and the
 * result is rendered as a (possibly shortened) hostname or an IP depending on
 * configuration. A loopback address is rejected unless explicitly allowed.
 *
 * @throws UnknownHostException if the hostname cannot be resolved, or a
 *         loopback address would be used while loopback is disallowed
 */
public static BookieSocketAddress getBookieAddress(ServerConfiguration conf)
throws UnknownHostException {
// Advertised address takes precedence over the listening interface and the
// useHostNameAsBookieID settings
if (conf.getAdvertisedAddress() != null && conf.getAdvertisedAddress().trim().length() > 0) {
String hostAddress = conf.getAdvertisedAddress().trim();
return new BookieSocketAddress(hostAddress, conf.getBookiePort());
}
String iface = conf.getListeningInterface();
if (iface == null) {
iface = "default";
}
// Resolve the host bound to the interface, then verify it resolves to an address.
String hostName = DNS.getDefaultHost(iface);
InetSocketAddress inetAddr = new InetSocketAddress(hostName, conf.getBookiePort());
if (inetAddr.isUnresolved()) {
throw new UnknownHostException("Unable to resolve default hostname: "
+ hostName + " for interface: " + iface);
}
String hostAddress = null;
InetAddress iAddress = inetAddr.getAddress();
if (conf.getUseHostNameAsBookieID()) {
hostAddress = iAddress.getCanonicalHostName();
if (conf.getUseShortHostName()) {
/*
* if short hostname is used, then FQDN is not used. Short
* hostname is the hostname cut at the first dot.
*/
hostAddress = hostAddress.split("\\.", 2)[0];
}
} else {
hostAddress = iAddress.getHostAddress();
}
BookieSocketAddress addr =
new BookieSocketAddress(hostAddress, conf.getBookiePort());
// Refuse loopback unless explicitly allowed, with a hint on how to fix it.
if (addr.getSocketAddress().getAddress().isLoopbackAddress()
&& !conf.getAllowLoopback()) {
throw new UnknownHostException("Trying to listen on loopback address, "
+ addr + " but this is forbidden by default "
+ "(see ServerConfiguration#getAllowLoopback()).\n"
+ "If this happen, you can consider specifying the network interface"
+ " to listen on (e.g. listeningInterface=eth0) or specifying the"
+ " advertised address (e.g. advertisedAddress=172.x.y.z)");
}
return addr;
}
/** @return the manager for the configured ledger directories. */
public LedgerDirsManager getLedgerDirsManager() {
return ledgerDirsManager;
}
/** @return the index dirs manager (may be the ledger dirs manager itself when no separate index dirs are configured). */
LedgerDirsManager getIndexDirsManager() {
return indexDirsManager;
}
/** @return total disk space across all ledger directories, as reported by the dirs manager. */
public long getTotalDiskSpace() throws IOException {
return getLedgerDirsManager().getTotalDiskSpace(ledgerDirsManager.getAllLedgerDirs());
}
/** @return total free disk space across all ledger directories, as reported by the dirs manager. */
public long getTotalFreeSpace() throws IOException {
return getLedgerDirsManager().getTotalFreeSpace(ledgerDirsManager.getAllLedgerDirs());
}
/** @return the {@link BookKeeperConstants#CURRENT_DIR} subdirectory of the given directory. */
public static File getCurrentDirectory(File dir) {
return new File(dir, BookKeeperConstants.CURRENT_DIR);
}
/**
 * Map each directory to its "current" subdirectory, preserving order.
 */
public static File[] getCurrentDirectories(File[] dirs) {
    return Arrays.stream(dirs)
            .map(BookieImpl::getCurrentDirectory)
            .toArray(File[]::new);
}
/**
 * Create a bookie with no stats (NullStatsLogger), the default pooled
 * allocator, and a simple service-info provider derived from the configuration.
 */
public BookieImpl(ServerConfiguration conf)
throws IOException, InterruptedException, BookieException {
this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT, new SimpleBookieServiceInfoProvider(conf));
}
/**
 * Instantiate the ledger storage implementation named by the configuration.
 */
private static LedgerStorage buildLedgerStorage(ServerConfiguration conf) throws IOException {
    final String storageClassName = conf.getLedgerStorageClass();
    LOG.info("Using ledger storage: {}", storageClassName);
    return LedgerStorageFactory.createLedgerStorage(storageClassName);
}
/**
 * Initialize LedgerStorage instance without checkpointing for use within the shell
 * and other RO users. ledgerStorage must not have already been initialized.
 *
 * <p>The caller is responsible for disposing of the ledgerStorage object.
 *
 * @param conf Bookie config.
 * @param ledgerStorage Instance to initialize; a new one is built from the
 *        configured storage class when null.
 * @return Passed ledgerStorage instance
 * @throws IOException on storage initialization failure
 */
public static LedgerStorage mountLedgerStorageOffline(ServerConfiguration conf, LedgerStorage ledgerStorage)
throws IOException {
StatsLogger statsLogger = NullStatsLogger.INSTANCE;
DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
LedgerDirsManager ledgerDirsManager = createLedgerDirsManager(
conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
LedgerDirsManager indexDirsManager = createIndexDirsManager(
conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE), ledgerDirsManager);
if (null == ledgerStorage) {
ledgerStorage = buildLedgerStorage(conf);
}
// Checkpoint source that always returns Checkpoint.MAX with a no-op
// completion — presumably this disables checkpoint progress tracking for
// offline/read-only use; confirm against CheckpointSource semantics.
CheckpointSource checkpointSource = new CheckpointSource() {
@Override
public Checkpoint newCheckpoint() {
return Checkpoint.MAX;
}
@Override
public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
}
};
Checkpointer checkpointer = Checkpointer.NULL;
// Ledger manager and state manager are passed as null: this offline mount
// never touches the metadata store or bookie lifecycle.
ledgerStorage.initialize(
conf,
null,
ledgerDirsManager,
indexDirsManager,
null,
checkpointSource,
checkpointer,
statsLogger,
UnpooledByteBufAllocator.DEFAULT);
return ledgerStorage;
}
/**
 * Construct and fully wire a bookie: directory managers, metadata driver,
 * cookie/environment validation, state manager, dirs monitor, journals,
 * ledger storage, sync thread and journal-memory gauges.
 *
 * <p>The metadata driver may be null (testing); in that case no ledger manager
 * is created and only local directory checks are performed.
 *
 * @throws BookieException on cookie/environment validation failure
 * @throws IOException on storage or directory initialization failure
 * @throws InterruptedException if interrupted during initialization
 */
public BookieImpl(ServerConfiguration conf, StatsLogger statsLogger,
ByteBufAllocator allocator, Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
this.bookieServiceInfoProvider = bookieServiceInfoProvider;
this.statsLogger = statsLogger;
this.conf = conf;
// Resolve the "current" subdirectory of every configured journal directory.
this.journalDirectories = Lists.newArrayList();
for (File journalDirectory : conf.getJournalDirs()) {
this.journalDirectories.add(getCurrentDirectory(journalDirectory));
}
DiskChecker diskChecker = createDiskChecker(conf);
this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
this.ledgerDirsManager);
this.writeDataToJournal = conf.getJournalWriteData();
this.allocator = allocator;
// instantiate zookeeper client to initialize ledger manager
this.metadataDriver = instantiateMetadataDriver(conf);
// Validate cookies and directory layout before touching any on-disk state.
checkEnvironment(this.metadataDriver);
try {
if (this.metadataDriver != null) {
// current the registration manager is zookeeper only
ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
ledgerManager = ledgerManagerFactory.newLedgerManager();
} else {
ledgerManagerFactory = null;
ledgerManager = null;
}
} catch (MetadataException e) {
throw new MetadataStoreException("Failed to initialize ledger manager", e);
}
stateManager = initializeStateManager();
// register shutdown handler using trigger mode
stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
// Initialise dirsMonitor. This would look through all the
// configured directories. When disk errors or all the ledger
// directories are full, would throws exception and fail bookie startup.
List<LedgerDirsManager> dirsManagers = new ArrayList<>();
dirsManagers.add(ledgerDirsManager);
if (indexDirsManager != ledgerDirsManager) {
dirsManagers.add(indexDirsManager);
}
this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker, dirsManagers);
try {
this.dirsMonitor.init();
} catch (NoWritableLedgerDirException nle) {
// start in read-only mode if no writable dirs and read-only allowed
if (!conf.isReadOnlyModeEnabled()) {
throw nle;
} else {
this.stateManager.transitionToReadOnlyMode();
}
}
// instantiate the journals
journals = Lists.newArrayList();
for (int i = 0; i < journalDirectories.size(); i++) {
journals.add(new Journal(i, journalDirectories.get(i),
conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
}
this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
CheckpointSource checkpointSource = new CheckpointSourceList(journals);
ledgerStorage = buildLedgerStorage(conf);
boolean isDbLedgerStorage = ledgerStorage instanceof DbLedgerStorage;
/*
* with this change https://github.com/apache/bookkeeper/pull/677,
* LedgerStorage drives the checkpoint logic.
*
* <p>There are two exceptions:
*
* 1) with multiple entry logs, checkpoint logic based on a entry log is
* not possible, hence it needs to be timebased recurring thing and
* it is driven by SyncThread. SyncThread.start does that and it is
* started in Bookie.start method.
*
* 2) DbLedgerStorage
*/
if (entryLogPerLedgerEnabled || isDbLedgerStorage) {
syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
@Override
public void startCheckpoint(Checkpoint checkpoint) {
/*
* in the case of entryLogPerLedgerEnabled, LedgerStorage
* dont drive checkpoint logic, but instead it is done
* periodically by SyncThread. So startCheckpoint which
* will be called by LedgerStorage will be no-op.
*/
}
@Override
public void start() {
executor.scheduleAtFixedRate(() -> {
doCheckpoint(checkpointSource.newCheckpoint());
}, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
}
};
} else {
syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
}
ledgerStorage.initialize(
conf,
ledgerManager,
ledgerDirsManager,
indexDirsManager,
stateManager,
checkpointSource,
syncThread,
statsLogger,
allocator);
handles = new HandleFactoryImpl(ledgerStorage);
// Expose Stats
this.bookieStats = new BookieStats(statsLogger);
// Journal max-memory gauge: constant value derived from config (MB -> bytes).
journalMemoryMaxStats = new Gauge<Long>() {
final long journalMaxMemory = conf.getJournalMaxMemorySizeMb() * 1024 * 1024;
@Override
public Long getDefaultValue() {
return journalMaxMemory;
}
@Override
public Long getSample() {
return journalMaxMemory;
}
};
statsLogger.scope(JOURNAL_SCOPE).registerGauge(JOURNAL_MEMORY_MAX, journalMemoryMaxStats);
// Journal used-memory gauge: sums current usage across all journals per sample.
journalMemoryUsedStats = new Gauge<Long>() {
@Override
public Long getDefaultValue() {
return -1L;
}
@Override
public Long getSample() {
long totalMemory = 0L;
for (int i = 0; i < journals.size(); i++) {
totalMemory += journals.get(i).getMemoryUsage();
}
return totalMemory;
}
};
statsLogger.scope(JOURNAL_SCOPE).registerGauge(JOURNAL_MEMORY_USED, journalMemoryUsedStats);
}
/**
 * Build the state manager that drives the bookie lifecycle (registration,
 * read-only transitions, shutdown). Package-visible so tests can override it.
 */
StateManager initializeStateManager() throws IOException {
return new BookieStateManager(conf, statsLogger, metadataDriver,
ledgerDirsManager, bookieServiceInfoProvider);
}
/**
 * Replay every journal through a scanner that re-applies each record to the
 * ledger storage: regular entries, and the special meta-entries for master
 * keys, fencing, and explicit LAC.
 *
 * @throws IOException on journal read/storage errors (a BookieException raised
 *         while re-adding an entry is wrapped into IOException)
 * @throws BookieException on replay errors surfaced by the journal layer
 */
void readJournal() throws IOException, BookieException {
long startTs = System.currentTimeMillis();
JournalScanner scanner = new JournalScanner() {
@Override
public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
// Every record starts with (ledgerId, entryId); negative entry ids
// mark meta-entries (see METAENTRY_ID_* constants).
long ledgerId = recBuff.getLong();
long entryId = recBuff.getLong();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Replay journal - ledger id : {}, entry id : {}.", ledgerId, entryId);
}
if (entryId == METAENTRY_ID_LEDGER_KEY) {
// Master-key meta-entry: cache the key and re-register it with storage.
if (journalVersion >= JournalChannel.V3) {
int masterKeyLen = recBuff.getInt();
byte[] masterKey = new byte[masterKeyLen];
recBuff.get(masterKey);
masterKeyCache.put(ledgerId, masterKey);
// Force to re-insert the master key in ledger storage
handles.getHandle(ledgerId, masterKey);
} else {
throw new IOException("Invalid journal. Contains journalKey "
+ " but layout version (" + journalVersion
+ ") is too old to hold this");
}
} else if (entryId == METAENTRY_ID_FENCE_KEY) {
// Fence meta-entry: re-apply the fenced flag on the ledger handle.
if (journalVersion >= JournalChannel.V4) {
byte[] key = masterKeyCache.get(ledgerId);
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
handle.setFenced();
} else {
throw new IOException("Invalid journal. Contains fenceKey "
+ " but layout version (" + journalVersion
+ ") is too old to hold this");
}
} else if (entryId == METAENTRY_ID_LEDGER_EXPLICITLAC) {
// Explicit-LAC meta-entry: restore the explicit last-add-confirmed buffer.
if (journalVersion >= JournalChannel.V6) {
int explicitLacBufLength = recBuff.getInt();
ByteBuf explicitLacBuf = Unpooled.buffer(explicitLacBufLength);
byte[] explicitLacBufArray = new byte[explicitLacBufLength];
recBuff.get(explicitLacBufArray);
explicitLacBuf.writeBytes(explicitLacBufArray);
byte[] key = masterKeyCache.get(ledgerId);
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
handle.setExplicitLac(explicitLacBuf);
} else {
throw new IOException("Invalid journal. Contains explicitLAC " + " but layout version ("
+ journalVersion + ") is too old to hold this");
}
} else if (entryId < 0) {
/*
* this is possible if bookie code binary is rolledback
* to older version but when it is trying to read
* Journal which was created previously using newer
* code/journalversion, which introduced new special
* entry. So in anycase, if we see unrecognizable
* special entry while replaying journal we should skip
* (ignore) it.
*/
LOG.warn("Read unrecognizable entryId: {} for ledger: {} while replaying Journal. Skipping it",
entryId, ledgerId);
} else {
// Regular entry: rewind so the FULL record (ids included) is re-added.
byte[] key = masterKeyCache.get(ledgerId);
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
recBuff.rewind();
handle.addEntry(Unpooled.wrappedBuffer(recBuff));
}
} catch (NoLedgerException nsle) {
// The ledger was deleted after this record was journalled; nothing to replay.
if (LOG.isDebugEnabled()) {
LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
}
} catch (BookieException be) {
throw new IOException(be);
}
}
};
for (Journal journal : journals) {
replay(journal, scanner);
}
long elapsedTs = System.currentTimeMillis() - startTs;
LOG.info("Finished replaying journal in {} ms.", elapsedTs);
}
/**
 * Replay journal files and updates journal's in-memory lastLogMark object.
 *
 * @param journal Journal object corresponding to a journalDir
 * @param scanner Scanner to process replayed entries.
 * @throws IOException if the journal file referenced by the last log mark is
 *         missing, or on scan errors
 */
private void replay(Journal journal, JournalScanner scanner) throws IOException {
final LogMark markedLog = journal.getLastLogMark().getCurMark();
// Only journal files at or after the marked file can hold unflushed records.
List<Long> logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId ->
journalId >= markedLog.getLogFileId());
// last log mark may be missed due to no sync up before
// validate filtered log ids only when we have markedLogId
if (markedLog.getLogFileId() > 0) {
if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
}
}
// TODO: When reading in the journal logs that need to be synced, we
// should use BufferedChannels instead to minimize the amount of
// system calls done.
for (Long id : logs) {
long logPosition = 0L;
// Resume inside the marked file from the marked offset; later files start at 0.
if (id == markedLog.getLogFileId()) {
logPosition = markedLog.getLogFileOffset();
}
LOG.info("Replaying journal {} from position {}", id, logPosition);
long scanOffset = journal.scanJournal(id, logPosition, scanner);
// Update LastLogMark after completely replaying journal
// scanOffset will point to EOF position
// After LedgerStorage flush, SyncThread should persist this to disk
journal.setLastLogMark(id, scanOffset);
}
}
@Override
public synchronized void start() {
    // The bookie runs as a daemon so a hung bookie thread cannot keep the
    // JVM alive on its own.
    setDaemon(true);
    if (LOG.isDebugEnabled()) {
        LOG.debug("I'm starting a bookie with journal directories {}",
                journalDirectories.stream().map(File::getName).collect(Collectors.joining(", ")));
    }
    //Start DiskChecker thread
    dirsMonitor.start();
    // Replay journals before serving any traffic; a failure here is fatal
    // and shuts the bookie down.
    try {
        readJournal();
    } catch (IOException | BookieException ioe) {
        LOG.error("Exception while replaying journals, shutting down", ioe);
        shutdown(ExitCode.BOOKIE_EXCEPTION);
        return;
    }
    // Do a fully flush after journal replay
    try {
        syncThread.requestFlush().get();
    } catch (InterruptedException e) {
        // Interruption is not fatal here; restore the interrupt flag and
        // continue startup.
        LOG.warn("Interrupting the fully flush after replaying journals : ", e);
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        LOG.error("Error on executing a fully flush after replaying journals.");
        shutdown(ExitCode.BOOKIE_EXCEPTION);
        return;
    }
    // Optional store consistency check, run before accepting any IO; any
    // detected inconsistency aborts startup.
    if (conf.isLocalConsistencyCheckOnStartup()) {
        LOG.info("Running local consistency check on startup prior to accepting IO.");
        List<LedgerStorage.DetectedInconsistency> errors = null;
        try {
            errors = ledgerStorage.localConsistencyCheck(Optional.empty());
        } catch (IOException e) {
            LOG.error("Got a fatal exception while checking store", e);
            shutdown(ExitCode.BOOKIE_EXCEPTION);
            return;
        }
        if (errors != null && errors.size() > 0) {
            LOG.error("Bookie failed local consistency check:");
            for (LedgerStorage.DetectedInconsistency error : errors) {
                LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException());
            }
            shutdown(ExitCode.BOOKIE_EXCEPTION);
            return;
        }
    }
    LOG.info("Finished reading journal, starting bookie");
    /*
     * start sync thread first, so during replaying journals, we could do
     * checkpoint which reduce the chance that we need to replay journals
     * again if bookie restarted again before finished journal replays.
     */
    syncThread.start();
    // start bookie thread
    super.start();
    // After successful bookie startup, register listener for disk
    // error/full notifications.
    ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
    if (indexDirsManager != ledgerDirsManager) {
        indexDirsManager.addLedgerDirsListener(getLedgerDirsListener());
    }
    ledgerStorage.start();
    // check the bookie status to start with, and set running.
    // since bookie server use running as a flag to tell bookie server whether it is alive
    // if setting it in bookie thread, the watcher might run before bookie thread.
    stateManager.initState();
    // Registration with the metadata store is the last step; failure here
    // shuts the bookie down with a dedicated exit code.
    try {
        stateManager.registerBookie(true).get();
    } catch (Exception e) {
        LOG.error("Couldn't register bookie with zookeeper, shutting down : ", e);
        shutdown(ExitCode.ZK_REG_FAIL);
    }
}
/*
 * Get the DiskFailure listener for the bookie
 */
private LedgerDirsListener getLedgerDirsListener() {
    return new LedgerDirsListener() {

        @Override
        public void diskWritable(File disk) {
            // A disk became writable again: re-enable high priority writes
            // and move back to writable mode.
            stateManager.setHighPriorityWritesAvailability(true);
            stateManager.transitionToWritableMode();
        }

        @Override
        public void diskJustWritable(File disk) {
            // Same handling as diskWritable: the disk can take writes again.
            stateManager.setHighPriorityWritesAvailability(true);
            stateManager.transitionToWritableMode();
        }

        @Override
        public void diskFailed(File disk) {
            // A failed disk is unrecoverable here: shut the bookie down.
            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
        }

        @Override
        public void allDisksFull(boolean highPriorityWritesAllowed) {
            // Every disk is full: go read-only, optionally still allowing
            // high priority writes.
            stateManager.setHighPriorityWritesAvailability(highPriorityWritesAllowed);
            stateManager.transitionToReadOnlyMode();
        }

        @Override
        public void fatalError() {
            LOG.error("Fatal error reported by ledgerDirsManager");
            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
        }
    };
}
/**
 * Instantiate the metadata driver for the Bookie.
 *
 * @param conf server configuration supplying the metadata service URI
 * @return the initialized driver, or {@code null} when no metadata service
 *         URI is configured
 * @throws BookieException if driver initialization or configuration fails
 */
private MetadataBookieDriver instantiateMetadataDriver(ServerConfiguration conf) throws BookieException {
    try {
        String serviceUri = conf.getMetadataServiceUri();
        if (serviceUri == null) {
            // No metadata service configured: run without a driver.
            return null;
        }
        MetadataBookieDriver driver =
                MetadataDrivers.getBookieDriver(URI.create(serviceUri));
        // On session expiry the listener forces the bookie to unregistered
        // and schedules a re-registration.
        driver.initialize(
                conf,
                () -> {
                    stateManager.forceToUnregistered();
                    // schedule a re-register operation
                    stateManager.registerBookie(false);
                },
                statsLogger);
        return driver;
    } catch (MetadataException me) {
        throw new MetadataStoreException("Failed to initialize metadata bookie driver", me);
    } catch (ConfigurationException e) {
        throw new BookieIllegalOpException(e);
    }
}
/*
 * Check whether Bookie is writable.
 */
// Delegates to the state manager's read-only flag.
public boolean isReadOnly() {
    return stateManager.isReadOnly();
}
/**
 * Check whether Bookie is available for high priority writes.
 *
 * @return true if the bookie is able to take high priority writes.
 */
// Delegates to the state manager; high priority availability is toggled by
// the ledger dirs listener on disk-full / disk-writable events.
public boolean isAvailableForHighPriorityWrites() {
    return stateManager.isAvailableForHighPriorityWrites();
}
/**
 * Check whether the bookie is running; the flag is owned by the state
 * manager and cleared during shutdown.
 */
public boolean isRunning() {
    return stateManager.isRunning();
}
@Override
public void run() {
    // The bookie thread launches every journal thread, then blocks until
    // they all exit.
    try {
        journals.forEach(Journal::start);
        for (Journal journal : journals) {
            journal.joinThread();
        }
        LOG.info("Journal thread(s) quit.");
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted on running journal thread : ", ie);
    }
    // A journal quitting while the bookie is NOT shutting down means it died
    // unexpectedly; subsequent add operations would hang until client
    // timeout, so the whole bookie is taken down.
    if (!stateManager.isShuttingDown()) {
        LOG.error("Journal manager quits unexpectedly.");
        triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
    }
}
// Triggering the Bookie shutdown in its own thread,
// because shutdown can be called from sync thread which would be
// interrupted by shutdown call.
AtomicBoolean shutdownTriggered = new AtomicBoolean(false);

// Fire-and-forget shutdown trigger; the CAS guarantees the shutdown thread
// is spawned at most once no matter how many callers race here.
void triggerBookieShutdown(final int exitCode) {
    if (shutdownTriggered.compareAndSet(false, true)) {
        LOG.info("Triggering shutdown of Bookie-{} with exitCode {}",
                conf.getBookiePort(), exitCode);
        BookieThread trigger = new BookieThread("BookieShutdownTrigger") {
            @Override
            public void run() {
                BookieImpl.this.shutdown(exitCode);
            }
        };
        trigger.start();
    }
}
// provided a public shutdown method for other caller
// to shut down bookie gracefully
// Graceful shutdown entry point; returns the recorded exit code.
public int shutdown() {
    return shutdown(ExitCode.OK);
}
// internal shutdown method to let shutdown bookie gracefully
// when encountering exception
// Shutdown order matters: state flags first, then sync thread, journals,
// this (bookie) thread, ledger storage, ledger managers, disk monitor, and
// finally the metadata driver. Returns the first exit code recorded.
synchronized int shutdown(int exitCode) {
    try {
        if (isRunning()) { // avoid shutdown twice
            // the exitCode only set when first shutdown usually due to exception found
            LOG.info("Shutting down Bookie-{} with exitCode {}",
                    conf.getBookiePort(), exitCode);
            // Only the first non-OK exit code is kept.
            if (this.exitCode == ExitCode.OK) {
                this.exitCode = exitCode;
            }
            stateManager.forceToShuttingDown();
            // turn bookie to read only during shutting down process
            LOG.info("Turning bookie to read only during shut down");
            stateManager.forceToReadOnly();
            // Shutdown Sync thread
            syncThread.shutdown();
            // Shutdown journals
            for (Journal journal : journals) {
                journal.shutdown();
            }
            // Wait for the bookie thread (run()) to exit before tearing
            // down storage.
            this.join();
            // Shutdown the EntryLogger which has the GarbageCollector Thread running
            ledgerStorage.shutdown();
            // close Ledger Manager
            try {
                if (null != ledgerManager) {
                    ledgerManager.close();
                }
                if (null != ledgerManagerFactory) {
                    ledgerManagerFactory.close();
                }
            } catch (IOException ie) {
                // Best-effort close; failure is logged but does not abort
                // the rest of the shutdown sequence.
                LOG.error("Failed to close active ledger manager : ", ie);
            }
            //Shutdown disk checker
            dirsMonitor.shutdown();
        }
        // Shutdown the ZK client
        if (metadataDriver != null) {
            metadataDriver.close();
        }
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.error("Interrupted during shutting down bookie : ", ie);
    } catch (Exception e) {
        LOG.error("Got Exception while trying to shutdown Bookie", e);
        throw e;
    } finally {
        // setting running to false here, so watch thread
        // in bookie server know it only after bookie shut down
        stateManager.close();
    }
    return this.exitCode;
}
/**
 * Retrieve the ledger descriptor for the ledger which entry should be added to.
 * The LedgerDescriptor returned from this method should be eventually freed with
 * #putHandle().
 *
 * @throws BookieException if masterKey does not match the master key of the ledger
 */
@VisibleForTesting
LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
        throws IOException, BookieException {
    // The ledger id is the first long of the entry payload; peek it without
    // moving the reader index.
    long ledgerId = entry.getLong(entry.readerIndex());
    return handles.getHandle(ledgerId, masterKey);
}
// Maps a ledger id to its journal via sign-safe modulo, so every entry of a
// given ledger always goes to the same journal instance.
private Journal getJournal(long ledgerId) {
    return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
}
/**
 * Add an entry to a ledger as specified by handle.
 */
// Writes the entry to ledger storage first, then (optionally) to the
// journal; on a cache miss for the ledger's master key, a special
// METAENTRY_ID_LEDGER_KEY record is journaled so the key can be rebuilt on
// replay. Invokes cb when the write completes (immediately when journal
// writes are disabled).
private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
        boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
        throws IOException, BookieException, InterruptedException {
    long ledgerId = handle.getLedgerId();
    long entryId = handle.addEntry(entry);
    bookieStats.getWriteBytes().add(entry.readableBytes());
    // journal `addEntry` should happen after the entry is added to ledger storage.
    // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage.
    if (masterKeyCache.get(ledgerId) == null) {
        // Force the load into masterKey cache
        byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
        if (oldValue == null) {
            // new handle, we should add the key to journal ensure we can rebuild
            // Layout: ledgerId (8) | METAENTRY_ID_LEDGER_KEY (8) | key length (4) | key bytes.
            ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
            bb.putLong(ledgerId);
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
            bb.putInt(masterKey.length);
            bb.put(masterKey);
            bb.flip();
            getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
        }
    }
    // When journal writes are disabled, acknowledge the write immediately
    // after the ledger storage update.
    if (!writeDataToJournal) {
        cb.writeComplete(0, ledgerId, entryId, null, ctx);
        return;
    }
    if (LOG.isTraceEnabled()) {
        LOG.trace("Adding {}@{}", entryId, ledgerId);
    }
    getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
}
/**
 * Add entry to a ledger, even if the ledger has previous been fenced. This should only
 * happen in bookie recovery or ledger recovery cases, where entries are being replicates
 * so that they exist on a quorum of bookies. The corresponding client side call for this
 * is not exposed to users.
 */
public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
        throws IOException, BookieException, InterruptedException {
    final long startNanos = MathUtils.nowInNano();
    boolean completed = false;
    int entrySize = 0;
    try {
        LedgerDescriptor descriptor = getLedgerForEntry(entry, masterKey);
        synchronized (descriptor) {
            entrySize = entry.readableBytes();
            // Recovery writes skip the fencing check and never ack before sync.
            addEntryInternal(descriptor, entry, false /* ackBeforeSync */, cb, ctx, masterKey);
        }
        completed = true;
    } catch (NoWritableLedgerDirException e) {
        // No writable ledger dir left: flip to read-only and surface the error.
        stateManager.transitionToReadOnlyMode();
        throw new IOException(e);
    } finally {
        long elapsedNanos = MathUtils.elapsedNanos(startNanos);
        if (completed) {
            bookieStats.getRecoveryAddEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            bookieStats.getAddBytesStats().registerSuccessfulValue(entrySize);
        } else {
            bookieStats.getRecoveryAddEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            bookieStats.getAddBytesStats().registerFailedValue(entrySize);
        }
        // The bookie owns the buffer once the call arrives; always release it.
        entry.release();
    }
}
// Builds the special journal record carrying an explicit LAC.
// Layout: ledgerId (8) | METAENTRY_ID_LEDGER_EXPLICITLAC (8) | length (4) | LAC payload.
private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
    int lacLength = explicitLac.capacity();
    ByteBuf record = allocator.directBuffer(8 + 8 + 4 + lacLength);
    record.writeLong(ledgerId)
          .writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC)
          .writeInt(lacLength);
    record.writeBytes(explicitLac);
    return record;
}
/**
 * Records an explicit LAC for the ledger encoded in {@code entry} and logs
 * a corresponding special entry to the journal.
 *
 * @param entry buffer whose first long is the ledger id, followed by the LAC payload
 * @param writeCallback invoked when the journal write completes
 * @param ctx opaque context passed through to the callback
 * @param masterKey master key used to resolve the ledger handle
 * @throws IOException on journal failure or when no ledger dir is writable
 * @throws BookieException if the master key does not match
 */
public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte[] masterKey)
        throws IOException, InterruptedException, BookieException {
    try {
        long ledgerId = entry.getLong(entry.readerIndex());
        LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
        synchronized (handle) {
            // Mark/reset the reader index so the buffer can be consumed by
            // setExplicitLac and then re-read to build the journal record.
            entry.markReaderIndex();
            handle.setExplicitLac(entry);
            entry.resetReaderIndex();
            ByteBuf explicitLACEntry = createExplicitLACEntry(ledgerId, entry);
            getJournal(ledgerId).logAddEntry(explicitLACEntry, false /* ackBeforeSync */, writeCallback, ctx);
        }
    } catch (NoWritableLedgerDirException e) {
        // No writable ledger dir left: flip to read-only and surface the error.
        stateManager.transitionToReadOnlyMode();
        throw new IOException(e);
    }
}
/**
 * Returns the explicit LAC recorded for the given ledger, reading it under
 * the handle's monitor.
 */
public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
    LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
    synchronized (handle) {
        return handle.getExplicitLac();
    }
}
/**
 * Force sync given 'ledgerId' entries on the journal to the disk.
 * It works like a regular addEntry with ackBeforeSync=false.
 * This is useful for ledgers with DEFERRED_SYNC write flag.
 */
public void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Forcing ledger {}", ledgerId);
    }
    // Route the force request to the journal owning this ledger.
    getJournal(ledgerId).forceLedger(ledgerId, cb, ctx);
    bookieStats.getForceLedgerOps().inc();
}
/**
 * Add entry to a ledger.
 */
public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
        throws IOException, BookieException, InterruptedException {
    final long startNanos = MathUtils.nowInNano();
    boolean completed = false;
    int entrySize = 0;
    try {
        LedgerDescriptor descriptor = getLedgerForEntry(entry, masterKey);
        synchronized (descriptor) {
            // Regular writes are rejected once a ledger is fenced; only
            // recoveryAddEntry may bypass this.
            if (descriptor.isFenced()) {
                throw BookieException.create(BookieException.Code.LedgerFencedException);
            }
            entrySize = entry.readableBytes();
            addEntryInternal(descriptor, entry, ackBeforeSync, cb, ctx, masterKey);
        }
        completed = true;
    } catch (NoWritableLedgerDirException e) {
        // No writable ledger dir left: flip to read-only and surface the error.
        stateManager.transitionToReadOnlyMode();
        throw new IOException(e);
    } finally {
        long elapsedNanos = MathUtils.elapsedNanos(startNanos);
        if (completed) {
            bookieStats.getAddEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            bookieStats.getAddBytesStats().registerSuccessfulValue(entrySize);
        } else {
            bookieStats.getAddEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            bookieStats.getAddBytesStats().registerFailedValue(entrySize);
        }
        // The bookie owns the buffer once the call arrives; always release it.
        entry.release();
    }
}
/**
 * Fences a ledger. From this point on, clients will be unable to
 * write to this ledger. Only recoveryAddEntry will be
 * able to add entries to the ledger.
 * This method is idempotent. Once a ledger is fenced, it can
 * never be unfenced. Fencing a fenced ledger has no effect.
 *
 * @return future completing with true once the fence is logged in the journal
 */
public CompletableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey)
        throws IOException, BookieException {
    // Resolve the handle (validates the master key), then fence it and log
    // the fence record into the ledger's journal.
    return handles.getHandle(ledgerId, masterKey)
            .fenceAndLogInJournal(getJournal(ledgerId));
}
/**
 * Reads an entry from the given ledger.
 *
 * @param ledgerId id of the ledger to read from
 * @param entryId id of the entry to read
 * @return buffer containing the entry payload
 * @throws NoLedgerException if the ledger does not exist on this bookie
 */
public ByteBuf readEntry(long ledgerId, long entryId)
        throws IOException, NoLedgerException {
    long requestNanos = MathUtils.nowInNano();
    boolean success = false;
    int entrySize = 0;
    try {
        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reading {}@{}", entryId, ledgerId);
        }
        ByteBuf entry = handle.readEntry(entryId);
        // Fix: entrySize was previously never assigned, so the read-bytes
        // stats always recorded 0.
        entrySize = entry.readableBytes();
        bookieStats.getReadBytes().add(entrySize);
        success = true;
        return entry;
    } finally {
        long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
        if (success) {
            bookieStats.getReadEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            bookieStats.getReadBytesStats().registerSuccessfulValue(entrySize);
        } else {
            bookieStats.getReadEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            // Fix: the failed value belongs to the read-bytes stats, matching
            // the success branch (previously registered on getReadEntryStats()).
            bookieStats.getReadBytesStats().registerFailedValue(entrySize);
        }
    }
}
/**
 * Returns the last add confirmed entry id for the given ledger, as held by
 * its read-only handle.
 */
public long readLastAddConfirmed(long ledgerId) throws IOException {
    LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
    return handle.getLastAddConfirmed();
}
/**
 * Registers a watcher to be notified when the ledger's LAC advances past
 * {@code previousLAC}; delegates to the ledger's read-only handle.
 *
 * @return the boolean result of the handle's registration call
 */
public boolean waitForLastAddConfirmedUpdate(long ledgerId,
        long previousLAC,
        Watcher<LastAddConfirmedUpdateNotification> watcher)
        throws IOException {
    LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
    return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
}
/**
 * Cancels a previously registered LAC-update watcher on the given ledger.
 */
public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
        Watcher<LastAddConfirmedUpdateNotification> watcher)
        throws IOException {
    LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
    handle.cancelWaitForLastAddConfirmedUpdate(watcher);
}
// Test-only accessor for the underlying ledger storage.
@VisibleForTesting
public LedgerStorage getLedgerStorage() {
    return ledgerStorage;
}
// Test-only accessor; casts the StateManager interface to its concrete
// BookieStateManager implementation.
@VisibleForTesting
public BookieStateManager getStateManager() {
    return (BookieStateManager) this.stateManager;
}
// Test-only accessor for the ledger manager factory.
@VisibleForTesting
public LedgerManagerFactory getLedgerManagerFactory() {
    return ledgerManagerFactory;
}
// The rest of the code is test stuff
/**
 * Write callback that counts outstanding writes: incCount() before each
 * write, writeComplete() decrements, and waitZero() blocks until every
 * outstanding write has completed.
 */
static class CounterCallback implements WriteCallback {
    int count;

    public synchronized void incCount() {
        count++;
    }

    @Override
    public synchronized void writeComplete(int rc, long l, long e, BookieId addr, Object ctx) {
        // Wake waiters exactly when the last outstanding write completes.
        if (--count == 0) {
            notifyAll();
        }
    }

    public synchronized void waitZero() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }
}
/**
 * Returns the ByteBuf allocator this bookie uses for buffer allocation.
 */
public ByteBufAllocator getAllocator() {
    return allocator;
}
/**
 * Format the bookie server data.
 *
 * @param conf ServerConfiguration
 * @param isInteractive Whether format should ask prompt for confirmation if old data exists or not.
 * @param force If non interactive and force is true, then old data will be removed without confirm prompt.
 * @return Returns true if the format is success else returns false
 */
public static boolean format(ServerConfiguration conf,
        boolean isInteractive, boolean force) {
    // Wipe each journal directory, asking for confirmation (or requiring
    // `force` in non-interactive mode) whenever a directory has old data.
    for (File journalDir : conf.getJournalDirs()) {
        String[] contents =
                (journalDir.exists() && journalDir.isDirectory()) ? journalDir.list() : null;
        if (contents != null && contents.length != 0) {
            try {
                boolean confirm;
                if (isInteractive) {
                    confirm = IOUtils
                            .confirmPrompt("Are you sure to format Bookie data..?");
                } else {
                    // Non-interactive: old data is only deleted when force is set.
                    confirm = force;
                }
                if (!confirm) {
                    LOG.error("Bookie format aborted!!");
                    return false;
                }
            } catch (IOException e) {
                LOG.error("Error during bookie format", e);
                return false;
            }
        }
        if (!cleanDir(journalDir)) {
            LOG.error("Formatting journal directory failed");
            return false;
        }
    }
    // Wipe every ledger directory.
    for (File dir : conf.getLedgerDirs()) {
        if (!cleanDir(dir)) {
            LOG.error("Formatting ledger directory " + dir + " failed");
            return false;
        }
    }
    // Clean up index directories if they are separate from the ledger dirs
    File[] indexDirs = conf.getIndexDirs();
    if (indexDirs != null) {
        for (File dir : indexDirs) {
            if (!cleanDir(dir)) {
                LOG.error("Formatting index directory " + dir + " failed");
                return false;
            }
        }
    }
    LOG.info("Bookie format completed successfully");
    return true;
}
// Empties an existing directory (deleting each child), or creates it when
// missing. Returns false on the first child that cannot be deleted or if
// the directory cannot be created.
private static boolean cleanDir(File dir) {
    if (!dir.exists()) {
        if (dir.mkdirs()) {
            return true;
        }
        LOG.error("Not able to create the directory " + dir);
        return false;
    }
    File[] children = dir.listFiles();
    if (children != null) {
        for (File child : children) {
            if (!FileUtils.deleteQuietly(child)) {
                LOG.error("Not able to delete " + child);
                return false;
            }
        }
    }
    return true;
}
/**
 * Returns exit code - cause of failure.
 *
 * @return {@link ExitCode}
 */
public int getExitCode() {
    return exitCode;
}
// Builds a DiskChecker from the configured usage threshold and warn threshold.
static DiskChecker createDiskChecker(ServerConfiguration conf) {
    return new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
}
// Builds the dirs manager over the configured ledger directories.
static LedgerDirsManager createLedgerDirsManager(ServerConfiguration conf, DiskChecker diskChecker,
        StatsLogger statsLogger) {
    return new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker, statsLogger);
}
// Builds a dirs manager for dedicated index directories; when none are
// configured, indexes share the supplied (ledger) dirs manager instead.
static LedgerDirsManager createIndexDirsManager(ServerConfiguration conf, DiskChecker diskChecker,
        StatsLogger statsLogger, LedgerDirsManager fallback) {
    File[] indexDirs = conf.getIndexDirs();
    return (indexDirs == null)
            ? fallback
            : new LedgerDirsManager(conf, indexDirs, diskChecker, statsLogger);
}
/**
 * Returns an iterator over the entry ids stored on this bookie for the
 * given ledger, recording latency in the read-entry stats.
 *
 * @throws NoLedgerException if the ledger does not exist on this bookie
 */
public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException {
    final long startNanos = MathUtils.nowInNano();
    boolean completed = false;
    try {
        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
        if (LOG.isTraceEnabled()) {
            LOG.trace("GetEntriesOfLedger {}", ledgerId);
        }
        OfLong entryIds = handle.getListOfEntriesOfLedger(ledgerId);
        completed = true;
        return entryIds;
    } finally {
        long elapsedNanos = MathUtils.elapsedNanos(startNanos);
        if (completed) {
            bookieStats.getReadEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        } else {
            bookieStats.getReadEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        }
    }
}
}