blob: adfc896f7f2c8b69ba36a54e39b5cc83862b87f8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
* The underlying volume used to store replica.
* It uses the {@link FsDatasetImpl} object for synchronization.
class FsVolumeImpl implements FsVolumeSpi {
private final FsDatasetImpl dataset;
private final String storageID;
private final StorageType storageType;
private final Map<String, BlockPoolSlice> bpSlices
= new ConcurrentHashMap<String, BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
* Per-volume worker pool that processes new blocks to cache.
* The maximum number of workers per volume is bounded (configurable via
* dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
* contention.
private final ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType) throws IOException {
this.dataset = dataset;
this.storageID = storageID;
this.reserved = conf.getLong(
this.currentDir = currentDir;
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
this.storageType = storageType;
final int maxNumThreads = dataset.datanode.getConf().getInt(
ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
cacheExecutor = new ThreadPoolExecutor(
1, maxNumThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
File getCurrentDir() {
return currentDir;
File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir();
void decDfsUsed(String bpid, long value) {
synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
long getDfsUsed() throws IOException {
long dfsUsed = 0;
synchronized(dataset) {
for(BlockPoolSlice s : bpSlices.values()) {
dfsUsed += s.getDfsUsed();
return dfsUsed;
long getBlockPoolUsed(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDfsUsed();
* Calculate the capacity of the filesystem, after removing any
* reserved capacity.
* @return the unreserved number of bytes left in this filesystem. May be zero.
long getCapacity() {
long remaining = usage.getCapacity() - reserved;
return remaining > 0 ? remaining : 0;
public long getAvailable() throws IOException {
long remaining = getCapacity()-getDfsUsed();
long available = usage.getAvailable();
if (remaining > available) {
remaining = available;
return (remaining > 0) ? remaining : 0;
long getReserved(){
return reserved;
BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp == null) {
throw new IOException("block pool " + bpid + " is not found");
return bp;
public String getBasePath() {
return currentDir.getParent();
public String getPath(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
public File getFinalizedDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getFinalizedDir();
* Make a deep copy of the list of currently active BPIDs
public String[] getBlockPoolList() {
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
* Temporary files. They get moved to the finalized block directory when
* the block is finalized.
File createTmpFile(String bpid, Block b) throws IOException {
return getBlockPoolSlice(bpid).createTmpFile(b);
* RBW files. They get moved to the finalized block directory when
* the block is finalized.
File createRbwFile(String bpid, Block b) throws IOException {
return getBlockPoolSlice(bpid).createRbwFile(b);
File addBlock(String bpid, Block b, File f) throws IOException {
return getBlockPoolSlice(bpid).addBlock(b, f);
Executor getCacheExecutor() {
return cacheExecutor;
void checkDirs() throws DiskErrorException {
// TODO:FEDERATION valid synchronization
for(BlockPoolSlice s : bpSlices.values()) {
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
for(BlockPoolSlice s : bpSlices.values()) {
void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
* @param isFinalized true if the directory has finalized replicas;
* false if the directory has rbw replicas
* @throws IOException
void addToReplicasMap(String bpid, ReplicaMap volumeMap,
File dir, boolean isFinalized) throws IOException {
BlockPoolSlice bp = getBlockPoolSlice(bpid);
// TODO move this up
// dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
bp.addToReplicasMap(volumeMap, dir, isFinalized);
public String toString() {
return currentDir.getAbsolutePath();
void shutdown() {
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
void addBlockPool(String bpid, Configuration conf) throws IOException {
File bpdir = new File(currentDir, bpid);
BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
bpSlices.put(bpid, bp);
void shutdownBlockPool(String bpid) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
boolean isBPDirEmpty(String bpid) throws IOException {
File volumeCurrentDir = this.getCurrentDir();
File bpDir = new File(volumeCurrentDir, bpid);
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
File finalizedDir = new File(bpCurrentDir,
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
finalizedDir)) {
return false;
if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
return false;
return true;
void deleteBPDirectories(String bpid, boolean force) throws IOException {
File volumeCurrentDir = this.getCurrentDir();
File bpDir = new File(volumeCurrentDir, bpid);
if (!bpDir.isDirectory()) {
// nothing to be deleted
File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
File finalizedDir = new File(bpCurrentDir,
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
} else {
if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir);
if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
!FileUtil.fullyDelete(finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) {
throw new IOException("Failed to delete " + f);
if (!bpCurrentDir.delete()) {
throw new IOException("Failed to delete " + bpCurrentDir);
for (File f : FileUtil.listFiles(bpDir)) {
if (!f.delete()) {
throw new IOException("Failed to delete " + f);
if (!bpDir.delete()) {
throw new IOException("Failed to delete " + bpDir);
public String getStorageID() {
return storageID;
public StorageType getStorageType() {
return storageType;
DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);