blob: 2768aff604ed3c20a6d5ea35015861da0ed3f2bb [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_WRITABLE_DIRS;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Manages the ledger directories used by the bookie.
 *
 * <p>Every configured directory is tracked as either <i>writable</i> or <i>filled</i>. This class
 * never mutates those two lists in place: each update builds a fresh copy and publishes it
 * through a {@code volatile} field, so a reader that takes one snapshot of the field sees a
 * consistent list. Registered {@link LedgerDirsListener}s are notified whenever a directory moves
 * between the two lists.
 *
 * <p>NOTE(review): the mutators ({@link #addToFilledDirs}, {@link #addToWritableDirs}) are not
 * synchronized and appear to assume a single updating thread; confirm against callers.
 */
public class LedgerDirsManager {
    private static final Logger LOG = LoggerFactory.getLogger(LedgerDirsManager.class);

    /** Directories currently considered full; replaced (not mutated) by this class on update. */
    private volatile List<File> filledDirs;
    /** All managed ledger directories, fixed at construction. */
    private final List<File> ledgerDirectories;
    /** Directories currently accepting writes; replaced (not mutated) by this class on update. */
    private volatile List<File> writableLedgerDirectories;
    private final List<LedgerDirsListener> listeners;
    private final Random rand = new Random();
    /**
     * Last recorded usage per directory. The usage gauge multiplies these values by 100, so they
     * are presumably fractions in [0, 1] — confirm with whoever populates the map.
     */
    private final ConcurrentMap<File, Float> diskUsages =
            new ConcurrentHashMap<File, Float>();
    private final long entryLogSize;
    /** Usable bytes a dir must exceed to host a new entry log when no dir is writable. */
    private final long minUsableSizeForEntryLogCreation;
    /** Usable bytes a dir must exceed to host a new index file when no dir is writable. */
    private final long minUsableSizeForIndexFileCreation;
    private final DiskChecker diskChecker;

    /**
     * Creates a manager that reports no stats.
     *
     * @param conf server configuration supplying the size thresholds
     * @param dirs configured ledger directories
     * @param diskChecker checker used for free/total space queries
     */
    public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker) {
        this(conf, dirs, diskChecker, NullStatsLogger.INSTANCE);
    }

    /**
     * Creates a manager with every directory initially writable and a recorded usage of 0.
     *
     * <p>Registers one usage gauge per directory, named
     * {@code "dir_" + <parent path with '/' replaced by '_'> + "_usage"}. It also registers an
     * {@code LD_WRITABLE_DIRS} gauge that reports the current number of writable directories.
     *
     * @param conf server configuration supplying the size thresholds
     * @param dirs configured ledger directories; the managed dirs are whatever
     *     {@code BookieImpl.getCurrentDirectories(dirs)} returns (presumably each dir's
     *     "current" subdirectory — confirm)
     * @param diskChecker checker used for free/total space queries
     * @param statsLogger logger the gauges are registered with
     */
    public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker,
                             StatsLogger statsLogger) {
        this.ledgerDirectories = Arrays.asList(BookieImpl.getCurrentDirectories(dirs));
        this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
        this.filledDirs = new ArrayList<File>();
        this.listeners = new ArrayList<LedgerDirsListener>();
        this.entryLogSize = conf.getEntryLogSizeLimit();
        this.minUsableSizeForIndexFileCreation = conf.getMinUsableSizeForIndexFileCreation();
        this.minUsableSizeForEntryLogCreation = conf.getMinUsableSizeForEntryLogCreation();
        for (File dir : ledgerDirectories) {
            diskUsages.put(dir, 0f);
            String statName = "dir_" + dir.getParent().replace('/', '_') + "_usage";
            final File targetDir = dir;
            statsLogger.registerGauge(statName, new Gauge<Number>() {
                @Override
                public Number getDefaultValue() {
                    return 0;
                }

                @Override
                public Number getSample() {
                    // getDiskUsages() exposes the live map, so the entry may have been removed;
                    // report 0 instead of failing with an unboxing NullPointerException.
                    return diskUsages.getOrDefault(targetDir, 0f) * 100;
                }
            });
        }
        this.diskChecker = diskChecker;
        statsLogger.registerGauge(LD_WRITABLE_DIRS, new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return writableLedgerDirectories.size();
            }
        });
    }

    /**
     * Returns all managed ledger dirs, regardless of writability.
     *
     * @return the list fixed at construction
     */
    public List<File> getAllLedgerDirs() {
        return ledgerDirectories;
    }

    /**
     * Returns the registered dir listeners.
     *
     * @return the live listener list (not a copy)
     */
    public List<LedgerDirsListener> getListeners() {
        return listeners;
    }

    /**
     * Calculates the total free space across the given directories, as reported by the
     * configured {@link DiskChecker}.
     *
     * @param dirs directories to sum over
     * @return total free space in bytes
     * @throws IOException if the disk checker fails to compute it
     */
    public long getTotalFreeSpace(List<File> dirs) throws IOException {
        return diskChecker.getTotalFreeSpace(dirs);
    }

    /**
     * Calculates the total disk capacity across the given directories, as reported by the
     * configured {@link DiskChecker}.
     *
     * @param dirs directories to sum over
     * @return total disk space in bytes
     * @throws IOException if the disk checker fails to compute it
     */
    public long getTotalDiskSpace(List<File> dirs) throws IOException {
        return diskChecker.getTotalDiskSpace(dirs);
    }

    /**
     * Returns the per-directory usage map.
     *
     * @return the live, mutable map backing the usage gauges
     */
    public ConcurrentMap<File, Float> getDiskUsages() {
        return diskUsages;
    }

    /**
     * Returns a snapshot of the writable ledger dirs.
     *
     * @return a non-empty snapshot of the writable dirs
     * @throws NoWritableLedgerDirException if no dir is currently writable
     */
    public List<File> getWritableLedgerDirs()
            throws NoWritableLedgerDirException {
        // Read the volatile field once so the emptiness check and the returned list agree even
        // if the list is swapped concurrently.
        List<File> writableDirs = writableLedgerDirectories;
        if (writableDirs.isEmpty()) {
            String errMsg = "All ledger directories are non writable";
            throw new NoWritableLedgerDirException(errMsg);
        }
        return writableDirs;
    }

    /**
     * Reports whether any dir is currently writable.
     *
     * @return true if the writable ledger dirs list has entries
     */
    public boolean hasWritableLedgerDirs() {
        return !writableLedgerDirectories.isEmpty();
    }

    /**
     * Returns the dirs a new entry log may be created in.
     *
     * <p>These are the writable dirs if any exist. Otherwise they are the dirs with more than
     * {@code minUsableSizeForEntryLogCreation} usable bytes.
     *
     * @return a non-empty list of candidate dirs
     * @throws NoWritableLedgerDirException if no dir qualifies
     */
    public List<File> getWritableLedgerDirsForNewLog() throws NoWritableLedgerDirException {
        // Single volatile read: the check and the returned list must be the same snapshot.
        List<File> writableDirs = writableLedgerDirectories;
        if (!writableDirs.isEmpty()) {
            return writableDirs;
        }
        // There are no writable ledger dirs, but new entry log files may still be created if
        // there is enough disk space. This lets the bookie keep functioning in read-only mode,
        // because compaction and journal replay still write data to disk.
        return getDirsAboveUsableThresholdSize(minUsableSizeForEntryLogCreation, true);
    }

    /**
     * Returns every managed dir whose usable space is strictly greater than
     * {@code thresholdSize}.
     *
     * @param thresholdSize usable-space threshold in bytes
     * @param loggingNoWritable whether to log the outcome (info on success, error on failure)
     * @return a non-empty list of dirs above the threshold
     * @throws NoWritableLedgerDirException if no dir has more than {@code thresholdSize} usable
     */
    List<File> getDirsAboveUsableThresholdSize(long thresholdSize, boolean loggingNoWritable)
            throws NoWritableLedgerDirException {
        List<File> dirsWithRoom = new ArrayList<File>();
        for (File dir : this.ledgerDirectories) {
            // Pick dirs that can accommodate a little more than thresholdSize.
            if (dir.getUsableSpace() > thresholdSize) {
                dirsWithRoom.add(dir);
            }
        }
        if (!dirsWithRoom.isEmpty()) {
            if (loggingNoWritable) {
                LOG.info("No writable ledger dirs below diskUsageThreshold. "
                        + "But Dirs that can accommodate {} are: {}", thresholdSize, dirsWithRoom);
            }
            return dirsWithRoom;
        }
        // No ledger dir has at least thresholdSize of usable space left.
        String errMsg = "All ledger directories are non writable and no reserved space (" + thresholdSize + ") left.";
        NoWritableLedgerDirException e = new NoWritableLedgerDirException(errMsg);
        if (loggingNoWritable) {
            LOG.error(errMsg, e);
        }
        throw e;
    }

    /**
     * Returns the dirs currently marked as full.
     *
     * @return snapshot of the filled dirs
     */
    public List<File> getFullFilledLedgerDirs() {
        return filledDirs;
    }

    /**
     * Reports whether the given dir is currently in the filled dirs list.
     *
     * @param dir directory to test
     * @return true if {@code dir} is marked as full
     */
    public boolean isDirFull(File dir) {
        return filledDirs.contains(dir);
    }

    /**
     * Moves {@code dir} to the filled dirs list, removes it from the writable list, and calls
     * {@link LedgerDirsListener#diskFull} on every listener. Does nothing if the dir is already
     * marked full.
     *
     * @param dir directory that ran out of space
     */
    @VisibleForTesting
    public void addToFilledDirs(File dir) {
        if (!filledDirs.contains(dir)) {
            LOG.warn("{} is out of space. Adding it to filled dirs list", dir);
            // Publish a new filled-dirs list rather than mutating the current snapshot.
            List<File> updatedFilledDirs = new ArrayList<File>(filledDirs);
            updatedFilledDirs.add(dir);
            filledDirs = updatedFilledDirs;
            // Rebuild the writable list without any filled dir.
            List<File> newDirs = new ArrayList<File>(writableLedgerDirectories);
            newDirs.removeAll(filledDirs);
            writableLedgerDirectories = newDirs;
            for (LedgerDirsListener listener : listeners) {
                listener.diskFull(dir);
            }
        }
    }

    /**
     * Moves {@code dir} to the writable dirs list and removes it from the filled list. Does
     * nothing if the dir is already writable.
     *
     * @param dir directory that became writable
     * @param underWarnThreshold true to notify listeners via
     *     {@link LedgerDirsListener#diskWritable}; false to notify them via
     *     {@link LedgerDirsListener#diskJustWritable}
     */
    public void addToWritableDirs(File dir, boolean underWarnThreshold) {
        if (writableLedgerDirectories.contains(dir)) {
            return;
        }
        LOG.info("{} becomes writable. Adding it to writable dirs list.", dir);
        // Publish a new writable-dirs list rather than mutating the current snapshot.
        List<File> updatedWritableDirs = new ArrayList<File>(writableLedgerDirectories);
        updatedWritableDirs.add(dir);
        writableLedgerDirectories = updatedWritableDirs;
        // Rebuild the filled list without any writable dir.
        List<File> newDirs = new ArrayList<File>(filledDirs);
        newDirs.removeAll(writableLedgerDirectories);
        filledDirs = newDirs;
        for (LedgerDirsListener listener : listeners) {
            if (underWarnThreshold) {
                listener.diskWritable(dir);
            } else {
                listener.diskJustWritable(dir);
            }
        }
    }

    /**
     * Returns a randomly chosen dir from the writable dirs list.
     *
     * @throws NoWritableLedgerDirException if no dir is writable
     */
    File pickRandomWritableDir() throws NoWritableLedgerDirException {
        return pickRandomWritableDir(null);
    }

    /**
     * Picks a writable dir at random. {@code excludedDir} will not be picked.
     *
     * @param excludedDir the directory to exclude, or null to exclude nothing
     * @return a writable dir other than {@code excludedDir}
     * @throws NoWritableLedgerDirException if there is no eligible writable dir
     */
    File pickRandomWritableDir(File excludedDir) throws NoWritableLedgerDirException {
        List<File> writableDirs = getWritableLedgerDirs();
        return pickRandomDir(writableDirs, excludedDir);
    }

    /**
     * Picks a dir at random for a new index file.
     *
     * <p>The pick is from the writable dirs if any exist. Otherwise it is from the dirs with more
     * than {@code minUsableSizeForIndexFileCreation} usable bytes.
     *
     * @param excludedDir the directory to exclude, or null to exclude nothing
     * @return a candidate dir other than {@code excludedDir}
     * @throws NoWritableLedgerDirException if there is no eligible dir
     */
    File pickRandomWritableDirForNewIndexFile(File excludedDir) throws NoWritableLedgerDirException {
        // Single volatile read: the emptiness check and the list we pick from must agree.
        List<File> writableDirsForNewIndexFile = writableLedgerDirectories;
        if (writableDirsForNewIndexFile.isEmpty()) {
            // There are no writable index dirs, which means the bookie has turned read-only.
            // However, journal replay during a bookie restart may still need to create a new
            // index file, and that must be allowed to proceed.
            writableDirsForNewIndexFile =
                    getDirsAboveUsableThresholdSize(minUsableSizeForIndexFileCreation, true);
        }
        return pickRandomDir(writableDirsForNewIndexFile, excludedDir);
    }

    /**
     * Reports whether {@code indexDir} is a managed dir with strictly more than
     * {@code minUsableSizeForIndexFileCreation} usable bytes. This does not consult the
     * writable list.
     */
    boolean isDirWritableForNewIndexFile(File indexDir) {
        return ledgerDirectories.contains(indexDir)
                && indexDir.getUsableSpace() > minUsableSizeForIndexFileCreation;
    }

    /**
     * Returns one dir chosen at random from all managed dirs, regardless of writability.
     *
     * @param excludedDir the directory to exclude, or null to exclude nothing
     * @throws NoWritableLedgerDirException if there is no eligible dir
     */
    File pickRandomDir(File excludedDir) throws NoWritableLedgerDirException {
        return pickRandomDir(getAllLedgerDirs(), excludedDir);
    }

    /**
     * Picks a dir from {@code dirs} starting at a random index. It scans forward (wrapping
     * around) past {@code excludedDir}.
     *
     * @param dirs candidate directories
     * @param excludedDir the directory to exclude, or null to exclude nothing
     * @return a dir from {@code dirs} not equal to {@code excludedDir}
     * @throws NoWritableLedgerDirException if {@code dirs} is empty or only holds the excluded dir
     */
    File pickRandomDir(List<File> dirs, File excludedDir) throws NoWritableLedgerDirException {
        if (dirs.isEmpty()) {
            // Random.nextInt(0) would throw IllegalArgumentException; report the condition via
            // the checked exception that callers already handle.
            throw new NoWritableLedgerDirException("No directories available to pick from " + dirs);
        }
        final int start = rand.nextInt(dirs.size());
        int idx = start;
        File candidate = dirs.get(idx);
        while (null != excludedDir && excludedDir.equals(candidate)) {
            idx = (idx + 1) % dirs.size();
            if (idx == start) {
                // Every dir has been visited and none is eligible.
                throw new NoWritableLedgerDirException("No writable directories found from "
                        + " available writable dirs (" + dirs + ") : exclude dir "
                        + excludedDir);
            }
            candidate = dirs.get(idx);
        }
        return candidate;
    }

    /**
     * Registers a listener for dir state changes. A null listener is ignored.
     */
    public void addLedgerDirsListener(LedgerDirsListener listener) {
        if (listener != null) {
            listeners.add(listener);
        }
    }

    /** Returns the disk checker supplied at construction. */
    public DiskChecker getDiskChecker() {
        return diskChecker;
    }

    /**
     * Thrown when no ledger directory is available for the requested operation.
     */
    public static class NoWritableLedgerDirException extends IOException {
        private static final long serialVersionUID = -8696901285061448421L;

        public NoWritableLedgerDirException(String errMsg) {
            super(errMsg);
        }
    }

    /**
     * Listener for disk check events. {@link LedgerDirsManager} notifies it whenever a disk
     * becomes full, becomes writable again, or fails. Every method has an empty default
     * implementation.
     */
    public interface LedgerDirsListener {
        /**
         * Notified on disk failure or disk error.
         *
         * @param disk failed disk
         */
        default void diskFailed(File disk) {}

        /**
         * Notified when the disk usage warn threshold is exceeded on the drive.
         *
         * @param disk disk over the warn threshold
         */
        default void diskAlmostFull(File disk) {}

        /**
         * Notified when a disk is detected as full.
         *
         * @param disk filled disk
         */
        default void diskFull(File disk) {}

        /**
         * Notified when a disk is detected as writable and under the warn threshold.
         *
         * @param disk writable disk
         */
        default void diskWritable(File disk) {}

        /**
         * Notified when a disk is detected as writable but still above the warn threshold.
         *
         * @param disk writable disk
         */
        default void diskJustWritable(File disk) {}

        /**
         * Notified whenever all disks are detected as full.
         *
         * <p>Normal writes will be rejected when disks are detected as "full". High priority
         * writes, such as ledger recovery writes, can go through if disk space is still available.
         *
         * @param highPriorityWritesAllowed true if there is still disk space for high priority
         *     writes even though the disks are detected as "full"
         */
        default void allDisksFull(boolean highPriorityWritesAllowed) {}

        /**
         * Notified on fatal errors.
         */
        default void fatalError() {}
    }
}