Merge branch 'trunk' into HDFS-6581
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index 252f37b..c5d23b4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -79,7 +79,21 @@
/**
* Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description.
*/
- SYNC_BLOCK((short) 0x08);
+ SYNC_BLOCK((short) 0x08),
+
+ /**
+ * Create the block on transient storage (RAM) if available. If
+ * transient storage is unavailable then the block will be created
+ * on disk.
+ *
+ * HDFS will make a best effort to lazily write these files to persistent
+ * storage; however, file contents may be lost at any time due to process or
+ * node restarts, so there is no guarantee of data durability.
+ *
+ * This flag must only be used for intermediate data whose loss can be
+ * tolerated by the application.
+ */
+ LAZY_PERSIST((short) 0x10);
private final short mode;
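For context, a client opts into the new flag by including it in the EnumSet passed to FileSystem#create. The sketch below is illustrative only and not part of this change; the path, Configuration, and replication factor of 1 are assumptions that mirror the shell "put -l" wiring added further down in this patch.

    // LazyPersistExample.java -- illustrative sketch, not part of this change.
    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class LazyPersistExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/scratch.dat");              // assumed example path
        EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST);
        FSDataOutputStream out = fs.create(path,
            FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
            flags,
            conf.getInt("io.file.buffer.size", 4096),
            (short) 1,                                         // single replica, as with 'put -l'
            fs.getDefaultBlockSize(path),
            null);                                             // no progress callback
        out.writeBytes("intermediate data");                   // contents may be lost; see javadoc above
        out.close();
      }
    }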
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 52706f4..e729e67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -179,8 +179,19 @@
return fs.create(f, permission,
overwrite, bufferSize, replication, blockSize, progress);
}
-
+ @Override
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ ChecksumOpt checksumOpt) throws IOException {
+ return fs.create(f, permission,
+ flags, bufferSize, replication, blockSize, progress, checksumOpt);
+ }
@Override
@Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
index da67f1c..65e52fd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
@@ -30,6 +30,7 @@
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
@@ -45,6 +46,9 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+
/**
* Provides: argument processing to ensure the destination is valid
* for the number of source arguments. A processPaths that accepts both
@@ -56,6 +60,7 @@
private boolean overwrite = false;
private boolean verifyChecksum = true;
private boolean writeChecksum = true;
+ private boolean lazyPersist = false;
/**
* The name of the raw xattr namespace. It would be nice to use
@@ -78,6 +83,10 @@
overwrite = flag;
}
+ protected void setLazyPersist(boolean flag) {
+ lazyPersist = flag;
+ }
+
protected void setVerifyChecksum(boolean flag) {
verifyChecksum = flag;
}
@@ -379,7 +388,7 @@
try {
PathData tempTarget = target.suffix("._COPYING_");
targetFs.setWriteChecksum(writeChecksum);
- targetFs.writeStreamToFile(in, tempTarget);
+ targetFs.writeStreamToFile(in, tempTarget, lazyPersist);
targetFs.rename(tempTarget, target);
} finally {
targetFs.close(); // last ditch effort to ensure temp file is removed
@@ -449,10 +458,11 @@
super(fs);
}
- void writeStreamToFile(InputStream in, PathData target) throws IOException {
+ void writeStreamToFile(InputStream in, PathData target,
+ boolean lazyPersist) throws IOException {
FSDataOutputStream out = null;
try {
- out = create(target);
+ out = create(target, lazyPersist);
IOUtils.copyBytes(in, out, getConf(), true);
} finally {
IOUtils.closeStream(out); // just in case copyBytes didn't
@@ -460,9 +470,23 @@
}
// tag created files as temp files
- FSDataOutputStream create(PathData item) throws IOException {
+ FSDataOutputStream create(PathData item, boolean lazyPersist)
+ throws IOException {
try {
- return create(item.path, true);
+ if (lazyPersist) {
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
+ return create(item.path,
+ FsPermission.getFileDefault().applyUMask(
+ FsPermission.getUMask(getConf())),
+ createFlags,
+ getConf().getInt("io.file.buffer.size", 4096),
+ lazyPersist ? 1 : getDefaultReplication(item.path),
+ getDefaultBlockSize(),
+ null,
+ null);
+ } else {
+ return create(item.path, true);
+ }
} finally { // might have been created but stream was interrupted
deleteOnExit(item.path);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 3fd870c..afd1115 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -215,21 +215,25 @@
*/
public static class Put extends CommandWithDestination {
public static final String NAME = "put";
- public static final String USAGE = "[-f] [-p] <localsrc> ... <dst>";
+ public static final String USAGE = "[-f] [-p] [-l] <localsrc> ... <dst>";
public static final String DESCRIPTION =
"Copy files from the local file system " +
"into fs. Copying fails if the file already " +
- "exists, unless the -f flag is given. Passing " +
- "-p preserves access and modification times, " +
- "ownership and the mode. Passing -f overwrites " +
- "the destination if it already exists.\n";
+ "exists, unless the -f flag is given.\n" +
+ "Flags:\n" +
+ " -p : Preserves access and modification times, ownership and the mode.\n" +
+ " -f : Overwrites the destination if it already exists.\n" +
+ " -l : Allow DataNode to lazily persist the file to disk. Forces\n" +
+ " replication factor of 1. This flag will result in reduced\n" +
+ " durability. Use with care.\n";
@Override
protected void processOptions(LinkedList<String> args) throws IOException {
- CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p");
+ CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l");
cf.parse(args);
setOverwrite(cf.getOpt("f"));
setPreserve(cf.getOpt("p"));
+ setLazyPersist(cf.getOpt("l"));
getRemoteDestination(args);
// should have a -r option
setRecursive(true);
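From the shell, the new option is used as, for example, "hdfs dfs -put -l part-00000.tmp /scratch/" (file names invented for illustration); per the CommandWithDestination change above, this creates the target with the CREATE and LAZY_PERSIST flags and forces a replication factor of 1.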
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
index 652c928..ee56fe6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
@@ -91,7 +91,7 @@
break;
case 'F':
buf.append(stat.isDirectory()
- ? "directory"
+ ? "directory"
: (stat.isFile() ? "regular file" : "symlink"));
break;
case 'g':
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index 804b504..c6e5fc5 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -436,7 +436,7 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^-put \[-f\] \[-p\] <localsrc> \.\.\. <dst> :\s*</expected-output>
+ <expected-output>^-put \[-f\] \[-p\] \[-l\] <localsrc> \.\.\. <dst> :( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
@@ -444,15 +444,31 @@
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^\s*exists, unless the -f flag is given.( )*Passing -p preserves access and( )*</expected-output>
+ <expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^\s*modification times, ownership and the mode. Passing -f overwrites the( )*</expected-output>
+ <expected-output>^\s*Flags:( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*destination if it already exists.( )*</expected-output>
+ <expected-output>^\s*-p Preserves access and modification times, ownership and the mode.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*durability. Use with care.( )*</expected-output>
</comparator>
</comparators>
</test>
@@ -467,7 +483,7 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^-copyFromLocal \[-f\] \[-p\] <localsrc> \.\.\. <dst> :\s*</expected-output>
+ <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] <localsrc> \.\.\. <dst> :\s*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
new file mode 100644
index 0000000..b41e133
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -0,0 +1,96 @@
+ BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
+
+ HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)
+
+ HDFS-6924. Add new RAM_DISK storage type. (Arpit Agarwal)
+
+ HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
+ edit logs. (Arpit Agarwal)
+
+ HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
+ (Arpit Agarwal)
+
+ HDFS-6925. DataNode should attempt to place replicas on transient storage
+ first if lazyPersist flag is received. (Arpit Agarwal)
+
+ HDFS-6926. DN support for saving replicas to persistent storage and
+ evicting in-memory replicas. (Arpit Agarwal)
+
+ HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
+
+ HDFS-6929. NN periodically unlinks lazy persist files with missing
+ replicas from namespace. (Arpit Agarwal)
+
+ HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
+ (Arpit Agarwal)
+
+ HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+ (Arpit Agarwal)
+
+ HDFS-6931. Move lazily persisted replicas to finalized directory on DN
+ startup. (Arpit Agarwal)
+
+ HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
+ Arpit Agarwal)
+
+ HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
+
+ HDFS-6977. Delete all copies when a block is deleted from the block space.
+ (Arpit Agarwal)
+
+ HDFS-6991. Notify NN of evicted block before deleting it from RAM disk.
+ (Arpit Agarwal)
+
+ HDFS-6978. Directory scanner should correctly reconcile blocks on RAM
+ disk. (Arpit Agarwal)
+
+ HDFS-7066. LazyWriter#evictBlocks misses a null check for replicaState.
+ (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7064. Fix unit test failures in HDFS-6581 branch. (Xiaoyu Yao via
+ Arpit Agarwal)
+
+ HDFS-6581. Few more unit test fixes for HDFS-6581. (Arpit Agarwal)
+
+ HDFS-7080. Fix finalize and upgrade unit test failures. (Arpit Agarwal)
+
+ HDFS-7084. FsDatasetImpl#copyBlockFiles debug log can be improved.
+ (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
+ (Arpit Agarwal)
+
+ HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
+
+ HDFS-7108. Fix unit test failures in SimulatedFsDataset. (Arpit Agarwal)
+
+ HDFS-7071. Updated editsStored and editsStored.xml to bump layout
+ version and add LazyPersist flag. (Xiaoyu Yao and Arpit Agarwal via
+ Arpit Agarwal)
+
+ HDFS-6990. Add unit test for evict/delete RAM_DISK block with open
+ handle. (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
+ Arpit Agarwal)
+
+ HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK.
+ (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7144. Fix findbugs warnings in RamDiskReplicaTracker. (szetszwo via
+ Arpit Agarwal)
+
+ HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
+ (Arpit Agarwal)
+
+ HDFS-7153. Add storagePolicy to NN edit log during file creation.
+ (Arpit Agarwal)
+
+ HDFS-7159. Use block storage policy to set lazy persist preference.
+ (Arpit Agarwal)
+
+ HDFS-7129. Metrics to track usage of memory for writes. (Xiaoyu Yao
+ via Arpit Agarwal)
+
+ HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b016750..4391578 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.web.AuthFilter;
import org.apache.hadoop.http.HttpConfig;
@@ -127,6 +128,19 @@
public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+ public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
+ public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+ public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
+ public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
+ public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
+ public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
+ public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
+ public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT = 3;
+
+ // This setting is for testing/internal use only.
+ public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
+ public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
+
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
"dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
@@ -228,6 +242,9 @@
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
+
+ public static final String DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec";
+ public static final int DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60;
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
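The keys above are ordinary Configuration settings. A minimal sketch of overriding them programmatically follows; the key strings are copied from the constants above, while the numeric values are arbitrary examples rather than recommendations.

    // Assumes: import org.apache.hadoop.conf.Configuration;
    //          import org.apache.hadoop.hdfs.HdfsConfiguration;
    Configuration conf = new HdfsConfiguration();
    conf.setInt("dfs.datanode.lazywriter.interval.sec", 30);                   // default 60
    conf.setInt("dfs.datanode.ram.disk.low.watermark.percent", 20);            // default 10
    conf.setInt("dfs.datanode.ram.disk.low.watermark.replicas", 5);            // default 3
    conf.setInt("dfs.namenode.lazypersist.file.scrub.interval.sec", 10 * 60);  // default 5 * 60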
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 13544c8..781fb68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -75,6 +76,7 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
@@ -172,6 +174,8 @@
private final AtomicReference<CachingStrategy> cachingStrategy;
private boolean failPacket = false;
private FileEncryptionInfo fileEncryptionInfo;
+ private static final BlockStoragePolicySuite blockStoragePolicySuite =
+ BlockStoragePolicySuite.createDefaultSuite();
private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L;
@@ -360,6 +364,7 @@
private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
+ private final boolean isLazyPersistFile;
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@@ -376,15 +381,16 @@
/**
* Default construction for file create
*/
- private DataStreamer() {
- this(null);
+ private DataStreamer(HdfsFileStatus stat) {
+ this(stat, null);
}
/**
* construction with tracing info
*/
- private DataStreamer(Span span) {
+ private DataStreamer(HdfsFileStatus stat, Span span) {
isAppend = false;
+ isLazyPersistFile = initLazyPersist(stat);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
@@ -404,6 +410,7 @@
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
+ isLazyPersistFile = initLazyPersist(stat);
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -447,6 +454,13 @@
}
}
+ private boolean initLazyPersist(HdfsFileStatus stat) {
+ final BlockStoragePolicy lpPolicy =
+ blockStoragePolicySuite.getPolicy("LAZY_PERSIST");
+ return lpPolicy != null &&
+ stat.getStoragePolicy() == lpPolicy.getId();
+ }
+
private void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
}
@@ -1396,7 +1410,7 @@
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
- cachingStrategy.get());
+ cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1649,7 +1663,7 @@
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
- streamer = new DataStreamer(traceSpan);
+ streamer = new DataStreamer(stat, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@@ -1726,7 +1740,7 @@
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
- streamer = new DataStreamer(traceSpan);
+ streamer = new DataStreamer(stat, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
index 7ca9e00..88cc7d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,17 +32,40 @@
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum StorageType {
- DISK,
- SSD,
- ARCHIVE;
+ DISK(false),
+ SSD(false),
+ ARCHIVE(false),
+ RAM_DISK(true);
+
+ private final boolean isTransient;
public static final StorageType DEFAULT = DISK;
-
+
public static final StorageType[] EMPTY_ARRAY = {};
-
+
private static final StorageType[] VALUES = values();
-
+
+ StorageType(boolean isTransient) { this.isTransient = isTransient; }
+
+ public boolean isTransient() {
+ return isTransient;
+ }
+
+ public boolean isMovable() {
+ return !isTransient;
+ }
+
public static List<StorageType> asList() {
return Arrays.asList(VALUES);
}
+
+ public static List<StorageType> getMovableTypes() {
+ List<StorageType> movableTypes = new ArrayList<StorageType>();
+ for (StorageType t : VALUES) {
+ if (!t.isTransient) {
+ movableTypes.add(t);
+ }
+ }
+ return movableTypes;
+ }
}
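In effect, only RAM_DISK.isTransient() is true, so getMovableTypes() returns [DISK, SSD, ARCHIVE] and omits RAM_DISK; this is why the Balancer change below iterates getMovableTypes() instead of asList().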
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java
index 8ca83a0..ef13e0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java
@@ -49,15 +49,29 @@
private final StorageType[] creationFallbacks;
/** The fallback storage type for replication. */
private final StorageType[] replicationFallbacks;
+ /**
+ * Whether the policy is inherited during file creation.
+ * If set, the policy cannot be changed after file creation.
+ */
+ private boolean copyOnCreateFile;
@VisibleForTesting
public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
StorageType[] creationFallbacks, StorageType[] replicationFallbacks) {
+ this(id, name, storageTypes, creationFallbacks, replicationFallbacks,
+ false);
+ }
+
+ @VisibleForTesting
+ public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
+ StorageType[] creationFallbacks, StorageType[] replicationFallbacks,
+ boolean copyOnCreateFile) {
this.id = id;
this.name = name;
this.storageTypes = storageTypes;
this.creationFallbacks = creationFallbacks;
this.replicationFallbacks = replicationFallbacks;
+ this.copyOnCreateFile = copyOnCreateFile;
}
/**
@@ -65,13 +79,22 @@
*/
public List<StorageType> chooseStorageTypes(final short replication) {
final List<StorageType> types = new LinkedList<StorageType>();
- int i = 0;
- for(; i < replication && i < storageTypes.length; i++) {
- types.add(storageTypes[i]);
+ int i = 0, j = 0;
+
+ // Do not return transient storage types. We will not have accurate
+ // usage information for transient types.
+ for (; i < replication && j < storageTypes.length; ++j) {
+ if (!storageTypes[j].isTransient()) {
+ types.add(storageTypes[j]);
+ ++i;
+ }
}
+
final StorageType last = storageTypes[storageTypes.length - 1];
- for(; i < replication; i++) {
- types.add(last);
+ if (!last.isTransient()) {
+ for (; i < replication; i++) {
+ types.add(last);
+ }
}
return types;
}
@@ -241,4 +264,8 @@
}
return null;
}
+
+ public boolean isCopyOnCreateFile() {
+ return copyOnCreateFile;
+ }
}
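A worked example of the revised fallback logic, using the @VisibleForTesting constructor above (a sketch, not part of the patch; the id, name and storage-type arrays match the LAZY_PERSIST policy added to BlockStoragePolicySuite below, and java.util.List plus the HDFS StorageType/BlockStoragePolicy imports are assumed):

    // Transient types are skipped, so all three requested replicas resolve to DISK.
    BlockStoragePolicy lp = new BlockStoragePolicy((byte) 15, "LAZY_PERSIST",
        new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
        new StorageType[]{StorageType.DISK},
        new StorageType[]{StorageType.DISK},
        true);
    List<StorageType> types = lp.chooseStorageTypes((short) 3);
    // types == [DISK, DISK, DISK]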
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index d54d5be..f6b99e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -106,8 +106,8 @@
final long maxBytesRcvd,
final long latestGenerationStamp,
final DataChecksum requestedChecksum,
- final CachingStrategy cachingStrategy) throws IOException;
-
+ final CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index daae9b7..8192925 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -148,10 +148,11 @@
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
- CachingStrategy.newDefaultStrategy()));
- } finally {
- if (traceScope != null) traceScope.close();
- }
+ CachingStrategy.newDefaultStrategy()),
+ (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
+ } finally {
+ if (traceScope != null) traceScope.close();
+ }
}
/** Receive {@link Op#TRANSFER_BLOCK} */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index fb6cf2c..1ae9da5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -128,7 +128,8 @@
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
- final CachingStrategy cachingStrategy) throws IOException {
+ final CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@@ -146,7 +147,8 @@
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
- .setCachingStrategy(getCachingStrategy(cachingStrategy));
+ .setCachingStrategy(getCachingStrategy(cachingStrategy))
+ .setAllowLazyPersist(allowLazyPersist);
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 0432d5d..6470a1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -1362,6 +1362,9 @@
if (flag.contains(CreateFlag.OVERWRITE)) {
value |= CreateFlagProto.OVERWRITE.getNumber();
}
+ if (flag.contains(CreateFlag.LAZY_PERSIST)) {
+ value |= CreateFlagProto.LAZY_PERSIST.getNumber();
+ }
return value;
}
@@ -1378,6 +1381,10 @@
== CreateFlagProto.OVERWRITE_VALUE) {
result.add(CreateFlag.OVERWRITE);
}
+ if ((flag & CreateFlagProto.LAZY_PERSIST_VALUE)
+ == CreateFlagProto.LAZY_PERSIST_VALUE) {
+ result.add(CreateFlag.LAZY_PERSIST);
+ }
return new EnumSetWritable<CreateFlag>(result);
}
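As a quick sanity check of the flag conversion above: CreateFlag.CREATE carries 0x01 and the new LAZY_PERSIST carries 0x10, so a set of {CREATE, LAZY_PERSIST} serializes to the integer 0x11, and the reverse conversion restores both flags (assuming CreateFlagProto.LAZY_PERSIST mirrors the 0x10 value, as the existing CREATE/OVERWRITE handling implies).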
@@ -1784,6 +1791,8 @@
return StorageTypeProto.SSD;
case ARCHIVE:
return StorageTypeProto.ARCHIVE;
+ case RAM_DISK:
+ return StorageTypeProto.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
@@ -1814,6 +1823,8 @@
return StorageType.SSD;
case ARCHIVE:
return StorageType.ARCHIVE;
+ case RAM_DISK:
+ return StorageType.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 67994c8..2a19537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -271,7 +271,7 @@
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
- for(StorageType t : StorageType.asList()) {
+ for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
continue;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ad170d7..640791f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -402,6 +402,10 @@
return storagePolicySuite.getPolicy(policyName);
}
+ public BlockStoragePolicy getStoragePolicy(final byte policyId) {
+ return storagePolicySuite.getPolicy(policyId);
+ }
+
public BlockStoragePolicy[] getStoragePolicies() {
return storagePolicySuite.getAllPolicies();
}
@@ -2103,8 +2107,8 @@
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
- && (!storedBlock.findDatanode(dn)
- || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+ && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+ corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
return storedBlock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
index 1d162a0..caaabad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
@@ -44,6 +44,12 @@
public static BlockStoragePolicySuite createDefaultSuite() {
final BlockStoragePolicy[] policies =
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
+ final byte lazyPersistId = 15;
+ policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId, "LAZY_PERSIST",
+ new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
+ new StorageType[]{StorageType.DISK},
+ new StorageType[]{StorageType.DISK},
+ true); // Cannot be changed on regular files, but inherited.
final byte hotId = 12;
policies[hotId] = new BlockStoragePolicy(hotId, "HOT",
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 3121496..25e89ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -230,7 +230,6 @@
void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block);
- checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
@@ -249,11 +248,6 @@
block.getBlockPoolId(), getBlockPoolId());
}
- private void checkDelHint(String delHint) {
- Preconditions.checkArgument(delHint != null,
- "delHint is null");
- }
-
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index bbb67fc..61f1e7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -191,7 +191,7 @@
+ hours + " hours for block pool " + bpid);
// get the list of blocks and arrange them in random order
- List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
+ List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
Collections.shuffle(arr);
long scanTime = -1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bfb2233..4d1cc6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -139,7 +139,8 @@
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
- CachingStrategy cachingStrategy) throws IOException {
+ CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException {
try{
this.block = block;
this.in = in;
@@ -180,7 +181,7 @@
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(storageType, block);
+ replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 546162f..a9ea3fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -890,7 +890,7 @@
}
// calls specific to BP
- protected void notifyNamenodeReceivedBlock(
+ public void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
@@ -2003,7 +2003,8 @@
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
- stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
+ stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
+ false);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
@@ -2080,7 +2081,8 @@
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
- if (blockScanner != null) {
+ FsVolumeSpi volume = getFSDataset().getVolume(block);
+ if (blockScanner != null && !volume.isTransientStorage()) {
blockScanner.addBlock(block);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 965b655..99eedb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -81,6 +81,7 @@
final static String STORAGE_DIR_DETACHED = "detach";
public final static String STORAGE_DIR_RBW = "rbw";
public final static String STORAGE_DIR_FINALIZED = "finalized";
+ public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
public final static String STORAGE_DIR_TMP = "tmp";
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 4575c93..67eb941 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -544,7 +544,8 @@
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
- CachingStrategy cachingStrategy) throws IOException {
+ CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@@ -606,8 +607,8 @@
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
- cachingStrategy);
-
+ cachingStrategy, allowLazyPersist);
+
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@@ -648,10 +649,11 @@
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
+ // Do not propagate allowLazyPersist to downstream DataNodes.
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy);
+ latestGenerationStamp, requestedChecksum, cachingStrategy, false);
mirrorOut.flush();
@@ -1046,7 +1048,7 @@
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
- CachingStrategy.newDropBehind());
+ CachingStrategy.newDropBehind(), false);
// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index a47f2ef..71f976b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -83,6 +83,7 @@
long missingBlockFile = 0;
long missingMemoryBlocks = 0;
long mismatchBlocks = 0;
+ long duplicateBlocks = 0;
public Stats(String bpid) {
this.bpid = bpid;
@@ -399,7 +400,7 @@
/**
* Reconcile differences between disk and in-memory blocks
*/
- void reconcile() {
+ void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
@@ -440,7 +441,7 @@
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot
while (m < memReport.length && d < blockpoolReport.length) {
- Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+ FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
ScanInfo info = blockpoolReport[Math.min(
d, blockpoolReport.length - 1)];
if (info.getBlockId() < memBlock.getBlockId()) {
@@ -468,9 +469,23 @@
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
+ } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+ // volumeMap record and on-disk files don't match.
+ statsRecord.duplicateBlocks++;
+ addDifference(diffRecord, statsRecord, info);
}
d++;
- m++;
+
+ if (d < blockpoolReport.length) {
+ // There may be multiple on-disk records for the same block; don't increment
+ // the memory record pointer until all of them have been compared.
+ ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
+ if (nextInfo.getBlockId() != info.blockId) {
+ ++m;
+ }
+ } else {
+ ++m;
+ }
}
while (m < memReport.length) {
FinalizedReplica current = memReport[m++];
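Concretely (a hypothetical trace, block ids invented for illustration): if the block pool report contains two files for blk_1001 -- say the working copy on a RAM_DISK volume and its lazily saved copy on a DISK volume -- while the volume map holds a single record, the scanner now compares both on-disk entries against that one in-memory record, counts the extra file under duplicateBlocks, and only advances the memory pointer once the block id changes.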
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
index a480bb1..b6e5ba9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
@@ -59,4 +59,9 @@
* Return the storageUuid of the volume that stores this replica.
*/
public String getStorageUuid();
+
+ /**
+ * Return true if the target volume is backed by RAM.
+ */
+ public boolean isOnTransientStorage();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 08395aa..45862ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -250,7 +250,7 @@
}
}
} else {
- // for create, we can use the requested checksum
+ // for create, we can use the requested checksum
checksum = requestedChecksum;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 49ac605..940d3eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -62,17 +62,6 @@
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
/**
- * Constructor for a zero length replica
- * @param blockId block id
- * @param genStamp replica generation stamp
- * @param vol volume where replica is located
- * @param dir directory path where block and meta files are located
- */
- ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
- this( blockId, 0L, genStamp, vol, dir);
- }
-
- /**
* Constructor
* @param block a block
* @param vol volume where replica is located
@@ -296,20 +285,6 @@
return true;
}
- /**
- * Set this replica's generation stamp to be a newer one
- * @param newGS new generation stamp
- * @throws IOException is the new generation stamp is not greater than the current one
- */
- void setNewerGenerationStamp(long newGS) throws IOException {
- long curGS = getGenerationStamp();
- if (newGS <= curGS) {
- throw new IOException("New generation stamp (" + newGS
- + ") must be greater than current one (" + curGS + ")");
- }
- setGenerationStamp(newGS);
- }
-
@Override //Object
public String toString() {
return getClass().getSimpleName()
@@ -321,4 +296,9 @@
+ "\n getVolume() = " + getVolume()
+ "\n getBlockFile() = " + getBlockFile();
}
+
+ @Override
+ public boolean isOnTransientStorage() {
+ return volume.isTransientStorage();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 35f7c93..2cd8a01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -37,8 +37,7 @@
// that the replica will be bumped to after recovery
public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
- super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
- replica.getVolume(), replica.getDir());
+ super(replica, replica.getVolume(), replica.getDir());
if ( replica.getState() != ReplicaState.FINALIZED &&
replica.getState() != ReplicaState.RBW &&
replica.getState() != ReplicaState.RWR ) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index ec19ec5..d0d36ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -99,7 +99,7 @@
@Override
public synchronized V chooseVolume(List<V> volumes,
- final long replicaSize) throws IOException {
+ long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
@@ -138,8 +138,7 @@
if (mostAvailableAmongLowVolumes < replicaSize ||
random.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
- highAvailableVolumes,
- replicaSize);
+ highAvailableVolumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size "
@@ -147,8 +146,7 @@
}
} else {
volume = roundRobinPolicyLowAvailable.chooseVolume(
- lowAvailableVolumes,
- replicaSize);
+ lowAvailableVolumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4c03151..2bb2e7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -116,13 +116,16 @@
/** @return a list of finalized blocks for the given block pool. */
public List<FinalizedReplica> getFinalizedBlocks(String bpid);
+ /** @return a list of finalized blocks on persistent storage for the given block pool. */
+ public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+
/**
* Check whether the in-memory block record matches the block on the disk,
* and, in case that they are not matched, update the record or mark it
* as corrupted.
*/
public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol);
+ File diskMetaFile, FsVolumeSpi vol) throws IOException;
/**
* @param b - the block
@@ -197,7 +200,7 @@
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createRbw(StorageType storageType,
- ExtendedBlock b) throws IOException;
+ ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index cba23c3..8ebf2b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -46,6 +46,9 @@
public StorageType getStorageType();
+ /** Returns true if the volume is NOT backed by persistent storage. */
+ public boolean isTransientStorage();
+
/**
* Reserve disk space for an RBW block so a writer does not run out of
* space before the block is full.
@@ -56,4 +59,4 @@
* Release disk space previously reserved for RBW block.
*/
public void releaseReservedSpace(long bytesToRelease);
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 7f4bdae..55a3560 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
@@ -27,12 +30,14 @@
*/
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V> {
+ public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
private int curVolume = 0;
@Override
- public synchronized V chooseVolume(final List<V> volumes, final long blockSize
- ) throws IOException {
+ public synchronized V chooseVolume(final List<V> volumes, long blockSize)
+ throws IOException {
+
if(volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
@@ -50,7 +55,9 @@
final V volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable();
- if (availableVolumeSize > blockSize) { return volume; }
+ if (availableVolumeSize > blockSize) {
+ return volume;
+ }
if (availableVolumeSize > maxAvailable) {
maxAvailable = availableVolumeSize;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 96e4650..3eeb3ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,6 +28,9 @@
import java.io.RandomAccessFile;
import java.util.Scanner;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
@@ -42,6 +45,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -56,16 +60,20 @@
* This class is synchronized by {@link FsVolumeImpl}.
*/
class BlockPoolSlice {
+ static final Log LOG = LogFactory.getLog(BlockPoolSlice.class);
+
private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current
// directory where finalized replicas are stored
private final File finalizedDir;
+ private final File lazypersistDir;
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
private static final String DU_CACHE_FILE = "dfsUsed";
private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+ private final boolean deleteDuplicateReplicas;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final DU dfsUsage;
@@ -85,12 +93,17 @@
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+ this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
if (!this.finalizedDir.exists()) {
if (!this.finalizedDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + this.finalizedDir);
}
}
+ this.deleteDuplicateReplicas = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
+ DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
+
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
// in the future, we might want to do some sort of datanode-local
@@ -136,6 +149,10 @@
return finalizedDir;
}
+ File getLazypersistDir() {
+ return lazypersistDir;
+ }
+
File getRbwDir() {
return rbwDir;
}
@@ -252,18 +269,63 @@
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile;
}
-
+
+ /**
+ * Save the given replica to persistent storage.
+ *
+ * @return The saved meta and block files, in that order.
+ * @throws IOException
+ */
+ File[] lazyPersistReplica(long blockId, long genStamp,
+ File srcMeta, File srcFile) throws IOException {
+ if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+ FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+ }
+ File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+ blockId, genStamp, srcMeta, srcFile, lazypersistDir);
+ dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
+ return targetFiles;
+ }
+
+ /**
+ * Move a persisted replica from lazypersist directory to a subdirectory
+ * under finalized.
+ */
+ File activateSavedReplica(Block b, File metaFile, File blockFile)
+ throws IOException {
+ final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+ final File targetBlockFile = new File(blockDir, blockFile.getName());
+ final File targetMetaFile = new File(blockDir, metaFile.getName());
+ FileUtils.moveFile(blockFile, targetBlockFile);
+ FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
+ FileUtils.moveFile(metaFile, targetMetaFile);
+ FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
+ return targetBlockFile;
+ }
+
void checkDirs() throws DiskErrorException {
DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
+
+
- void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+ void getVolumeMap(ReplicaMap volumeMap,
+ final RamDiskReplicaTracker lazyWriteReplicaMap)
+ throws IOException {
+ // Recover lazy persist replicas, they will be added to the volumeMap
+ // when we scan the finalized directory.
+ if (lazypersistDir.exists()) {
+ int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
+ FsDatasetImpl.LOG.info(
+ "Recovered " + numRecovered + " replicas from " + lazypersistDir);
+ }
+
// add finalized replicas
- addToReplicasMap(volumeMap, finalizedDir, true);
+ addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
// add rbw replicas
- addToReplicasMap(volumeMap, rbwDir, false);
+ addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
}
/**
@@ -292,18 +354,82 @@
/**
+ * Move replicas in the lazy persist directory to their corresponding locations
+ * in the finalized directory.
+ * @return number of replicas recovered.
+ */
+ private int moveLazyPersistReplicasToFinalized(File source)
+ throws IOException {
+ File files[] = FileUtil.listFiles(source);
+ int numRecovered = 0;
+ for (File file : files) {
+ if (file.isDirectory()) {
+ numRecovered += moveLazyPersistReplicasToFinalized(file);
+ }
+
+ if (Block.isMetaFilename(file.getName())) {
+ File metaFile = file;
+ File blockFile = Block.metaToBlockFile(metaFile);
+ long blockId = Block.filename2id(blockFile.getName());
+ File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
+
+ if (blockFile.exists()) {
+
+ if (!targetDir.exists() && !targetDir.mkdirs()) {
+ LOG.warn("Failed to mkdirs " + targetDir);
+ continue;
+ }
+
+ final File targetMetaFile = new File(targetDir, metaFile.getName());
+ try {
+ NativeIO.renameTo(metaFile, targetMetaFile);
+ } catch (IOException e) {
+ LOG.warn("Failed to move meta file from "
+ + metaFile + " to " + targetMetaFile, e);
+ continue;
+
+ }
+
+ final File targetBlockFile = new File(targetDir, blockFile.getName());
+ try {
+ NativeIO.renameTo(blockFile, targetBlockFile);
+ } catch (IOException e) {
+ LOG.warn("Failed to move block file from "
+ + blockFile + " to " + targetBlockFile, e);
+ continue;
+ }
+
+ if (targetBlockFile.exists() && targetMetaFile.exists()) {
+ ++numRecovered;
+ } else {
+ // Failure should be rare.
+ LOG.warn("Failed to move " + blockFile + " to " + targetDir);
+ }
+ }
+ }
+ }
+
+ FileUtil.fullyDelete(source);
+ return numRecovered;
+ }
+
+ /**
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
+ * @param lazyWriteReplicaMap Map of replicas on transient
+ * storage.
* @param isFinalized true if the directory has finalized replicas;
* false if the directory has rbw replicas
*/
- void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
- ) throws IOException {
+ void addToReplicasMap(ReplicaMap volumeMap, File dir,
+ final RamDiskReplicaTracker lazyWriteReplicaMap,
+ boolean isFinalized)
+ throws IOException {
File files[] = FileUtil.listFiles(dir);
for (File file : files) {
if (file.isDirectory()) {
- addToReplicasMap(volumeMap, file, isFinalized);
+ addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
}
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -361,13 +487,96 @@
}
}
- ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
- if (oldReplica != null) {
- FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
- "on disk: " + oldReplica.getBlockFile() + " and " + file );
+ ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+ if (oldReplica == null) {
+ volumeMap.add(bpid, newReplica);
+ } else {
+ // We have multiple replicas of the same block so decide which one
+ // to keep.
+ newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+ }
+
+ // If we are retaining a replica on transient storage make sure
+ // it is in the lazyWriteReplicaMap so it can be persisted
+ // eventually.
+ if (newReplica.getVolume().isTransientStorage()) {
+ lazyWriteReplicaMap.addReplica(bpid, blockId,
+ (FsVolumeImpl) newReplica.getVolume());
+ } else {
+ lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
}
}
}
+
+ /**
+ * This method is invoked during DN startup when volumes are scanned to
+ * build up the volumeMap.
+ *
+ * Given two replicas, decide which one to keep. The preference is as
+ * follows:
+ * 1. Prefer the replica with the higher generation stamp.
+ * 2. If generation stamps are equal, prefer the replica with the
+ * larger on-disk length.
+ * 3. If on-disk length is the same, prefer the replica on persistent
+ * storage volume.
+ * 4. All other factors being equal, keep replica1.
+ *
+ * The other replica is removed from the volumeMap and is deleted from
+ * its storage volume.
+ *
+   * @param replica1 the first of the two replicas found for the block.
+   * @param replica2 the second of the two replicas found for the block.
+   * @param volumeMap the volume map to update with the retained replica.
+ * @return the replica that is retained.
+ * @throws IOException
+ */
+ ReplicaInfo resolveDuplicateReplicas(
+ final ReplicaInfo replica1, final ReplicaInfo replica2,
+ final ReplicaMap volumeMap) throws IOException {
+
+ if (!deleteDuplicateReplicas) {
+ // Leave both block replicas in place.
+ return replica1;
+ }
+
+ ReplicaInfo replicaToKeep;
+ ReplicaInfo replicaToDelete;
+
+ if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
+ replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp()
+ ? replica1 : replica2;
+ } else if (replica1.getNumBytes() != replica2.getNumBytes()) {
+ replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ?
+ replica1 : replica2;
+ } else if (replica1.getVolume().isTransientStorage() &&
+ !replica2.getVolume().isTransientStorage()) {
+ replicaToKeep = replica2;
+ } else {
+ replicaToKeep = replica1;
+ }
+
+ replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1;
+
+ if (LOG.isDebugEnabled()) {
+      LOG.debug("resolveDuplicateReplicas keeping " + replicaToKeep
+ + ". Will try to delete " + replicaToDelete);
+ }
+
+ // Update volumeMap.
+ volumeMap.add(bpid, replicaToKeep);
+
+ // Delete the files on disk. Failure here is okay.
+ final File blockFile = replicaToDelete.getBlockFile();
+ if (!blockFile.delete()) {
+ LOG.warn("Failed to delete block file " + blockFile);
+ }
+ final File metaFile = replicaToDelete.getMetaFile();
+ if (!metaFile.delete()) {
+ LOG.warn("Failed to delete meta file " + metaFile);
+ }
+
+ return replicaToKeep;
+ }
/**
* Find out the number of bytes in the block that match its crc.
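For reference, the duplicate-resolution ordering added above can be illustrated with a small standalone sketch. ReplicaStub is a hypothetical stand-in carrying only the three fields the comparison inspects; it is not a Hadoop type.

// Minimal standalone sketch of the preference ordering, assuming a
// hypothetical ReplicaStub type in place of ReplicaInfo.
class ReplicaStub {
  final long genStamp;
  final long numBytes;
  final boolean onTransientStorage;

  ReplicaStub(long genStamp, long numBytes, boolean onTransientStorage) {
    this.genStamp = genStamp;
    this.numBytes = numBytes;
    this.onTransientStorage = onTransientStorage;
  }
}

class DuplicateResolutionSketch {
  // Returns the replica to keep, following the same four rules as
  // resolveDuplicateReplicas.
  static ReplicaStub choose(ReplicaStub r1, ReplicaStub r2) {
    if (r1.genStamp != r2.genStamp) {
      return r1.genStamp > r2.genStamp ? r1 : r2;              // rule 1
    } else if (r1.numBytes != r2.numBytes) {
      return r1.numBytes > r2.numBytes ? r1 : r2;              // rule 2
    } else if (r1.onTransientStorage && !r2.onTransientStorage) {
      return r2;                                               // rule 3
    }
    return r1;                                                 // rule 4
  }

  public static void main(String[] args) {
    ReplicaStub onRamDisk = new ReplicaStub(1001, 4096, true);
    ReplicaStub onDisk = new ReplicaStub(1001, 4096, false);
    // Same generation stamp and length, so the persistent copy is kept.
    System.out.println(choose(onRamDisk, onDisk) == onDisk);   // prints true
  }
}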
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 6e81082..df52e14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,7 +30,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -44,7 +43,9 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -66,6 +67,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -84,6 +86,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -92,6 +95,7 @@
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -115,7 +119,6 @@
}
}
-
@Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() {
return volumes.volumes;
@@ -155,7 +158,7 @@
@Override // FsDatasetSpi
public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException {
- File blockfile = getFile(bpid, blkid);
+ File blockfile = getFile(bpid, blkid, false);
if (blockfile == null) {
return null;
}
@@ -208,11 +211,17 @@
final FsVolumeList volumes;
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
+ final Daemon lazyWriter;
final FsDatasetCache cacheManager;
private final Configuration conf;
private final int validVolsRequired;
+ private volatile boolean fsRunning;
final ReplicaMap volumeMap;
+ final RamDiskReplicaTracker ramDiskReplicaTracker;
+
+ private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
+
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();
@@ -222,6 +231,7 @@
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
+ this.fsRunning = true;
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
@@ -252,6 +262,8 @@
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
+ ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
+
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
@@ -266,6 +278,10 @@
}
cacheManager = new FsDatasetCache(this);
+
+ // Start the lazy writer once we have built the replica maps.
+ lazyWriter = new Daemon(new LazyWriter(conf));
+ lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
}
@@ -281,7 +297,7 @@
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
ReplicaMap tempVolumeMap = new ReplicaMap(this);
- fsVolume.getVolumeMap(tempVolumeMap);
+ fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
volumeMap.addAll(tempVolumeMap);
volumes.addVolume(fsVolume);
@@ -309,7 +325,7 @@
for (final String bpid : bpids) {
try {
fsVolume.addBlockPool(bpid, this.conf);
- fsVolume.getVolumeMap(bpid, tempVolumeMap);
+ fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume +
". Will throw later.", e);
@@ -550,16 +566,16 @@
* Get File name for a given block.
*/
private File getBlockFile(ExtendedBlock b) throws IOException {
- return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
+ return getBlockFile(b.getBlockPoolId(), b.getBlockId());
}
/**
* Get File name for a given block.
*/
- File getBlockFile(String bpid, Block b) throws IOException {
- File f = validateBlockFile(bpid, b);
+ File getBlockFile(String bpid, long blockId) throws IOException {
+ File f = validateBlockFile(bpid, blockId);
if(f == null) {
- throw new IOException("Block " + b + " is not valid.");
+ throw new IOException("BlockId " + blockId + " is not valid.");
}
return f;
}
@@ -569,12 +585,16 @@
* checking that it exists. This should be used when the
* next operation is going to open the file for read anyway,
* and thus the exists check is redundant.
+ *
+ * @param touch if true then update the last access timestamp of the
+ * block. Currently used for blocks on transient storage.
*/
- private File getBlockFileNoExistsCheck(ExtendedBlock b)
+ private File getBlockFileNoExistsCheck(ExtendedBlock b,
+ boolean touch)
throws IOException {
final File f;
synchronized(this) {
- f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+ f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
}
if (f == null) {
throw new IOException("Block " + b + " is not valid");
@@ -585,7 +605,7 @@
@Override // FsDatasetSpi
public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
- File blockFile = getBlockFileNoExistsCheck(b);
+ File blockFile = getBlockFileNoExistsCheck(b, true);
if (isNativeIOAvailable) {
return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
} else {
@@ -661,8 +681,8 @@
return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
}
- static File moveBlockFiles(Block b, File srcfile, File destdir
- ) throws IOException {
+ static File moveBlockFiles(Block b, File srcfile, File destdir)
+ throws IOException {
final File dstfile = new File(destdir, b.getBlockName());
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
@@ -685,6 +705,34 @@
return dstfile;
}
+ /**
+ * Copy the block and meta files for the given block to the given destination.
+ * @return the new meta and block files.
+ * @throws IOException
+ */
+ static File[] copyBlockFiles(long blockId, long genStamp,
+ File srcMeta, File srcFile, File destRoot)
+ throws IOException {
+ final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+ final File dstFile = new File(destDir, srcFile.getName());
+ final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
+ try {
+ FileUtils.copyFile(srcMeta, dstMeta);
+ } catch (IOException e) {
+ throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+ }
+ try {
+ FileUtils.copyFile(srcFile, dstFile);
+ } catch (IOException e) {
+ throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copied " + srcMeta + " to " + dstMeta);
+ LOG.debug("Copied " + srcFile + " to " + dstFile);
+ }
+ return new File[] {dstMeta, dstFile};
+ }
+
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
@@ -949,8 +997,8 @@
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
- ExtendedBlock b) throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+ ExtendedBlock b, boolean allowLazyPersist) throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -958,12 +1006,32 @@
" and thus cannot be created.");
}
// create a new block
- FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
- // create a rbw file to hold block in the designated volume
+ FsVolumeImpl v;
+ while (true) {
+ try {
+ if (allowLazyPersist) {
+ // First try to place the block on a transient volume.
+ v = volumes.getNextTransientVolume(b.getNumBytes());
+ datanode.getMetrics().incrRamDiskBlocksWrite();
+ } else {
+ v = volumes.getNextVolume(storageType, b.getNumBytes());
+ }
+ } catch (DiskOutOfSpaceException de) {
+ if (allowLazyPersist) {
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+ allowLazyPersist = false;
+ continue;
+ }
+ throw de;
+ }
+ break;
+ }
+ // create an rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+
return newReplicaInfo;
}
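The createRbw() change above prefers a transient (RAM disk) volume and falls back to regular placement exactly once if RAM disk is out of space. Below is a hedged, self-contained sketch of that control flow; VolumePicker and OutOfSpaceException are illustrative stand-ins, not the Hadoop types.

// Illustrative stand-ins for the volume chooser and its exception.
class OutOfSpaceException extends Exception {}

interface VolumePicker {
  String nextTransientVolume(long bytes) throws OutOfSpaceException;
  String nextDefaultVolume(long bytes) throws OutOfSpaceException;
}

class LazyPersistPlacementSketch {
  // Mirrors the createRbw placement loop: try RAM disk first when lazy
  // persist is requested, then fall back to persistent storage once.
  static String pickVolume(VolumePicker volumes, long bytes,
      boolean allowLazyPersist) throws OutOfSpaceException {
    while (true) {
      try {
        return allowLazyPersist
            ? volumes.nextTransientVolume(bytes)
            : volumes.nextDefaultVolume(bytes);
      } catch (OutOfSpaceException e) {
        if (allowLazyPersist) {
          // Count this as a fallback write and retry on regular storage.
          allowLazyPersist = false;
          continue;
        }
        throw e;
      }
    }
  }
}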
@@ -1109,7 +1177,6 @@
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile(), 0);
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-
return newReplicaInfo;
}
@@ -1176,8 +1243,14 @@
File dest = v.addFinalizedBlock(
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+
+ if (v.isTransientStorage()) {
+ ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+ datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+ }
}
volumeMap.add(bpid, newReplicaInfo);
+
return newReplicaInfo;
}
@@ -1197,6 +1270,9 @@
replicaInfo.getMetaFile(), b.getLocalBlock())) {
LOG.warn("Block " + b + " unfinalized and removed. " );
}
+ if (replicaInfo.getVolume().isTransientStorage()) {
+ ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+ }
}
}
@@ -1293,6 +1369,22 @@
}
/**
+   * Get the list of finalized blocks on persistent storage from the in-memory blockmap for a block pool.
+ */
+ @Override
+ public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+ ArrayList<FinalizedReplica> finalized =
+ new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+ for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+ if(!b.getVolume().isTransientStorage() &&
+ b.getState() == ReplicaState.FINALIZED) {
+ finalized.add(new FinalizedReplica((FinalizedReplica)b));
+ }
+ }
+ return finalized;
+ }
+
+ /**
* Check whether the given block is a valid one.
* valid means finalized
*/
@@ -1321,11 +1413,11 @@
/**
* Find the file corresponding to the block and return it if it exists.
*/
- File validateBlockFile(String bpid, Block b) {
+ File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final File f;
synchronized(this) {
- f = getFile(bpid, b.getBlockId());
+ f = getFile(bpid, blockId, false);
}
if(f != null ) {
@@ -1337,7 +1429,7 @@
}
if (LOG.isDebugEnabled()) {
- LOG.debug("b=" + b + ", f=" + f);
+ LOG.debug("blockId=" + blockId + ", f=" + f);
}
return null;
}
@@ -1409,6 +1501,17 @@
volumeMap.remove(bpid, invalidBlks[i]);
}
+ if (v.isTransientStorage()) {
+ RamDiskReplica replicaInfo =
+ ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
+ if (replicaInfo != null) {
+          if (!replicaInfo.getIsPersisted()) {
+ datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+ }
+ discardRamDiskReplica(replicaInfo, true);
+ }
+ }
+
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
datanode.getShortCircuitRegistry().processBlockInvalidation(
@@ -1497,6 +1600,11 @@
": volume was not an instance of FsVolumeImpl.");
return;
}
+ if (volume.isTransientStorage()) {
+ LOG.warn("Caching not supported on block with id " + blockId +
+ " since the volume is backed by RAM.");
+ return;
+ }
success = true;
} finally {
if (!success) {
@@ -1533,7 +1641,7 @@
@Override // FsDatasetSpi
public synchronized boolean contains(final ExtendedBlock block) {
final long blockId = block.getLocalBlock().getBlockId();
- return getFile(block.getBlockPoolId(), blockId) != null;
+ return getFile(block.getBlockPoolId(), blockId, false) != null;
}
/**
@@ -1542,13 +1650,18 @@
* @param blockId a block's id
* @return on disk data file path; null if the replica does not exist
*/
- File getFile(final String bpid, final long blockId) {
+ File getFile(final String bpid, final long blockId, boolean touch) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info != null) {
+ if (touch && info.getVolume().isTransientStorage()) {
+ ramDiskReplicaTracker.touch(bpid, blockId);
+ datanode.getMetrics().incrRamDiskBlocksReadHits();
+ }
return info.getBlockFile();
}
return null;
}
+
/**
* check if a data directory is healthy
* if some volumes failed - make sure to remove all the blocks that belong
@@ -1624,8 +1737,14 @@
@Override // FsDatasetSpi
public void shutdown() {
- if (mbeanName != null)
+ fsRunning = false;
+
+ ((LazyWriter) lazyWriter.getRunnable()).stop();
+ lazyWriter.interrupt();
+
+ if (mbeanName != null) {
MBeans.unregister(mbeanName);
+ }
if (asyncDiskService != null) {
asyncDiskService.shutdown();
@@ -1634,6 +1753,13 @@
if(volumes != null) {
volumes.shutdown();
}
+
+ try {
+ lazyWriter.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
+ "from LazyWriter.join");
+ }
}
@Override // FSDatasetMBean
@@ -1666,7 +1792,7 @@
*/
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) {
+ File diskMetaFile, FsVolumeSpi vol) throws IOException {
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
synchronized (this) {
@@ -1699,6 +1825,9 @@
if (blockScanner != null) {
blockScanner.deleteBlock(bpid, new Block(blockId));
}
+ if (vol.isTransientStorage()) {
+ ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
+ }
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
@@ -1719,8 +1848,12 @@
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
final DataBlockScanner blockScanner = datanode.getBlockScanner();
- if (blockScanner != null) {
- blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+ if (!vol.isTransientStorage()) {
+ if (blockScanner != null) {
+ blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+ }
+ } else {
+ ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
@@ -1732,10 +1865,20 @@
File memFile = memBlockInfo.getBlockFile();
if (memFile.exists()) {
if (memFile.compareTo(diskFile) != 0) {
- LOG.warn("Block file " + memFile.getAbsolutePath()
- + " does not match file found by scan "
- + diskFile.getAbsolutePath());
- // TODO: Should the diskFile be deleted?
+ if (diskMetaFile.exists()) {
+ if (memBlockInfo.getMetaFile().exists()) {
+ // We have two sets of block+meta files. Decide which one to
+ // keep.
+ ReplicaInfo diskBlockInfo = new FinalizedReplica(
+ blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
+ ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
+ memBlockInfo, diskBlockInfo, volumeMap);
+ }
+ } else {
+ if (!diskFile.delete()) {
+ LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
+ }
+ }
}
} else {
// Block refers to a block file that does not exist.
@@ -1899,9 +2042,9 @@
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
LOG.info("updateReplica: " + oldBlock
- + ", recoveryId=" + recoveryId
- + ", length=" + newlength
- + ", replica=" + replica);
+ + ", recoveryId=" + recoveryId
+ + ", length=" + newlength
+ + ", replica=" + replica);
//check replica
if (replica == null) {
@@ -1993,7 +2136,7 @@
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
}
- volumes.getAllVolumesMap(bpid, volumeMap);
+ volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
}
@Override
@@ -2171,5 +2314,248 @@
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
nbytes, flags);
}
+
+ void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+ ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
+ replica.getBlockId(), deleteSavedCopies);
+ }
+
+ class LazyWriter implements Runnable {
+ private volatile boolean shouldRun = true;
+ final int checkpointerInterval;
+ final long estimateBlockSize;
+ final int lowWatermarkFreeSpacePercentage;
+ final int lowWatermarkFreeSpaceReplicas;
+
+
+ public LazyWriter(Configuration conf) {
+ this.checkpointerInterval = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
+ this.estimateBlockSize = conf.getLongBytes(
+ DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+ DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+ this.lowWatermarkFreeSpacePercentage = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
+ this.lowWatermarkFreeSpaceReplicas = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
+ }
+
+ private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
+ throws IOException {
+
+ FsVolumeImpl targetVolume;
+ ReplicaInfo replicaInfo;
+ BlockPoolSlice bpSlice;
+ File srcFile, srcMeta;
+ long genStamp;
+
+ synchronized (FsDatasetImpl.this) {
+ replicaInfo = volumeMap.get(bpid, blockId);
+
+ if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+ // The block was either deleted before it could be checkpointed or
+ // it is already on persistent storage. This can occur if a second
+ // replica on persistent storage was found after the lazy write was
+ // scheduled.
+ return;
+ }
+
+ // Pick a target volume for the block.
+ targetVolume = volumes.getNextVolume(
+ StorageType.DEFAULT, replicaInfo.getNumBytes());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+ }
+
+ ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+ bpSlice = targetVolume.getBlockPoolSlice(bpid);
+ srcMeta = replicaInfo.getMetaFile();
+ srcFile = replicaInfo.getBlockFile();
+ genStamp = replicaInfo.getGenerationStamp();
+ }
+
+ // Drop the FsDatasetImpl lock for the file copy.
+ File[] savedFiles =
+ bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
+
+ synchronized (FsDatasetImpl.this) {
+ ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+
+ // Update metrics (ignore the metadata file size)
+ datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+ datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
+ datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+ Time.monotonicNow() - creationTime);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+ " to file " + savedFiles[1]);
+ }
+ }
+ }
+
+ /**
+ * Checkpoint a pending replica to persistent storage now.
+ * If we fail then move the replica to the end of the queue.
+ * @return true if there is more work to be done, false otherwise.
+ */
+ private boolean saveNextReplica() {
+ RamDiskReplica block = null;
+ boolean succeeded = false;
+
+ try {
+ block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
+ if (block != null) {
+ moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
+ block.getCreationTime());
+ }
+ succeeded = true;
+ } catch(IOException ioe) {
+ LOG.warn("Exception saving replica " + block, ioe);
+ } finally {
+ if (!succeeded && block != null) {
+          LOG.warn("Failed to save replica " + block + ". Re-enqueuing it.");
+ ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+ }
+ }
+
+ return succeeded;
+ }
+
+ private boolean transientFreeSpaceBelowThreshold() throws IOException {
+ long free = 0;
+ long capacity = 0;
+
+ // Don't worry about fragmentation for now. We don't expect more than one
+ // transient volume per DN.
+ for (FsVolumeImpl v : volumes.volumes) {
+ if (v.isTransientStorage()) {
+ capacity += v.getCapacity();
+ free += v.getAvailable();
+ }
+ }
+
+ if (capacity == 0) {
+ return false;
+ }
+
+ int percentFree = (int) (free * 100 / capacity);
+ return percentFree < lowWatermarkFreeSpacePercentage ||
+ free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas);
+ }
+
+ /**
+     * Attempt to evict one or more transient block replicas until free space
+     * on transient storage is back above the configured low watermarks.
+ */
+ private void evictBlocks() throws IOException {
+ int iterations = 0;
+
+ while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
+ transientFreeSpaceBelowThreshold()) {
+ RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
+
+ if (replicaState == null) {
+ break;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Evicting block " + replicaState);
+ }
+
+ ReplicaInfo replicaInfo, newReplicaInfo;
+ File blockFile, metaFile;
+ long blockFileUsed, metaFileUsed;
+ final String bpid = replicaState.getBlockPoolId();
+
+ synchronized (FsDatasetImpl.this) {
+ replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
+ Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+ blockFile = replicaInfo.getBlockFile();
+ metaFile = replicaInfo.getMetaFile();
+ blockFileUsed = blockFile.length();
+ metaFileUsed = metaFile.length();
+ discardRamDiskReplica(replicaState, false);
+
+ // Move the replica from lazyPersist/ to finalized/ on target volume
+ BlockPoolSlice bpSlice =
+ replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
+ File newBlockFile = bpSlice.activateSavedReplica(
+ replicaInfo, replicaState.getSavedMetaFile(),
+ replicaState.getSavedBlockFile());
+
+ newReplicaInfo =
+ new FinalizedReplica(replicaInfo.getBlockId(),
+ replicaInfo.getBytesOnDisk(),
+ replicaInfo.getGenerationStamp(),
+ replicaState.getLazyPersistVolume(),
+ newBlockFile.getParentFile());
+
+ // Update the volumeMap entry.
+ volumeMap.add(bpid, newReplicaInfo);
+
+ // Update metrics
+ datanode.getMetrics().incrRamDiskBlocksEvicted();
+ datanode.getMetrics().addRamDiskBlocksEvictionWindowMs(
+ Time.monotonicNow() - replicaState.getCreationTime());
+ if (replicaState.getNumReads() == 0) {
+ datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
+ }
+ }
+
+ // Before deleting the files from transient storage we must notify the
+        // NN that the files are on the new storage. Otherwise a block report from
+ // the transient storage might cause the NN to think the blocks are lost.
+ ExtendedBlock extendedBlock =
+ new ExtendedBlock(bpid, newReplicaInfo);
+ datanode.notifyNamenodeReceivedBlock(
+ extendedBlock, null, newReplicaInfo.getStorageUuid());
+
+ // Remove the old replicas from transient storage.
+ if (blockFile.delete() || !blockFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+ if (metaFile.delete() || !metaFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+ }
+ }
+
+ // If deletion failed then the directory scanner will cleanup the blocks
+ // eventually.
+ }
+ }
+
+ @Override
+ public void run() {
+ int numSuccessiveFailures = 0;
+
+ while (fsRunning && shouldRun) {
+ try {
+ numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+ evictBlocks();
+
+ // Sleep if we have no more work to do or if it looks like we are not
+ // making any forward progress. This is to ensure that if all persist
+ // operations are failing we don't keep retrying them in a tight loop.
+ if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
+ Thread.sleep(checkpointerInterval * 1000);
+ numSuccessiveFailures = 0;
+ }
+ } catch (InterruptedException e) {
+ LOG.info("LazyWriter was interrupted, exiting");
+ break;
+ } catch (Exception e) {
+ LOG.warn("Ignoring exception in LazyWriter:", e);
+ }
+ }
+ }
+
+ public void stop() {
+ shouldRun = false;
+ }
+ }
}
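The LazyWriter.transientFreeSpaceBelowThreshold() check above combines a percentage watermark with a replica-count watermark. Here is a small worked example with made-up numbers; the two watermark values are assumptions for illustration, not the shipped defaults.

// Worked example of the low-watermark test; all numbers are illustrative.
public class WatermarkSketch {
  public static void main(String[] args) {
    long capacity = 4L * 1024 * 1024 * 1024;          // 4 GB RAM disk
    long free = 300L * 1024 * 1024;                   // 300 MB free
    long estimateBlockSize = 128L * 1024 * 1024;      // 128 MB block size
    int lowWatermarkFreeSpacePercentage = 10;         // assumed setting
    int lowWatermarkFreeSpaceReplicas = 3;            // assumed setting

    int percentFree = (int) (free * 100 / capacity);  // 7
    boolean belowThreshold =
        percentFree < lowWatermarkFreeSpacePercentage ||
        free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas);

    // 7% < 10% and 300 MB < 3 * 128 MB, so evictBlocks() would start
    // evicting persisted replicas from the RAM disk.
    System.out.println(belowThreshold);               // prints true
  }
}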
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 63bc6a1..60fb71d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -77,7 +77,7 @@
* dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
* contention.
*/
- private final ThreadPoolExecutor cacheExecutor;
+ protected ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType) throws IOException {
@@ -96,6 +96,10 @@
}
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
+ if (storageType.isTransient()) {
+ return null;
+ }
+
final int maxNumThreads = dataset.datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
@@ -202,6 +206,11 @@
}
@Override
+ public boolean isTransientStorage() {
+ return storageType.isTransient();
+ }
+
+ @Override
public String getPath(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
}
@@ -230,9 +239,6 @@
@Override
public void reserveSpaceForRbw(long bytesToReserve) {
if (bytesToReserve != 0) {
- if (FsDatasetImpl.LOG.isDebugEnabled()) {
- FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
- }
reservedForRbw.addAndGet(bytesToReserve);
}
}
@@ -240,9 +246,6 @@
@Override
public void releaseReservedSpace(long bytesToRelease) {
if (bytesToRelease != 0) {
- if (FsDatasetImpl.LOG.isDebugEnabled()) {
- FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
- }
long oldReservation, newReservation;
do {
@@ -292,39 +295,29 @@
}
}
- void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+ void getVolumeMap(ReplicaMap volumeMap,
+ final RamDiskReplicaTracker ramDiskReplicaMap)
+ throws IOException {
for(BlockPoolSlice s : bpSlices.values()) {
- s.getVolumeMap(volumeMap);
+ s.getVolumeMap(volumeMap, ramDiskReplicaMap);
}
}
- void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
- getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
+ void getVolumeMap(String bpid, ReplicaMap volumeMap,
+ final RamDiskReplicaTracker ramDiskReplicaMap)
+ throws IOException {
+ getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
}
- /**
- * Add replicas under the given directory to the volume map
- * @param volumeMap the replicas map
- * @param dir an input directory
- * @param isFinalized true if the directory has finalized replicas;
- * false if the directory has rbw replicas
- * @throws IOException
- */
- void addToReplicasMap(String bpid, ReplicaMap volumeMap,
- File dir, boolean isFinalized) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- // TODO move this up
- // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
- bp.addToReplicasMap(volumeMap, dir, isFinalized);
- }
-
@Override
public String toString() {
return currentDir.getAbsolutePath();
}
void shutdown() {
- cacheExecutor.shutdown();
+ if (cacheExecutor != null) {
+ cacheExecutor.shutdown();
+ }
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
entry.getValue().shutdown();
@@ -373,6 +366,8 @@
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
+ File lazypersistDir = new File(bpCurrentDir,
+ DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
FileUtil.fullyDelete(bpDir);
@@ -384,6 +379,11 @@
!FileUtil.fullyDelete(finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
+ if (lazypersistDir.exists() &&
+ ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
+ !FileUtil.fullyDelete(lazypersistDir)))) {
+ throw new IOException("Failed to delete " + lazypersistDir);
+ }
FileUtil.fullyDelete(tmpDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) {
@@ -417,6 +417,5 @@
DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
}
-
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 90739c3..837ddf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -68,7 +68,25 @@
}
return blockChooser.chooseVolume(list, blockSize);
}
-
+
+ /**
+   * Get the next transient (RAM disk) volume. Synchronized so the volume
+   * is chosen with no concurrent update to {@link #volumes}.
+ * @param blockSize free space needed on the volume
+ * @return next volume to store the block in.
+ */
+ synchronized FsVolumeImpl getNextTransientVolume(
+ long blockSize) throws IOException {
+ final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+ for(FsVolumeImpl v : volumes) {
+ if (v.isTransientStorage()) {
+ list.add(v);
+ }
+ }
+ return blockChooser.chooseVolume(list, blockSize);
+ }
+
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes) {
@@ -101,7 +119,10 @@
return remaining;
}
- void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
+ void getAllVolumesMap(final String bpid,
+ final ReplicaMap volumeMap,
+ final RamDiskReplicaTracker ramDiskReplicaMap)
+ throws IOException {
long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
@@ -113,7 +134,7 @@
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
bpid + " on volume " + v + "...");
long startTime = Time.monotonicNow();
- v.getVolumeMap(bpid, volumeMap);
+ v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
@@ -142,17 +163,6 @@
+ totalTimeTaken + "ms");
}
- void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
- throws IOException {
- FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
- " on volume " + volume + "...");
- long startTime = Time.monotonicNow();
- volume.getVolumeMap(bpid, volumeMap);
- long timeTaken = Time.monotonicNow() - startTime;
- FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
- " on volume " + volume + ": " + timeTaken + "ms");
- }
-
/**
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
* volumes from the active list that result in a DiskErrorException.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
new file mode 100644
index 0000000..a843d9a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * An implementation of RamDiskReplicaTracker that uses an LRU
+ * eviction scheme.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
+
+ private class RamDiskReplicaLru extends RamDiskReplica {
+ long lastUsedTime;
+
+ private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
+ super(bpid, blockId, ramDiskVolume);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return super.equals(other);
+ }
+ }
+
+ /**
+   * Map of block pool ID to a map of block ID to RamDiskReplicaLru.
+ */
+ Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
+
+ /**
+ * Queue of replicas that need to be written to disk.
+ * Stale entries are GC'd by dequeueNextReplicaToPersist.
+ */
+ Queue<RamDiskReplicaLru> replicasNotPersisted;
+
+ /**
+ * Map of persisted replicas ordered by their last use times.
+ */
+ TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
+
+ RamDiskReplicaLruTracker() {
+ replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
+ replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+ replicasPersisted = TreeMultimap.create();
+ }
+
+ @Override
+ synchronized void addReplica(final String bpid, final long blockId,
+ final FsVolumeImpl transientVolume) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+ if (map == null) {
+ map = new HashMap<Long, RamDiskReplicaLru>();
+ replicaMaps.put(bpid, map);
+ }
+ RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
+ map.put(blockId, ramDiskReplicaLru);
+ replicasNotPersisted.add(ramDiskReplicaLru);
+ }
+
+ @Override
+ synchronized void touch(final String bpid,
+ final long blockId) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+ RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+ if (ramDiskReplicaLru == null) {
+ return;
+ }
+
+ ramDiskReplicaLru.numReads.getAndIncrement();
+
+ // Reinsert the replica with its new timestamp.
+ if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
+ ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
+ replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+ }
+ }
+
+ @Override
+ synchronized void recordStartLazyPersist(
+ final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+ RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+ ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
+ }
+
+ @Override
+ synchronized void recordEndLazyPersist(
+ final String bpid, final long blockId, final File[] savedFiles) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+ RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+ if (ramDiskReplicaLru == null) {
+ throw new IllegalStateException("Unknown replica bpid=" +
+ bpid + "; blockId=" + blockId);
+ }
+ ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
+
+ if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
+ // Common case.
+ replicasNotPersisted.remove();
+ } else {
+ // Caller error? Fallback to O(n) removal.
+ replicasNotPersisted.remove(ramDiskReplicaLru);
+ }
+
+ ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
+ replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+ ramDiskReplicaLru.isPersisted = true;
+ }
+
+ @Override
+ synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
+    while (!replicasNotPersisted.isEmpty()) {
+ RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
+ Map<Long, RamDiskReplicaLru> replicaMap =
+ replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+ if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+ return ramDiskReplicaLru;
+ }
+
+ // The replica no longer exists, look for the next one.
+ }
+ return null;
+ }
+
+ @Override
+ synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
+ replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
+ }
+
+ @Override
+ synchronized int numReplicasNotPersisted() {
+ return replicasNotPersisted.size();
+ }
+
+ @Override
+ synchronized RamDiskReplicaLru getNextCandidateForEviction() {
+    Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
+    while (it.hasNext()) {
+      RamDiskReplicaLru ramDiskReplicaLru = it.next();
+ it.remove();
+
+ Map<Long, RamDiskReplicaLru> replicaMap =
+ replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+ if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+ return ramDiskReplicaLru;
+ }
+
+ // The replica no longer exists, look for the next one.
+ }
+ return null;
+ }
+
+ /**
+ * Discard any state we are tracking for the given replica. This could mean
+   * the block has been deleted from the block pool or the replica is no longer
+ * on transient storage.
+ *
+ * @param deleteSavedCopies true if we should delete the saved copies on
+ * persistent storage. This should be set by the
+ * caller when the block is no longer needed.
+ */
+ @Override
+ synchronized void discardReplica(
+ final String bpid, final long blockId,
+ boolean deleteSavedCopies) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+ if (map == null) {
+ return;
+ }
+
+ RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+ if (ramDiskReplicaLru == null) {
+ return;
+ }
+
+ if (deleteSavedCopies) {
+ ramDiskReplicaLru.deleteSavedFiles();
+ }
+
+ map.remove(blockId);
+ replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+
+ // replicasNotPersisted will be lazily GC'ed.
+ }
+
+ @Override
+ synchronized RamDiskReplica getReplica(
+ final String bpid, final long blockId) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+ if (map == null) {
+ return null;
+ }
+
+ return map.get(blockId);
+ }
+}
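The LRU ordering in RamDiskReplicaLruTracker falls out of keying replicasPersisted by last-use time: TreeMultimap iterates values in ascending key order, so the least recently used persisted replica surfaces first in getNextCandidateForEviction(). A small standalone illustration using Guava's TreeMultimap, with strings standing in for replicas:

import com.google.common.collect.TreeMultimap;

public class LruOrderSketch {
  public static void main(String[] args) {
    TreeMultimap<Long, String> byLastUse = TreeMultimap.create();
    byLastUse.put(300L, "blk_3");   // most recently read
    byLastUse.put(100L, "blk_1");   // least recently read
    byLastUse.put(200L, "blk_2");

    // values() iterates in ascending key order, so the oldest timestamp
    // (blk_1) is offered as the first eviction candidate.
    System.out.println(byLastUse.values().iterator().next());  // blk_1
  }
}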
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
new file mode 100644
index 0000000..7507925
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class RamDiskReplicaTracker {
+ static final Log LOG = LogFactory.getLog(RamDiskReplicaTracker.class);
+
+ FsDatasetImpl fsDataset;
+
+ static class RamDiskReplica implements Comparable<RamDiskReplica> {
+ private final String bpid;
+ private final long blockId;
+ private File savedBlockFile;
+ private File savedMetaFile;
+
+ private long creationTime;
+ protected AtomicLong numReads = new AtomicLong(0);
+ protected boolean isPersisted;
+
+ /**
+ * RAM_DISK volume that holds the original replica.
+ */
+ final FsVolumeSpi ramDiskVolume;
+
+ /**
+ * Persistent volume that holds or will hold the saved replica.
+ */
+ FsVolumeImpl lazyPersistVolume;
+
+ RamDiskReplica(final String bpid, final long blockId,
+ final FsVolumeImpl ramDiskVolume) {
+ this.bpid = bpid;
+ this.blockId = blockId;
+ this.ramDiskVolume = ramDiskVolume;
+ lazyPersistVolume = null;
+ savedMetaFile = null;
+ savedBlockFile = null;
+ creationTime = Time.monotonicNow();
+ isPersisted = false;
+ }
+
+ long getBlockId() {
+ return blockId;
+ }
+
+ String getBlockPoolId() {
+ return bpid;
+ }
+
+ FsVolumeImpl getLazyPersistVolume() {
+ return lazyPersistVolume;
+ }
+
+ void setLazyPersistVolume(FsVolumeImpl volume) {
+ Preconditions.checkState(!volume.isTransientStorage());
+ this.lazyPersistVolume = volume;
+ }
+
+ File getSavedBlockFile() {
+ return savedBlockFile;
+ }
+
+ File getSavedMetaFile() {
+ return savedMetaFile;
+ }
+
+ long getNumReads() { return numReads.get(); }
+
+ long getCreationTime() { return creationTime; }
+
+ boolean getIsPersisted() {return isPersisted; }
+
+ /**
+ * Record the saved meta and block files on the given volume.
+ *
+ * @param files Meta and block files, in that order.
+ */
+ void recordSavedBlockFiles(File[] files) {
+ this.savedMetaFile = files[0];
+ this.savedBlockFile = files[1];
+ }
+
+ @Override
+ public int hashCode() {
+ return bpid.hashCode() ^ (int) blockId;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ RamDiskReplica otherState = (RamDiskReplica) other;
+ return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+ }
+
+ // Delete the saved meta and block files. Failure to delete can be
+    // ignored; the directory scanner will retry the deletion later.
+ void deleteSavedFiles() {
+ if (savedBlockFile != null) {
+ if (!savedBlockFile.delete()) {
+ LOG.warn("Failed to delete block file " + savedBlockFile);
+ }
+ savedBlockFile = null;
+ }
+
+ if (savedMetaFile != null) {
+ if (!savedMetaFile.delete()) {
+ LOG.warn("Failed to delete meta file " + savedMetaFile);
+ }
+ savedMetaFile = null;
+ }
+ }
+
+ @Override
+ public int compareTo(RamDiskReplica other) {
+ int bpidResult = bpid.compareTo(other.bpid);
+ if (bpidResult == 0)
+ if (blockId == other.blockId) {
+ return 0;
+ } else if (blockId < other.blockId) {
+ return -1;
+ } else {
+ return 1;
+ }
+ return bpidResult;
+ }
+
+ @Override
+ public String toString() {
+ return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
+ }
+ }
+
+ /**
+ * Get an instance of the configured RamDiskReplicaTracker based on the
+   * configuration property
+ * {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
+ *
+ * @param conf the configuration to be used
+   * @param fsDataset the FsDatasetImpl instance to associate with the tracker.
+ * @return an instance of RamDiskReplicaTracker
+ */
+ static RamDiskReplicaTracker getInstance(final Configuration conf,
+ final FsDatasetImpl fsDataset) {
+ final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
+ RamDiskReplicaTracker.class);
+ final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
+ trackerClass, conf);
+ tracker.initialize(fsDataset);
+ return tracker;
+ }
+
+ void initialize(final FsDatasetImpl fsDataset) {
+ this.fsDataset = fsDataset;
+ }
+
+ /**
+ * Start tracking a new finalized replica on RAM disk.
+ *
+ * @param transientVolume RAM disk volume that stores the replica.
+ */
+ abstract void addReplica(final String bpid, final long blockId,
+ final FsVolumeImpl transientVolume);
+
+ /**
+ * Invoked when a replica is opened by a client. This may be used as
+ * a heuristic by the eviction scheme.
+ */
+ abstract void touch(final String bpid, final long blockId);
+
+ /**
+ * Get the next replica to write to persistent storage.
+ */
+ abstract RamDiskReplica dequeueNextReplicaToPersist();
+
+ /**
+ * Invoked if a replica that was previously dequeued for persistence
+ * could not be successfully persisted. Add it back so it can be retried
+ * later.
+ */
+ abstract void reenqueueReplicaNotPersisted(
+ final RamDiskReplica ramDiskReplica);
+
+ /**
+ * Invoked when the Lazy persist operation is started by the DataNode.
+ * @param checkpointVolume
+ */
+ abstract void recordStartLazyPersist(
+ final String bpid, final long blockId, FsVolumeImpl checkpointVolume);
+
+ /**
+ * Invoked when the Lazy persist operation is complete.
+ *
+ * @param savedFiles The saved meta and block files, in that order.
+ */
+ abstract void recordEndLazyPersist(
+ final String bpid, final long blockId, final File[] savedFiles);
+
+ /**
+ * Return a candidate replica to remove from RAM Disk. The exact replica
+ * to be returned may depend on the eviction scheme utilized.
+ *
+   * @return the replica to evict, or null if there are no candidates.
+ */
+ abstract RamDiskReplica getNextCandidateForEviction();
+
+ /**
+ * Return the number of replicas pending persistence to disk.
+ */
+ abstract int numReplicasNotPersisted();
+
+ /**
+ * Discard all state we are tracking for the given replica.
+ */
+ abstract void discardReplica(
+ final String bpid, final long blockId,
+ boolean deleteSavedCopies);
+
+ /**
+   * Return the RamDiskReplica info for the given block pool id and block id.
+   * Return null if the replica does not exist on RAM disk.
+ */
+ abstract RamDiskReplica getReplica(
+ final String bpid, final long blockId);
+}
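As a rough usage sketch of the tracker contract: the implementation class is chosen through the configuration key read by getInstance(), and the DataNode then drives the add / persist / evict lifecycle. The snippet below is hypothetical glue code, not part of the patch; it assumes it lives in the fsdataset.impl package (the tracker methods are package-private) and that fsDataset, ramDiskVolume, diskVolume and savedMetaAndBlockFiles are available placeholders.

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Hypothetical lifecycle sketch; placeholders are passed in by the caller.
class RamDiskReplicaTrackerSketch {
  static void sketch(Configuration conf, FsDatasetImpl fsDataset,
      FsVolumeImpl ramDiskVolume, FsVolumeImpl diskVolume,
      java.io.File[] savedMetaAndBlockFiles) {
    // Select the tracker implementation (the LRU tracker is the one
    // provided by this patch).
    conf.setClass(DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
        RamDiskReplicaLruTracker.class, RamDiskReplicaTracker.class);
    RamDiskReplicaTracker tracker =
        RamDiskReplicaTracker.getInstance(conf, fsDataset);

    // A block is finalized on RAM disk, then read once by a client.
    tracker.addReplica("bp-1", 42L, ramDiskVolume);
    tracker.touch("bp-1", 42L);

    // The LazyWriter copies the dequeued replica to a persistent volume;
    // savedMetaAndBlockFiles holds the saved meta and block files, in order.
    RamDiskReplicaTracker.RamDiskReplica r =
        tracker.dequeueNextReplicaToPersist();
    tracker.recordStartLazyPersist("bp-1", 42L, diskVolume);
    tracker.recordEndLazyPersist("bp-1", 42L, savedMetaAndBlockFiles);

    // Under memory pressure the persisted copy becomes an eviction candidate.
    RamDiskReplicaTracker.RamDiskReplica victim =
        tracker.getNextCandidateForEviction();
    if (victim != null) {
      tracker.discardReplica(victim.getBlockPoolId(), victim.getBlockId(),
          false);
    }
  }
}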
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index b536e7e..57f12db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -65,6 +65,26 @@
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
+ // RamDisk metrics on read/write
+ @Metric MutableCounterLong ramDiskBlocksWrite;
+ @Metric MutableCounterLong ramDiskBlocksWriteFallback;
+ @Metric MutableCounterLong ramDiskBytesWrite;
+ @Metric MutableCounterLong ramDiskBlocksReadHits;
+
+ // RamDisk metrics on eviction
+ @Metric MutableCounterLong ramDiskBlocksEvicted;
+ @Metric MutableCounterLong ramDiskBlocksEvictedWithoutRead;
+ @Metric MutableRate ramDiskBlocksEvictionWindowMs;
+ final MutableQuantiles[] ramDiskBlocksEvictionWindowMsQuantiles;
+
+
+ // RamDisk metrics on lazy persist
+ @Metric MutableCounterLong ramDiskBlocksLazyPersisted;
+ @Metric MutableCounterLong ramDiskBlocksDeletedBeforeLazyPersisted;
+ @Metric MutableCounterLong ramDiskBytesLazyPersisted;
+ @Metric MutableRate ramDiskBlocksLazyPersistWindowMs;
+ final MutableQuantiles[] ramDiskBlocksLazyPersistWindowMsQuantiles;
+
@Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@@ -107,6 +127,8 @@
fsyncNanosQuantiles = new MutableQuantiles[len];
sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+ ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
+ ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -127,6 +149,14 @@
"sendDataPacketTransferNanos" + interval + "s",
"Time reading from disk and writing to network while sending " +
"a packet in ns", "ops", "latency", interval);
+ ramDiskBlocksEvictionWindowMsQuantiles[i] = registry.newQuantiles(
+ "ramDiskBlocksEvictionWindows" + interval + "s",
+ "Time between the RamDisk block write and eviction in ms",
+ "ops", "latency", interval);
+ ramDiskBlocksLazyPersistWindowMsQuantiles[i] = registry.newQuantiles(
+ "ramDiskBlocksLazyPersistWindows" + interval + "s",
+ "Time between the RamDisk block write and disk persist in ms",
+ "ops", "latency", interval);
}
}
@@ -284,4 +314,54 @@
q.add(latencyNanos);
}
}
+
+ public void incrRamDiskBlocksWrite() {
+ ramDiskBlocksWrite.incr();
+ }
+
+ public void incrRamDiskBlocksWriteFallback() {
+ ramDiskBlocksWriteFallback.incr();
+ }
+
+ public void addRamDiskBytesWrite(long bytes) {
+ ramDiskBytesWrite.incr(bytes);
+ }
+
+ public void incrRamDiskBlocksReadHits() {
+ ramDiskBlocksReadHits.incr();
+ }
+
+ public void incrRamDiskBlocksEvicted() {
+ ramDiskBlocksEvicted.incr();
+ }
+
+ public void incrRamDiskBlocksEvictedWithoutRead() {
+ ramDiskBlocksEvictedWithoutRead.incr();
+ }
+
+ public void addRamDiskBlocksEvictionWindowMs(long latencyMs) {
+ ramDiskBlocksEvictionWindowMs.add(latencyMs);
+ for (MutableQuantiles q : ramDiskBlocksEvictionWindowMsQuantiles) {
+ q.add(latencyMs);
+ }
+ }
+
+ public void incrRamDiskBlocksLazyPersisted() {
+ ramDiskBlocksLazyPersisted.incr();
+ }
+
+ public void incrRamDiskBlocksDeletedBeforeLazyPersisted() {
+ ramDiskBlocksDeletedBeforeLazyPersisted.incr();
+ }
+
+ public void incrRamDiskBytesLazyPersisted(long bytes) {
+ ramDiskBytesLazyPersisted.incr(bytes);
+ }
+
+ public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
+ ramDiskBlocksLazyPersistWindowMs.add(latencyMs);
+ for (MutableQuantiles q : ramDiskBlocksLazyPersistWindowMsQuantiles) {
+ q.add(latencyMs);
+ }
+ }
}
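The counters above are ordinary MutableCounterLong/MutableRate metrics, recorded through the new incr*/add* helpers. A minimal usage sketch, assuming the existing DataNodeMetrics.create() factory and Time.monotonicNow(); the byte count and the eviction timing are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.util.Time;

public class RamDiskMetricsSketch {
  public static void main(String[] args) {
    // Assumption: create(conf, name) is the usual DataNodeMetrics factory.
    DataNodeMetrics metrics = DataNodeMetrics.create(new Configuration(), "dn-sketch");

    // A block is written to RAM_DISK.
    long blockWriteTimeMs = Time.monotonicNow();
    metrics.incrRamDiskBlocksWrite();
    metrics.addRamDiskBytesWrite(4 * 1024 * 1024);

    // Later the block is evicted; record the counter and the
    // write-to-eviction window that feeds the new quantiles.
    metrics.incrRamDiskBlocksEvicted();
    metrics.addRamDiskBlocksEvictionWindowMs(Time.monotonicNow() - blockWriteTimeMs);
  }
}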
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 04133bd..59814af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -69,7 +69,7 @@
= new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
private StorageMap() {
- for(StorageType t : StorageType.asList()) {
+ for(StorageType t : StorageType.getMovableTypes()) {
targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
}
}
@@ -130,7 +130,7 @@
final List<DatanodeStorageReport> reports = dispatcher.init();
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
- for(StorageType t : StorageType.asList()) {
+ for(StorageType t : StorageType.getMovableTypes()) {
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
final long maxRemaining = getMaxRemaining(r, t);
final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t,
@@ -348,7 +348,7 @@
LocatedBlock lb = lbs.get(i);
final StorageTypeDiff diff = new StorageTypeDiff(types,
lb.getStorageTypes());
- if (!diff.removeOverlap()) {
+ if (!diff.removeOverlap(true)) {
if (scheduleMoves4Block(diff, lb)) {
hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1);
@@ -452,22 +452,38 @@
this.expected = new LinkedList<StorageType>(expected);
this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
}
-
+
/**
* Remove the overlap between the expected types and the existing types.
- * @return if the existing types or the expected types is empty after
+     * @param ignoreNonMovable ignore non-movable storage types
+     * by removing them from both the expected and existing storage type
+     * lists, so that non-movable storage is never scheduled for a move.
+     * @return true if the existing types or the expected types are empty after
* removing the overlap.
*/
- boolean removeOverlap() {
+ boolean removeOverlap(boolean ignoreNonMovable) {
for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
final StorageType t = i.next();
if (expected.remove(t)) {
i.remove();
}
}
+ if (ignoreNonMovable) {
+ removeNonMovable(existing);
+ removeNonMovable(expected);
+ }
return expected.isEmpty() || existing.isEmpty();
}
-
+
+ void removeNonMovable(List<StorageType> types) {
+ for (Iterator<StorageType> i = types.iterator(); i.hasNext(); ) {
+ final StorageType t = i.next();
+ if (!t.isMovable()) {
+ i.remove();
+ }
+ }
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + "{expected=" + expected
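To see why the ignoreNonMovable flag matters, consider a block whose policy expects [DISK, DISK] while its replicas sit on [RAM_DISK, DISK]. A small standalone illustration of the two steps on plain lists (the Mover-internal StorageTypeDiff is not public, and the scenario is hypothetical):

import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hdfs.StorageType;

public class RemoveOverlapSketch {
  public static void main(String[] args) {
    List<StorageType> expected = new LinkedList<StorageType>(
        Arrays.asList(StorageType.DISK, StorageType.DISK));
    List<StorageType> existing = new LinkedList<StorageType>(
        Arrays.asList(StorageType.RAM_DISK, StorageType.DISK));

    // Step 1: remove the overlap; one DISK cancels on each side.
    for (Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
      if (expected.remove(i.next())) {
        i.remove();
      }
    }

    // Step 2 (ignoreNonMovable == true): drop non-movable types such as
    // RAM_DISK, leaving the existing list empty.
    for (Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
      if (!i.next().isMovable()) {
        i.remove();
      }
    }

    // Prints expected=[DISK] existing=[], i.e. removeOverlap(true) returns
    // true and the Mover skips the block instead of migrating RAM_DISK data.
    System.out.println("expected=" + expected + " existing=" + existing);
  }
}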
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index aab2183..0d999e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -1036,6 +1037,20 @@
}
final int snapshotId = iip.getLatestSnapshotId();
if (inode.isFile()) {
+ BlockStoragePolicy newPolicy = getBlockManager().getStoragePolicy(policyId);
+ if (newPolicy.isCopyOnCreateFile()) {
+ throw new HadoopIllegalArgumentException(
+ "Policy " + newPolicy + " cannot be set after file creation.");
+ }
+
+ BlockStoragePolicy currentPolicy =
+ getBlockManager().getStoragePolicy(inode.getLocalStoragePolicyID());
+
+ if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
+ throw new HadoopIllegalArgumentException(
+ "Existing policy " + currentPolicy.getName() +
+ " cannot be changed after file creation.");
+ }
inode.asFile().setStoragePolicyID(policyId, snapshotId);
} else if (inode.isDirectory()) {
setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 98ea7d6..c9eb9fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -415,6 +415,7 @@
private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
+ storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
assert(opCode == OP_ADD || opCode == OP_CLOSE);
}
@@ -709,6 +710,7 @@
this.mtime = Long.parseLong(st.getValue("MTIME"));
this.atime = Long.parseLong(st.getValue("ATIME"));
this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
+
this.clientName = st.getValue("CLIENT_NAME");
this.clientMachine = st.getValue("CLIENT_MACHINE");
this.overwrite = Boolean.parseBoolean(st.getValueOrNull("OVERWRITE"));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index af3cf2c..b1e7dfe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -783,8 +783,9 @@
if (counter != null) {
counter.increment();
}
+
final INodeFile file = new INodeFile(inodeId, localName, permissions,
- modificationTime, atime, blocks, replication, blockSize, (byte)0);
+ modificationTime, atime, blocks, replication, blockSize);
if (underConstruction) {
file.toUnderConstruction(clientName, clientMachine);
}
@@ -885,7 +886,7 @@
final long preferredBlockSize = in.readLong();
return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
- accessTime, replication, preferredBlockSize, (byte)0, null);
+ accessTime, replication, preferredBlockSize, (byte) 0, null);
}
public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index d3472fc..16636a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -148,14 +148,16 @@
int numLocs = in.readInt();
assert numLocs == 0 : "Unexpected block locations";
+    // Images in the pre-protobuf format will not have the lazyPersist flag,
+    // so the file is created with the default (unspecified) storage policy.
INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
- modificationTime, blocks, blockReplication, preferredBlockSize, (byte)0);
+ modificationTime, blocks, blockReplication, preferredBlockSize);
file.toUnderConstruction(clientName, clientMachine);
return file;
}
// Helper function that writes an INodeUnderConstruction
- // into the input stream
+ // into the output stream
//
static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
String path) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index a3d8dc7..baea1a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -60,6 +60,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -360,7 +362,8 @@
Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
Path path = dst != null ? new Path(dst) : new Path(src);
status = new FileStatus(stat.getLen(), stat.isDir(),
- stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
+ stat.getReplication(), stat.getBlockSize(),
+ stat.getModificationTime(),
stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
stat.getGroup(), symlink, path);
}
@@ -445,6 +448,10 @@
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
+
+ // A daemon to periodically clean up corrupt lazyPersist files
+  // from the namespace.
+ Daemon lazyPersistFileScrubber = null;
/**
* When an active namenode will roll its own edit log, in # edits
*/
@@ -454,6 +461,12 @@
*/
private final int editLogRollerInterval;
+ /**
+ * How frequently we scan and unlink corrupt lazyPersist files.
+ * (In seconds)
+ */
+ private final int lazyPersistFileScrubIntervalSec;
+
private volatile boolean hasResourcesAvailable = false;
private volatile boolean fsRunning = true;
@@ -863,6 +876,15 @@
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
this.inodeId = new INodeId();
+ this.lazyPersistFileScrubIntervalSec = conf.getInt(
+ DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+ DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
+
+ if (this.lazyPersistFileScrubIntervalSec == 0) {
+ throw new IllegalArgumentException(
+ DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
+ }
+
// For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled.
alwaysUseDelegationTokensForTests = conf.getBoolean(
@@ -932,7 +954,7 @@
@VisibleForTesting
static RetryCache initRetryCache(Configuration conf) {
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
- DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+ DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
if (enable) {
float heapPercent = conf.getFloat(
@@ -1161,6 +1183,12 @@
editLogRollerThreshold, editLogRollerInterval));
nnEditLogRoller.start();
+ if (lazyPersistFileScrubIntervalSec > 0) {
+ lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
+ lazyPersistFileScrubIntervalSec));
+ lazyPersistFileScrubber.start();
+ }
+
cacheManager.startMonitorThread();
blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
} finally {
@@ -1214,6 +1242,10 @@
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
nnEditLogRoller.interrupt();
}
+ if (lazyPersistFileScrubber != null) {
+ ((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
+ lazyPersistFileScrubber.interrupt();
+ }
if (dir != null && getFSImage() != null) {
if (getFSImage().editLog != null) {
getFSImage().editLog.close();
@@ -2517,6 +2549,7 @@
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
waitForLoadingFSImage();
@@ -2576,7 +2609,7 @@
src = resolvePath(src, pathComponents);
toRemoveBlocks = startFileInternal(pc, src, permissions, holder,
clientMachine, create, overwrite, createParent, replication,
- blockSize, suite, protocolVersion, edek, logRetryCache);
+ blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
stat = dir.getFileInfo(src, false,
FSDirectory.isReservedRawName(srcArg), true);
} catch (StandbyException se) {
@@ -2612,7 +2645,7 @@
String src, PermissionStatus permissions, String holder,
String clientMachine, boolean create, boolean overwrite,
boolean createParent, short replication, long blockSize,
- CipherSuite suite, CryptoProtocolVersion version,
+ boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
EncryptedKeyVersion edek, boolean logRetryEntry)
throws FileAlreadyExistsException, AccessControlException,
UnresolvedLinkException, FileNotFoundException,
@@ -2693,7 +2726,7 @@
if (parent != null && mkdirsRecursively(parent.toString(),
permissions, true, now())) {
newNode = dir.addFile(src, permissions, replication, blockSize,
- holder, clientMachine);
+ holder, clientMachine);
}
if (newNode == null) {
@@ -2708,6 +2741,8 @@
newNode = dir.getInode(newNode.getId()).asFile();
}
+ setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
+
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2722,6 +2757,37 @@
}
}
+ private void setNewINodeStoragePolicy(INodeFile inode,
+ INodesInPath iip,
+ boolean isLazyPersist)
+ throws IOException {
+
+ if (isLazyPersist) {
+ BlockStoragePolicy lpPolicy =
+ blockManager.getStoragePolicy("LAZY_PERSIST");
+
+ // Set LAZY_PERSIST storage policy if the flag was passed to
+ // CreateFile.
+ if (lpPolicy == null) {
+ throw new HadoopIllegalArgumentException(
+ "The LAZY_PERSIST storage policy has been disabled " +
+ "by the administrator.");
+ }
+ inode.setStoragePolicyID(lpPolicy.getId(),
+ iip.getLatestSnapshotId());
+ } else {
+ BlockStoragePolicy effectivePolicy =
+ blockManager.getStoragePolicy(inode.getStoragePolicyID());
+
+ if (effectivePolicy != null &&
+ effectivePolicy.isCopyOnCreateFile()) {
+ // Copy effective policy from ancestor directory to current file.
+ inode.setStoragePolicyID(effectivePolicy.getId(),
+ iip.getLatestSnapshotId());
+ }
+ }
+ }
+
/**
* Append to an existing file for append.
* <p>
@@ -2761,6 +2827,14 @@
+ src + " for client " + clientMachine);
}
INodeFile myFile = INodeFile.valueOf(inode, src, true);
+ final BlockStoragePolicy lpPolicy =
+ blockManager.getStoragePolicy("LAZY_PERSIST");
+
+ if (lpPolicy != null &&
+ lpPolicy.getId() == myFile.getStoragePolicyID()) {
+ throw new UnsupportedOperationException(
+ "Cannot append to lazy persist file " + src);
+ }
// Opening an existing file for write - may need to recover lease.
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -5088,6 +5162,73 @@
}
}
+ /**
+ * Daemon to periodically scan the namespace for lazyPersist files
+ * with missing blocks and unlink them.
+ */
+ class LazyPersistFileScrubber implements Runnable {
+ private volatile boolean shouldRun = true;
+ final int scrubIntervalSec;
+ public LazyPersistFileScrubber(final int scrubIntervalSec) {
+ this.scrubIntervalSec = scrubIntervalSec;
+ }
+
+ /**
+ * Periodically go over the list of lazyPersist files with missing
+ * blocks and unlink them from the namespace.
+ */
+ private void clearCorruptLazyPersistFiles()
+ throws SafeModeException, AccessControlException,
+ UnresolvedLinkException, IOException {
+
+ BlockStoragePolicy lpPolicy = blockManager.getStoragePolicy("LAZY_PERSIST");
+
+ List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();
+
+ writeLock();
+
+ try {
+ final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator();
+
+ while (it.hasNext()) {
+ Block b = it.next();
+ BlockInfo blockInfo = blockManager.getStoredBlock(b);
+ if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) {
+ filesToDelete.add(blockInfo.getBlockCollection());
+ }
+ }
+
+ for (BlockCollection bc : filesToDelete) {
+ LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
+ deleteInternal(bc.getName(), false, false, false);
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (fsRunning && shouldRun) {
+ try {
+ clearCorruptLazyPersistFiles();
+ Thread.sleep(scrubIntervalSec * 1000);
+ } catch (InterruptedException e) {
+ FSNamesystem.LOG.info(
+ "LazyPersistFileScrubber was interrupted, exiting");
+ break;
+ } catch (Exception e) {
+ FSNamesystem.LOG.error(
+ "Ignoring exception in LazyPersistFileScrubber:", e);
+ }
+ }
+ }
+
+ public void stop() {
+ shouldRun = false;
+ }
+ }
+
public FSImage getFSImage() {
return fsImage;
}
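The scrubber interval above is driven by a single configuration key; a short sketch of its semantics, using only the new DFSConfigKeys constant (a positive value enables the periodic scan, a negative value disables the thread, and zero is rejected when FSNamesystem is constructed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ScrubberIntervalSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();

    // Scan for corrupt lazyPersist files every 60 seconds (default is 300).
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 60);

    // A negative value disables the LazyPersistFileScrubber thread entirely;
    // zero makes FSNamesystem throw IllegalArgumentException at startup.
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, -1);
  }
}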
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 583c193..dde36c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -108,6 +108,7 @@
h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
return h;
}
+
}
private long header = 0L;
@@ -115,11 +116,17 @@
private BlockInfo[] blocks;
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
+ long atime, BlockInfo[] blklist, short replication,
+ long preferredBlockSize) {
+ this(id, name, permissions, mtime, atime, blklist, replication,
+ preferredBlockSize, (byte) 0);
+ }
+
+ INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long preferredBlockSize, byte storagePolicyID) {
super(id, name, permissions, mtime, atime);
- header = HeaderFormat.toLong(preferredBlockSize, replication,
- storagePolicyID);
+ header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
this.blocks = blklist;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
index f9d2700..0f85bab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
-
/**
* The attributes of a file.
*/
@@ -47,11 +46,12 @@
public SnapshotCopy(byte[] name, PermissionStatus permissions,
AclFeature aclFeature, long modificationTime, long accessTime,
- short replication, long preferredBlockSize, byte storagePolicyID,
- XAttrFeature xAttrsFeature) {
+ short replication, long preferredBlockSize,
+ byte storagePolicyID, XAttrFeature xAttrsFeature) {
super(name, permissions, aclFeature, modificationTime, accessTime,
xAttrsFeature);
- header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
+ header = HeaderFormat.toLong(preferredBlockSize, replication,
+ storagePolicyID);
}
public SnapshotCopy(INodeFile file) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
index bafef25..bbd545a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
@@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
+import java.util.regex.Pattern;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanAttributeInfo;
@@ -109,6 +110,23 @@
}
}
+ public void printAllMatchedAttributes(String attrRegExp) throws Exception {
+ err("List of the keys matching " + attrRegExp + " :");
+ Object val = null;
+ Pattern p = Pattern.compile(attrRegExp);
+ for (ObjectName oname : hadoopObjectNames) {
+ err(">>>>>>>>jmx name: " + oname.getCanonicalKeyPropertyListString());
+ MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+ MBeanAttributeInfo[] mbinfos = mbinfo.getAttributes();
+ for (MBeanAttributeInfo mb : mbinfos) {
+ if (p.matcher(mb.getName()).lookingAt()) {
+ val = mbsc.getAttribute(oname, mb.getName());
+ System.out.format(format, mb.getName(), (val == null) ? "" : val.toString());
+ }
+ }
+ }
+ }
+
/**
* get single value by key
*/
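printAllMatchedAttributes() makes it easy to dump the new RamDisk counters exposed over JMX. A hedged usage sketch; the setService()/init() calls mirror how the existing JMXGet tool is normally driven, and the service name plus a locally reachable DataNode are assumptions:

import org.apache.hadoop.hdfs.tools.JMXGet;

public class JmxRamDiskSketch {
  public static void main(String[] args) throws Exception {
    JMXGet jmx = new JMXGet();
    jmx.setService("DataNode");  // assumption: query the DataNode MBeans
    jmx.init();                  // connect to the JMX endpoint

    // Print every attribute whose name starts with "RamDisk", e.g. the
    // RamDiskBlocksWrite and RamDiskBlocksLazyPersisted counters.
    jmx.printAllMatchedAttributes("RamDisk");
  }
}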
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
index 0eb7c61..0a2ae26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -257,6 +257,8 @@
final long aTime = (Long) m.get("accessTime");
final long mTime = (Long) m.get("modificationTime");
final long blockSize = (Long) m.get("blockSize");
+ final boolean isLazyPersist = m.containsKey("lazyPersist")
+ ? (Boolean) m.get("lazyPersist") : false;
final short replication = (short) (long) (Long) m.get("replication");
final long fileId = m.containsKey("fileId") ? (Long) m.get("fileId")
: INodeId.GRANDFATHER_INODE_ID;
@@ -267,8 +269,9 @@
(byte) (long) (Long) m.get("storagePolicy") :
BlockStoragePolicySuite.ID_UNSPECIFIED;
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
- blockSize, mTime, aTime, permission, owner, group, symlink,
- DFSUtil.string2Bytes(localName), fileId, childrenNum, null, storagePolicy);
+ blockSize, mTime, aTime, permission, owner, group,
+ symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
+ storagePolicy);
}
/** Convert an ExtendedBlock to a Json map. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 14e8c68..232c264 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -65,6 +65,7 @@
CREATE = 0x01; // Create a file
OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC
APPEND = 0x04; // Append to a file
+ LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
}
message CreateRequestProto {
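On the client side the flag travels through the existing FileSystem.create() overload that takes an EnumSet<CreateFlag> (the same overload the DFSTestUtil change later in this patch uses). A minimal sketch; the path, buffer size, replication and block size are arbitrary:

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class LazyPersistCreateSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    EnumSet<CreateFlag> flags = EnumSet.of(
        CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.LAZY_PERSIST);

    FSDataOutputStream out = fs.create(new Path("/tmp/scratch.dat"),
        FsPermission.getFileDefault(), flags, 4096, (short) 1,
        128L * 1024 * 1024, null);
    try {
      // Intermediate data only: the contents may be lost on DataNode restart.
      out.write(new byte[4096]);
    } finally {
      out.close();
    }
  }
}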
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 098d10a..fb774b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -115,6 +115,13 @@
optional CachingStrategyProto cachingStrategy = 10;
optional StorageTypeProto storageType = 11 [default = DISK];
repeated StorageTypeProto targetStorageTypes = 12;
+
+ /**
+ * Hint to the DataNode that the block can be allocated on transient
+   * storage, i.e. memory, and written to disk lazily. The DataNode is free
+ * to ignore this hint.
+ */
+ optional bool allowLazyPersist = 13 [default = false];
}
message OpTransferBlockProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 38e649edb..10af3b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -159,6 +159,7 @@
DISK = 1;
SSD = 2;
ARCHIVE = 3;
+ RAM_DISK = 4;
}
/**
@@ -305,7 +306,6 @@
// Optional field for fileId
optional uint64 fileId = 13 [default = 0]; // default as an invalid id
optional int32 childrenNum = 14 [default = -1];
-
// Optional field for file encryption
optional FileEncryptionInfoProto fileEncryptionInfo = 15;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index adc80a2..54814b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -398,6 +398,16 @@
</property>
<property>
+ <name>dfs.namenode.lazypersist.file.scrub.interval.sec</name>
+ <value>300</value>
+ <description>
+ The NameNode periodically scans the namespace for LazyPersist files with
+ missing blocks and unlinks them from the namespace. This configuration key
+ controls the interval between successive scans. Set it to a negative value
+ to disable this behavior.
+ </description>
+</property>
+<property>
<name>dfs.block.access.token.enable</name>
<value>false</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7be6a49..84792b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -93,6 +93,7 @@
import java.util.*;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.fs.CreateFlag.*;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -285,16 +286,29 @@
public static void createFile(FileSystem fs, Path fileName, int bufferLen,
long fileLen, long blockSize, short replFactor, long seed)
throws IOException {
- assert bufferLen > 0;
- if (!fs.mkdirs(fileName.getParent())) {
- throw new IOException("Mkdirs failed to create " +
- fileName.getParent().toString());
- }
- FSDataOutputStream out = null;
- try {
- out = fs.create(fileName, true, fs.getConf()
- .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
- replFactor, blockSize);
+ createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
+ replFactor, seed, false);
+ }
+
+ public static void createFile(FileSystem fs, Path fileName,
+ boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+ short replFactor, long seed, boolean flush) throws IOException {
+ assert bufferLen > 0;
+ if (!fs.mkdirs(fileName.getParent())) {
+ throw new IOException("Mkdirs failed to create " +
+ fileName.getParent().toString());
+ }
+ FSDataOutputStream out = null;
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+ createFlags.add(OVERWRITE);
+ if (isLazyPersist) {
+ createFlags.add(LAZY_PERSIST);
+ }
+ try {
+ out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
+ fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+ replFactor, blockSize, null);
+
if (fileLen > 0) {
byte[] toWrite = new byte[bufferLen];
Random rb = new Random(seed);
@@ -302,10 +316,13 @@
while (bytesToWrite>0) {
rb.nextBytes(toWrite);
int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
- : (int) bytesToWrite;
-
- out.write(toWrite, 0, bytesToWriteNext);
- bytesToWrite -= bytesToWriteNext;
+ : (int) bytesToWrite;
+
+ out.write(toWrite, 0, bytesToWriteNext);
+ bytesToWrite -= bytesToWriteNext;
+ }
+ if (flush) {
+ out.hsync();
}
}
} finally {
@@ -1415,6 +1432,39 @@
}
/**
+   * Helper function that verifies the blocks of a file are placed on the
+   * expected storage type.
+   *
+   * @param fs the file system containing the file
+   * @param client the DFS client used to access the file
+   * @param path the path of the file to verify
+   * @param storageType the expected storage type
+   * @return true if the file exists and its blocks are located on the
+   *         expected storage type,
+   *         false otherwise.
+ */
+ public static boolean verifyFileReplicasOnStorageType(FileSystem fs,
+ DFSClient client, Path path, StorageType storageType) throws IOException {
+ if (!fs.exists(path)) {
+      LOG.info("verifyFileReplicasOnStorageType: file " + path + " does not exist");
+ return false;
+ }
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ if (locatedBlock.getStorageTypes()[0] != storageType) {
+ LOG.info("verifyFileReplicasOnStorageType: for file " + path +
+            ". Expect blk " + locatedBlock +
+ " on Type: " + storageType + ". Actual Type: " +
+ locatedBlock.getStorageTypes()[0]);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Helper function to create a key in the Key Provider. Defaults
* to the first indexed NameNode's Key Provider.
*
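A sketch of how the new helpers combine in a test, mirroring the RAM_DISK tests added later in this patch; whether the replica actually lands on RAM_DISK depends on the MiniDFSCluster's RAM_DISK capacity and related settings, so treat this as illustrative rather than a ready-made test:

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;

public class VerifyStorageTypeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .storageTypes(new StorageType[] {StorageType.RAM_DISK, StorageType.DEFAULT})
        .build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      DFSClient client = fs.getClient();
      Path path = new Path("/sketch.dat");

      // Lazy-persist file of a few KB; hsync so the block is visible.
      DFSTestUtil.createFile(fs, path, true, 4096, 4096,
          DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, (short) 1, 0xFADED, true);

      assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(
          fs, client, path, StorageType.RAM_DISK));
    } finally {
      cluster.shutdown();
    }
  }
}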
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 0512b7f..0010a75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1340,6 +1340,7 @@
}
int curDatanodesNum = dataNodes.size();
+ final int curDatanodesNumSaved = curDatanodesNum;
// for mincluster's the default initialDelay for BRs is 0
if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
@@ -1390,7 +1391,8 @@
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) {
- String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
+ String dirs = makeDataNodeDirs(i, storageTypes == null ?
+ null : storageTypes[i - curDatanodesNum]);
dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index 771b7bd..03317b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -69,7 +69,7 @@
static final byte COLD = (byte) 4;
static final byte WARM = (byte) 8;
static final byte HOT = (byte) 12;
-
+ static final byte LAZY_PERSIST = (byte) 15;
@Test (timeout=300000)
public void testConfigKeyEnabled() throws IOException {
@@ -116,6 +116,9 @@
expectedPolicyStrings.put(HOT,
"BlockStoragePolicy{HOT:12, storageTypes=[DISK], " +
"creationFallbacks=[], replicationFallbacks=[ARCHIVE]}");
+ expectedPolicyStrings.put(LAZY_PERSIST,
+ "BlockStoragePolicy{LAZY_PERSIST:15, storageTypes=[RAM_DISK, DISK], " +
+ "creationFallbacks=[DISK], replicationFallbacks=[DISK]}");
for(byte i = 1; i < 16; i++) {
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
@@ -1141,7 +1144,7 @@
final DistributedFileSystem fs = cluster.getFileSystem();
try {
BlockStoragePolicy[] policies = fs.getStoragePolicies();
- Assert.assertEquals(3, policies.length);
+ Assert.assertEquals(4, policies.length);
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
policies[0].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java
index 01bfb0d..39d3c96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java
@@ -115,9 +115,12 @@
* the upgrade. Actually it is ok for those contents to change.
* For now disabling block verification so that the contents are
* not changed.
+ * Disable duplicate replica deletion as the test intentionally
+ * mirrors the contents of storage directories.
*/
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
index 104b043..bb00144 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
@@ -229,6 +229,7 @@
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
log("Normal NameNode upgrade", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
@@ -370,6 +371,7 @@
{
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
@@ -405,6 +407,7 @@
int numDirs = 4;
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index bcb68e9..3586551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -524,6 +524,6 @@
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
- checksum, CachingStrategy.newDefaultStrategy());
+ checksum, CachingStrategy.newDefaultStrategy(), false);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 29ac3f2..b7fdccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,9 +98,10 @@
*/
@Override
public synchronized ReplicaInPipelineInterface createRbw(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+ throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
- return super.createRbw(storageType, b);
+ return super.createRbw(storageType, b, allowLazyPersist);
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index cb85c7d..98fd59a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -448,13 +448,16 @@
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
AdminStates.DECOMMISSIONED),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
- AdminStates.NORMAL)
+ AdminStates.NORMAL),
+ DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
+ AdminStates.NORMAL),
};
- String[] storageIDs = {"s1", "s2", "s3"};
+ String[] storageIDs = {"s1", "s2", "s3", "s4"};
StorageType[] media = {
StorageType.DISK,
StorageType.SSD,
- StorageType.DISK
+ StorageType.DISK,
+ StorageType.RAM_DISK
};
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index dbc3212a..751f186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -41,12 +44,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -57,6 +55,7 @@
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
@@ -86,6 +85,7 @@
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
static final int DEFAULT_BLOCK_SIZE = 100;
+ static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
private static final Random r = new Random();
static {
@@ -108,6 +108,15 @@
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
}
+ static void initConfWithRamDisk(Configuration conf) {
+ conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
+ conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+ conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
+ conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, 1);
+ }
+
/* create a file with a length of <code>fileLen</code> */
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
short replicationFactor, int nnIndex)
@@ -1096,6 +1105,81 @@
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
}
+ /*
+   * Test Balancer with RAM_DISK configured.
+   * One DN has two files on RAM_DISK, the other DN has no files on RAM_DISK.
+   * Then verify that the balancer does not migrate files on RAM_DISK across DNs.
+ */
+ @Test(timeout=300000)
+ public void testBalancerWithRamDisk() throws Exception {
+ final int SEED = 0xFADED;
+ final short REPL_FACT = 1;
+ Configuration conf = new Configuration();
+ initConfWithRamDisk(conf);
+
+ final int defaultRamDiskCapacity = 10;
+ final int defaultDiskCapacity = 100;
+ final long ramDiskStorageLimit =
+ ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+ (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+ final long diskStorageLimit =
+ ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+ (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .numDataNodes(1)
+ .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
+ .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
+ .build();
+
+ try {
+ cluster.waitActive();
+ // Create few files on RAM_DISK
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ DistributedFileSystem fs = cluster.getFileSystem();
+ DFSClient client = fs.getClient();
+ DFSTestUtil.createFile(fs, path1, true,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+      DFSTestUtil.createFile(fs, path2, true,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * 1000);
+
+ // Add another fresh DN with the same type/capacity without files on RAM_DISK
+ StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
+ long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}};
+ cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
+ null, null, storageCapacities, null, false, false, false, null);
+
+ cluster.triggerHeartbeats();
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+ // Run Balancer
+ Balancer.Parameters p = new Balancer.Parameters(
+ Parameters.DEFAULT.policy,
+ Parameters.DEFAULT.threshold,
+ Parameters.DEFAULT.nodesToBeExcluded,
+ Parameters.DEFAULT.nodesToBeIncluded);
+ final int r = Balancer.run(namenodes, p, conf);
+
+ // Validate no RAM_DISK block should be moved
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+
+ // Verify files are still on RAM_DISK
+ DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
+ DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
/**
* @param args
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 9ea6c51..e9557da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -364,7 +364,7 @@
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
- dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
+ dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 83d93f0..d1284fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -300,6 +300,11 @@
public ChunkChecksum getLastChecksumAndDataLen() {
return new ChunkChecksum(oStream.getLength(), null);
}
+
+ @Override
+ public boolean isOnTransientStorage() {
+ return false;
+ }
}
/**
@@ -415,9 +420,66 @@
}
}
+ static class SimulatedVolume implements FsVolumeSpi {
+ private final SimulatedStorage storage;
+
+ SimulatedVolume(final SimulatedStorage storage) {
+ this.storage = storage;
+ }
+
+ @Override
+ public String getStorageID() {
+ return storage.getStorageUuid();
+ }
+
+ @Override
+ public String[] getBlockPoolList() {
+ return new String[0];
+ }
+
+ @Override
+ public long getAvailable() throws IOException {
+ return storage.getCapacity() - storage.getUsed();
+ }
+
+ @Override
+ public String getBasePath() {
+ return null;
+ }
+
+ @Override
+ public String getPath(String bpid) throws IOException {
+ return null;
+ }
+
+ @Override
+ public File getFinalizedDir(String bpid) throws IOException {
+ return null;
+ }
+
+ @Override
+ public StorageType getStorageType() {
+ return null;
+ }
+
+ @Override
+ public boolean isTransientStorage() {
+ return false;
+ }
+
+ @Override
+ public void reserveSpaceForRbw(long bytesToReserve) {
+ }
+
+ @Override
+ public void releaseReservedSpace(long bytesToRelease) {
+ }
+ }
+
private final Map<String, Map<Block, BInfo>> blockMap
= new HashMap<String, Map<Block,BInfo>>();
private final SimulatedStorage storage;
+ private final SimulatedVolume volume;
private final String datanodeUuid;
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
@@ -434,6 +496,7 @@
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
+ this.volume = new SimulatedVolume(this.storage);
}
public synchronized void injectBlocks(String bpid,
@@ -747,7 +810,8 @@
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createRbw(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, ExtendedBlock b,
+ boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, b);
}
@@ -1083,7 +1147,7 @@
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) {
+ File diskMetaFile, FsVolumeSpi vol) throws IOException {
throw new UnsupportedOperationException();
}
@@ -1116,6 +1180,11 @@
}
@Override
+ public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Map<String, Object> getVolumeInfoMap() {
throw new UnsupportedOperationException();
}
@@ -1127,7 +1196,7 @@
@Override
public FsVolumeSpi getVolume(ExtendedBlock b) {
- throw new UnsupportedOperationException();
+ return volume;
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 67805c0..987b480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -532,7 +532,7 @@
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- dn.data.createRbw(StorageType.DEFAULT, block);
+ dn.data.createRbw(StorageType.DEFAULT, block, false);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
@@ -556,7 +556,7 @@
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
- StorageType.DEFAULT, block);
+ StorageType.DEFAULT, block, false);
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
index 53babb4..94af015 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
@@ -44,13 +44,14 @@
File dir1 = new File("/dir1");
File dir2 = new File("/dir2");
File dir3 = new File("/dir3");
+ File dir4 = new File("/dir4");
// Verify that a valid string is correctly parsed, and that storage
// type is not case-sensitive
- String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3";
+ String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3,[ram_disk]/dir4";
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations1);
locations = DataNode.getStorageLocations(conf);
- assertThat(locations.size(), is(4));
+ assertThat(locations.size(), is(5));
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
@@ -59,6 +60,8 @@
assertThat(locations.get(2).getUri(), is(dir2.toURI()));
assertThat(locations.get(3).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(3).getUri(), is(dir3.toURI()));
+ assertThat(locations.get(4).getStorageType(), is(StorageType.RAM_DISK));
+ assertThat(locations.get(4).getUri(), is(dir4.toURI()));
// Verify that an unrecognized storage type result in an exception.
String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index bc50eaa..b7795b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -31,22 +33,21 @@
import java.util.List;
import java.util.Random;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
/**
@@ -60,22 +61,29 @@
private MiniDFSCluster cluster;
private String bpid;
+ private DFSClient client;
private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
private DirectoryScanner scanner = null;
private final Random rand = new Random();
private final Random r = new Random();
+ private static final int BLOCK_LENGTH = 100;
static {
- CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+ CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
}
/** create a file with a length of <code>fileLen</code> */
- private void createFile(String fileName, long fileLen) throws IOException {
+ private List<LocatedBlock> createFile(String fileNamePrefix,
+ long fileLen,
+ boolean isLazyPersist) throws IOException {
FileSystem fs = cluster.getFileSystem();
- Path filePath = new Path(fileName);
- DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+ Path filePath = new Path("/" + fileNamePrefix + ".dat");
+ DFSTestUtil.createFile(
+ fs, filePath, isLazyPersist, 1024, fileLen,
+ BLOCK_LENGTH, (short) 1, r.nextLong(), false);
+ return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
}
/** Truncate a block file */
@@ -134,6 +142,43 @@
return 0;
}
+ /**
+ * Duplicate the given block on all volumes.
+   * @param blockId the id of the block to duplicate
+   * @throws IOException if the block or meta file cannot be copied
+ */
+ private void duplicateBlock(long blockId) throws IOException {
+ synchronized (fds) {
+ ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+ for (FsVolumeSpi v : fds.getVolumes()) {
+ if (v.getStorageID().equals(b.getVolume().getStorageID())) {
+ continue;
+ }
+
+ // Volume without a copy of the block. Make a copy now.
+ File sourceBlock = b.getBlockFile();
+ File sourceMeta = b.getMetaFile();
+ String sourceRoot = b.getVolume().getBasePath();
+ String destRoot = v.getBasePath();
+
+ String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
+ String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
+
+ File destBlock = new File(destRoot, relativeBlockPath);
+ File destMeta = new File(destRoot, relativeMetaPath);
+
+ destBlock.getParentFile().mkdirs();
+ FileUtils.copyFile(sourceBlock, destBlock);
+ FileUtils.copyFile(sourceMeta, destMeta);
+
+ if (destBlock.exists() && destMeta.exists()) {
+ LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
+ LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
+ }
+ }
+ }
+ }
+
/** Get a random blockId that is not used already */
private long getFreeBlockId() {
long id = rand.nextLong();
@@ -215,7 +260,13 @@
}
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
- long missingMemoryBlocks, long mismatchBlocks) {
+ long missingMemoryBlocks, long mismatchBlocks) throws IOException {
+ scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
+ missingMemoryBlocks, mismatchBlocks, 0);
+ }
+
+ private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+ long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) throws IOException {
scanner.reconcile();
assertTrue(scanner.diffs.containsKey(bpid));
@@ -229,9 +280,92 @@
assertEquals(missingBlockFile, stats.missingBlockFile);
assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
assertEquals(mismatchBlocks, stats.mismatchBlocks);
+ assertEquals(duplicateBlocks, stats.duplicateBlocks);
}
- @Test
+ @Test (timeout=300000)
+ public void testRetainBlockOnPersistentStorage() throws Exception {
+ cluster = new MiniDFSCluster
+ .Builder(CONF)
+ .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+ .numDataNodes(1)
+ .build();
+ try {
+ cluster.waitActive();
+ bpid = cluster.getNamesystem().getBlockPoolId();
+ fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+ client = cluster.getFileSystem().getClient();
+ scanner = new DirectoryScanner(fds, CONF);
+ scanner.setRetainDiffs(true);
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+ // Add a file with 1 block
+ List<LocatedBlock> blocks =
+ createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+
+ // Ensure no difference between volumeMap and disk.
+ scan(1, 0, 0, 0, 0, 0);
+
+ // Make a copy of the block on RAM_DISK and ensure that it is
+ // picked up by the scanner.
+ duplicateBlock(blocks.get(0).getBlock().getBlockId());
+ scan(2, 1, 0, 0, 0, 0, 1);
+ verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+ scan(1, 0, 0, 0, 0, 0);
+
+ } finally {
+ if (scanner != null) {
+ scanner.shutdown();
+ scanner = null;
+ }
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test (timeout=300000)
+ public void testDeleteBlockOnTransientStorage() throws Exception {
+ cluster = new MiniDFSCluster
+ .Builder(CONF)
+ .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+ .numDataNodes(1)
+ .build();
+ try {
+ cluster.waitActive();
+ bpid = cluster.getNamesystem().getBlockPoolId();
+ fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+ client = cluster.getFileSystem().getClient();
+ scanner = new DirectoryScanner(fds, CONF);
+ scanner.setRetainDiffs(true);
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Create a file on RAM_DISK
+ List<LocatedBlock> blocks =
+ createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
+
+ // Ensure no difference between volumeMap and disk.
+ scan(1, 0, 0, 0, 0, 0);
+
+ // Make a copy of the block on DEFAULT storage and ensure that it is
+ // picked up by the scanner.
+ duplicateBlock(blocks.get(0).getBlock().getBlockId());
+ scan(2, 1, 0, 0, 0, 0, 1);
+
+ // Ensure that the copy on RAM_DISK was deleted.
+ verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+ scan(1, 0, 0, 0, 0, 0);
+
+ } finally {
+ if (scanner != null) {
+ scanner.shutdown();
+ scanner = null;
+ }
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test (timeout=600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning
for (int parallelism = 1; parallelism < 3; parallelism++) {
@@ -245,16 +379,17 @@
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+ client = cluster.getFileSystem().getClient();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
// Add files with 100 blocks
- createFile("/tmp/t1", 10000);
+ createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
long totalBlocks = 100;
- // Test1: No difference between in-memory and disk
+ // Test1: No difference between volumeMap and disk
scan(100, 0, 0, 0, 0, 0);
// Test2: block metafile is missing
@@ -355,7 +490,10 @@
assertFalse(scanner.getRunStatus());
} finally {
- scanner.shutdown();
+ if (scanner != null) {
+ scanner.shutdown();
+ scanner = null;
+ }
cluster.shutdown();
}
}
@@ -389,6 +527,13 @@
assertEquals(genStamp, memBlock.getGenerationStamp());
}
+ private void verifyStorageType(long blockId, boolean expectTransient) {
+ final ReplicaInfo memBlock;
+ memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+ assertNotNull(memBlock);
+ assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
+ }
+
private static class TestFsVolumeSpi implements FsVolumeSpi {
@Override
public String[] getBlockPoolList() {
@@ -426,6 +571,11 @@
}
@Override
+ public boolean isTransientStorage() {
+ return false;
+ }
+
+ @Override
public void reserveSpaceForRbw(long bytesToReserve) {
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 4b5b6e1..f440bb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
- checksum, CachingStrategy.newDefaultStrategy());
+ checksum, CachingStrategy.newDefaultStrategy(), false);
out.flush();
// close the connection before sending the content of the block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index bd6c3de..099a0cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -67,7 +67,7 @@
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
- StorageType.DEFAULT, b);
+ StorageType.DEFAULT, b, false);
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index 6bd36ed..f9e30e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -23,18 +23,19 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
public class FsDatasetTestUtil {
public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
- return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+ return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
}
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
) throws IOException {
- return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
+ return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
}
public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
@@ -62,4 +63,13 @@
String bpid) {
return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
}
+
+ /**
+ * Stop the lazy writer daemon that saves RAM disk files to persistent storage.
+ * @param dn
+ */
+ public static void stopLazyWriter(DataNode dn) {
+ FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
+ ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 10b9f7e..7c39ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -147,7 +147,7 @@
for (int i = 0; i < NUM_BLOCKS; i++) {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
- dataset.createRbw(StorageType.DEFAULT, eb);
+ dataset.createRbw(StorageType.DEFAULT, eb, false);
}
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
new file mode 100644
index 0000000..91deb55
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -0,0 +1,984 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestLazyPersistFiles {
+ public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
+
+ static {
+ ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+ }
+
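+  // Storage policy id the NameNode is expected to report for LAZY_PERSIST files.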
+ private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
+
+ private static final int THREADPOOL_SIZE = 10;
+
+ private static final short REPL_FACTOR = 1;
+ private static final int BLOCK_SIZE = 5 * 1024 * 1024;
+ private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+ private static final long HEARTBEAT_INTERVAL_SEC = 1;
+ private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+ private static final int LAZY_WRITER_INTERVAL_SEC = 1;
+ private static final int BUFFER_LENGTH = 4096;
+ private static final int EVICTION_LOW_WATERMARK = 1;
+ private static final String JMX_SERVICE_NAME = "DataNode";
+ private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private DFSClient client;
+ private Configuration conf;
+ private JMXGet jmx;
+
+ @After
+ public void shutDownCluster() throws Exception {
+
+    // Dump all RamDisk JMX metrics before shutting down the cluster.
+ printRamDiskJMXMetrics();
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ client = null;
+ }
+
+ if (cluster != null) {
+ cluster.shutdownDataNodes();
+ cluster.shutdown();
+ cluster = null;
+ }
+
+    jmx = null;
+ }
+
+ @Test (timeout=300000)
+ public void testPolicyNotSetByDefault() throws IOException {
+ startUpCluster(false, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, 0, false);
+ // Stat the file and check that the LAZY_PERSIST policy is not
+ // returned back.
+ HdfsFileStatus status = client.getFileInfo(path.toString());
+ assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
+ }
+
+ @Test (timeout=300000)
+ public void testPolicyPropagation() throws IOException {
+ startUpCluster(false, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, 0, true);
+ // Stat the file and check that the lazyPersist flag is returned back.
+ HdfsFileStatus status = client.getFileInfo(path.toString());
+ assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
+ }
+
+ @Test (timeout=300000)
+ public void testPolicyPersistenceInEditLog() throws IOException {
+ startUpCluster(false, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, 0, true);
+ cluster.restartNameNode(true);
+
+ // Stat the file and check that the lazyPersist flag is returned back.
+ HdfsFileStatus status = client.getFileInfo(path.toString());
+ assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
+ }
+
+ @Test (timeout=300000)
+ public void testPolicyPersistenceInFsImage() throws IOException {
+ startUpCluster(false, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, 0, true);
+ // checkpoint
+ fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+ fs.saveNamespace();
+ fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ cluster.restartNameNode(true);
+
+ // Stat the file and check that the lazyPersist flag is returned back.
+ HdfsFileStatus status = client.getFileInfo(path.toString());
+ assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
+ }
+
+ @Test (timeout=300000)
+ public void testPlacementOnRamDisk() throws IOException {
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, RAM_DISK);
+ }
+
+ @Test (timeout=300000)
+ public void testPlacementOnSizeLimitedRamDisk() throws IOException {
+ startUpCluster(true, 3);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ makeTestFile(path1, BLOCK_SIZE, true);
+ makeTestFile(path2, BLOCK_SIZE, true);
+
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ ensureFileReplicasOnStorageType(path2, RAM_DISK);
+ }
+
+ /**
+   * Client writes a LAZY_PERSIST file to a DataNode with no RAM disk configured.
+   * The write should fall back to disk without error.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testFallbackToDisk() throws IOException {
+ startUpCluster(false, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, DEFAULT);
+ }
+
+ /**
+   * File cannot fit in RAM disk even with eviction.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testFallbackToDiskFull() throws Exception {
+ startUpCluster(false, 0);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, DEFAULT);
+
+ verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
+ }
+
+ /**
+   * File partially fits in RAM disk after eviction.
+   * RAM disk can fit 2 blocks. Write a file with 5 blocks.
+   * Expect 2 or fewer blocks on RAM disk and 3 or more on disk.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testFallbackToDiskPartial()
+ throws IOException, InterruptedException {
+ startUpCluster(true, 2);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE * 5, true);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ triggerBlockReport();
+
+ int numBlocksOnRamDisk = 0;
+ int numBlocksOnDisk = 0;
+
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
+ numBlocksOnRamDisk++;
+ } else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
+ numBlocksOnDisk++;
+ }
+ }
+
+    // Since eviction is asynchronous, depending on the timing of eviction
+    // relative to writes we may get 2 or fewer blocks on RAM disk.
+    Assert.assertTrue(numBlocksOnRamDisk <= 2);
+    Assert.assertTrue(numBlocksOnDisk >= 3);
+ }
+
+ /**
+ * If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
+ * specified, then block placement should fail.
+ *
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testRamDiskNotChosenByDefault() throws IOException {
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ try {
+ makeTestFile(path, BLOCK_SIZE, false);
+ fail("Block placement to RAM_DISK should have failed without lazyPersist flag");
+ } catch (Throwable t) {
+ LOG.info("Got expected exception ", t);
+ }
+ }
+
+ /**
+   * Append to a lazy-persist file is denied.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testAppendIsDenied() throws IOException {
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+
+ try {
+ client.append(path.toString(), BUFFER_LENGTH, null, null).close();
+ fail("Append to LazyPersist file did not fail as expected");
+ } catch (Throwable t) {
+ LOG.info("Got expected exception ", t);
+ }
+ }
+
+ /**
+ * If one or more replicas of a lazyPersist file are lost, then the file
+ * must be discarded by the NN, instead of being kept around as a
+ * 'corrupt' file.
+ */
+ @Test (timeout=300000)
+ public void testLazyPersistFilesAreDiscarded()
+ throws IOException, InterruptedException {
+ startUpCluster(true, 2);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+ makeTestFile(path1, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Stop the DataNode and sleep for the time it takes the NN to
+ // detect the DN as being dead.
+ cluster.shutdownDataNodes();
+ Thread.sleep(30000L);
+ assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
+
+ // Next, wait for the replication monitor to mark the file as corrupt
+ Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
+
+ // Wait for the LazyPersistFileScrubber to run
+ Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
+
+    // Ensure that path1 does not exist anymore.
+    Assert.assertFalse(fs.exists(path1));
+
+    // We should have zero blocks that need replication, since the replica of
+    // the discarded file is dropped rather than re-replicated.
+ assertThat(cluster.getNameNode()
+ .getNamesystem()
+ .getBlockManager()
+ .getUnderReplicatedBlocksCount(),
+ is(0L));
+ }
+
+ @Test (timeout=300000)
+ public void testLazyPersistBlocksAreSaved()
+ throws IOException, InterruptedException {
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ // Create a test file
+ makeTestFile(path, BLOCK_SIZE * 10, true);
+ LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ LOG.info("Verifying copy was saved to lazyPersist/");
+
+ // Make sure that there is a saved copy of the replica on persistent
+ // storage.
+ final String bpid = cluster.getNamesystem().getBlockPoolId();
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+ final Set<Long> persistedBlockIds = new HashSet<Long>();
+
+ // Make sure at least one non-transient volume has a saved copy of
+ // the replica.
+ for (FsVolumeSpi v : volumes) {
+ if (v.isTransientStorage()) {
+ continue;
+ }
+
+ FsVolumeImpl volume = (FsVolumeImpl) v;
+ File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
+
+ for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+ File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+ File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+ if (blockFile.exists()) {
+ // Found a persisted copy for this block!
+ boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
+ assertThat(added, is(true));
+ } else {
+ LOG.error(blockFile + " not found");
+ }
+ }
+ }
+
+ // We should have found a persisted copy for each located block.
+ assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
+ }
+
+ /**
+ * RamDisk eviction after lazy persist to disk.
+ * @throws Exception
+ */
+ @Test (timeout=300000)
+ public void testRamDiskEviction() throws Exception {
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ final int SEED = 0xFADED;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Create another file with a replica on RAM_DISK.
+ makeTestFile(path2, BLOCK_SIZE, true);
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ triggerBlockReport();
+
+ // Ensure the first file was evicted to disk, the second is still on
+ // RAM_DISK.
+ ensureFileReplicasOnStorageType(path2, RAM_DISK);
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
+ }
+
+ /**
+ * RamDisk eviction should not happen on blocks that are not yet
+ * persisted on disk.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testRamDiskEvictionBeforePersist()
+ throws IOException, InterruptedException {
+ startUpCluster(true, 1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+ final int SEED = 0XFADED;
+
+ // Stop lazy writer to ensure block for path1 is not persisted to disk.
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Create second file with a replica on RAM_DISK.
+ makeTestFile(path2, BLOCK_SIZE, true);
+
+ // Eviction should not happen for block of the first file that is not
+ // persisted yet.
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ ensureFileReplicasOnStorageType(path2, DEFAULT);
+
+    Assert.assertTrue(fs.exists(path1));
+    Assert.assertTrue(fs.exists(path2));
+ verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+ }
+
+ /**
+ * Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testRamDiskEvictionIsLru()
+ throws Exception {
+ final int NUM_PATHS = 5;
+ startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path paths[] = new Path[NUM_PATHS * 2];
+
+ for (int i = 0; i < paths.length; i++) {
+ paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
+ }
+
+ for (int i = 0; i < NUM_PATHS; i++) {
+ makeTestFile(paths[i], BLOCK_SIZE, true);
+ }
+
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+ }
+
+ // Open the files for read in a random order.
+ ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ indexes.add(i);
+ }
+ Collections.shuffle(indexes);
+
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ LOG.info("Touching file " + paths[indexes.get(i)]);
+ DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
+ }
+
+ // Create an equal number of new files ensuring that the previous
+ // files are evicted in the same order they were read.
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
+ triggerBlockReport();
+ Thread.sleep(3000);
+ ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
+ ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
+ for (int j = i + 1; j < NUM_PATHS; ++j) {
+ ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
+ }
+ }
+
+ verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2);
+ verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0);
+ verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2);
+ verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0);
+ verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0);
+ }
+
+ /**
+ * Delete lazy-persist file that has not been persisted to disk.
+   * Memory is freed up and the file is gone.
+ * @throws IOException
+ */
+  @Test (timeout=300000)
+ public void testDeleteBeforePersist()
+ throws Exception {
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, BLOCK_SIZE, true);
+ LocatedBlocks locatedBlocks =
+ ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+ // Delete before persist
+ client.delete(path.toString(), false);
+ Assert.assertFalse(fs.exists(path));
+
+ assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+ verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
+ }
+
+ /**
+   * Delete lazy-persist file that has been persisted to disk.
+ * Both memory blocks and disk blocks are deleted.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testDeleteAfterPersist()
+ throws Exception {
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ // Delete after persist
+ client.delete(path.toString(), false);
+ Assert.assertFalse(fs.exists(path));
+
+ assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+ verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
+ verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
+ }
+
+ /**
+   * Verify RAM_DISK space usage accounting across file create, lazy persist and delete.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testDfsUsageCreateDelete()
+ throws IOException, InterruptedException {
+ startUpCluster(true, 4);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    // Get the usage before writing BLOCK_SIZE bytes.
+ long usedBeforeCreate = fs.getUsed();
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ long usedAfterCreate = fs.getUsed();
+
+ assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ long usedAfterPersist = fs.getUsed();
+ assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
+
+ // Delete after persist
+ client.delete(path.toString(), false);
+ long usedAfterDelete = fs.getUsed();
+
+ assertThat(usedBeforeCreate, is(usedAfterDelete));
+ }
+
+ /**
+   * Concurrent reads from the same node; verify the contents.
+ */
+ @Test (timeout=300000)
+ public void testConcurrentRead()
+ throws Exception {
+ startUpCluster(true, 2);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final Path path1 = new Path("/" + METHOD_NAME + ".dat");
+
+ final int SEED = 0xFADED;
+ final int NUM_TASKS = 5;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Read from multiple clients.
+ final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
+ final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+ Runnable readerRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+ } catch (Throwable e) {
+ LOG.error("readerRunnable error", e);
+ testFailed.set(true);
+ } finally {
+ latch.countDown();
+ }
+ }
+ };
+
+ Thread threads[] = new Thread[NUM_TASKS];
+ for (int i = 0; i < NUM_TASKS; i++) {
+ threads[i] = new Thread(readerRunnable);
+ threads[i].start();
+ }
+
+ Thread.sleep(500);
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ Uninterruptibles.joinUninterruptibly(threads[i]);
+ }
+ Assert.assertFalse(testFailed.get());
+ }
+
+ /**
+   * Concurrent writes with eviction.
+   * RAM_DISK can hold 9 replicas.
+   * 4 threads each write 5 replicas.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testConcurrentWrites()
+ throws IOException, InterruptedException {
+ startUpCluster(true, 9);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final int SEED = 0xFADED;
+ final int NUM_WRITERS = 4;
+ final int NUM_WRITER_PATHS = 5;
+
+ Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ paths[i] = new Path[NUM_WRITER_PATHS];
+ for (int j = 0; j < NUM_WRITER_PATHS; j++) {
+ paths[i][j] =
+ new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
+ }
+ }
+
+ final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
+ final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+ ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed);
+ executor.execute(writer);
+ }
+
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ triggerBlockReport();
+
+    // Wait for all writer tasks to complete.
+ latch.await();
+
+ assertThat(testFailed.get(), is(false));
+ }
+
+ @Test (timeout=300000)
+ public void testDnRestartWithSavedReplicas()
+ throws IOException, InterruptedException {
+
+ startUpCluster(true, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+ makeTestFile(path1, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ // However the block replica should not be evicted from RAM_DISK yet.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ LOG.info("Restarting the DataNode");
+ cluster.restartDataNode(0, true);
+ cluster.waitActive();
+
+ // Ensure that the replica is now on persistent storage.
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+ }
+
+ @Test (timeout=300000)
+ public void testDnRestartWithUnsavedReplicas()
+ throws IOException, InterruptedException {
+
+ startUpCluster(true, 1);
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ LOG.info("Restarting the DataNode");
+ cluster.restartDataNode(0, true);
+ cluster.waitActive();
+
+ // Ensure that the replica is still on transient storage.
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ }
+
+ // ---- Utility functions for all test cases -------------------------------
+
+ /**
+   * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
+   * capped to that many block replicas. If ramDiskReplicaCapacity < 0 then it is ignored.
+ */
+ private void startUpCluster(boolean hasTransientStorage,
+ final int ramDiskReplicaCapacity,
+ final boolean useSCR)
+ throws IOException {
+
+ conf = new Configuration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+ LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+ conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ LAZY_WRITER_INTERVAL_SEC);
+ conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+ EVICTION_LOW_WATERMARK);
+
+ conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
+
+ long[] capacities = null;
+ if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+ // Convert replica count to byte count, add some delta for .meta and VERSION files.
+ long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
+ capacities = new long[] { ramDiskStorageLimit, -1 };
+ }
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .numDataNodes(REPL_FACTOR)
+ .storageCapacities(capacities)
+ .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
+ .build();
+ fs = cluster.getFileSystem();
+ client = fs.getClient();
+ try {
+ jmx = initJMX();
+ } catch (Exception e) {
+ fail("Failed initialize JMX for testing: " + e);
+ }
+ LOG.info("Cluster startup complete");
+ }
+
+ private void startUpCluster(boolean hasTransientStorage,
+ final int ramDiskReplicaCapacity)
+ throws IOException {
+ startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
+ }
+
+ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
+ throws IOException {
+
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+ if (isLazyPersist) {
+ createFlags.add(LAZY_PERSIST);
+ }
+
+ FSDataOutputStream fos = null;
+ try {
+ fos =
+ fs.create(path,
+ FsPermission.getFileDefault(),
+ createFlags,
+ BUFFER_LENGTH,
+ REPL_FACTOR,
+ BLOCK_SIZE,
+ null);
+
+ // Allocate a block.
+ byte[] buffer = new byte[BUFFER_LENGTH];
+ for (int bytesWritten = 0; bytesWritten < length; ) {
+ fos.write(buffer, 0, buffer.length);
+ bytesWritten += buffer.length;
+ }
+ if (length > 0) {
+ fos.hsync();
+ }
+ } finally {
+ IOUtils.closeQuietly(fos);
+ }
+ }
+
+ private LocatedBlocks ensureFileReplicasOnStorageType(
+ Path path, StorageType storageType) throws IOException {
+    // Ensure that the returned block locations are correct.
+ LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+ assertThat(fs.exists(path), is(true));
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+ }
+ return locatedBlocks;
+ }
+
+ private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
+ long seed) throws IOException {
+ DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+ BLOCK_SIZE, REPL_FACTOR, seed, true);
+ }
+
+ private boolean verifyReadRandomFile(
+ Path path, int fileLength, int seed) throws IOException {
+ byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+ byte expected[] = DFSTestUtil.
+ calculateFileContentsFromSeed(seed, fileLength);
+ return Arrays.equals(contents, expected);
+ }
+
+ private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+ throws IOException, InterruptedException {
+
+ LOG.info("Verifying replica has no saved copy after deletion.");
+ triggerBlockReport();
+
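+    // Wait for any pending asynchronous block deletions on the DataNode to drain.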
+ while(
+ DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
+ > 0L){
+ Thread.sleep(1000);
+ }
+
+ final String bpid = cluster.getNamesystem().getBlockPoolId();
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    // Make sure the deleted replica does not have a copy in either the finalized
+    // dir of the transient volume or the lazyPersist dir of the non-transient volume.
+ for (FsVolumeSpi v : volumes) {
+ FsVolumeImpl volume = (FsVolumeImpl) v;
+ File targetDir = (v.isTransientStorage()) ?
+ volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+ volume.getBlockPoolSlice(bpid).getLazypersistDir();
+      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
+
+ for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+ File targetDir =
+ DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+ File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+ if (blockFile.exists()) {
+ LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+ " exists after deletion.");
+ return false;
+ }
+ File metaFile = new File(targetDir,
+ DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+ lb.getBlock().getGenerationStamp()));
+ if (metaFile.exists()) {
+ LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+ " exists after deletion.");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void triggerBlockReport()
+ throws IOException, InterruptedException {
+ // Trigger block report to NN
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ Thread.sleep(10 * 1000);
+ }
+
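+  /** Writer task that creates lazy-persist test files and reports failures via the shared flag. */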
+ class WriterRunnable implements Runnable {
+ private final int id;
+ private final Path paths[];
+ private final int seed;
+ private CountDownLatch latch;
+ private AtomicBoolean bFail;
+
+ public WriterRunnable(int threadIndex, Path[] paths,
+ int seed, CountDownLatch latch,
+ AtomicBoolean bFail) {
+ id = threadIndex;
+ this.paths = paths;
+ this.seed = seed;
+ this.latch = latch;
+ this.bFail = bFail;
+ System.out.println("Creating Writer: " + id);
+ }
+
+ public void run() {
+ System.out.println("Writer " + id + " starting... ");
+ int i = 0;
+ try {
+ for (i = 0; i < paths.length; i++) {
+ makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
+          // Eviction may fail when not all blocks are persisted yet.
+ // ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+ }
+ } catch (IOException e) {
+ bFail.set(true);
+ LOG.error("Writer exception: writer id:" + id +
+ " testfile: " + paths[i].toString() +
+ " " + e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
+
+  JMXGet initJMX() throws Exception {
+ JMXGet jmx = new JMXGet();
+ jmx.setService(JMX_SERVICE_NAME);
+ jmx.init();
+ return jmx;
+ }
+
+ void printRamDiskJMXMetrics() {
+ try {
+ jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
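+  /** Assert that the named DataNode RamDisk JMX metric has the expected value. */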
+ void verifyRamDiskJMXMetric(String metricName, long expectedValue)
+ throws Exception {
+ assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
new file mode 100644
index 0000000..b6ac287
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.log4j.Level;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class TestScrLazyPersistFiles {
+  public static final Log LOG = LogFactory.getLog(TestScrLazyPersistFiles.class);
+
+ static {
+ ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private static short REPL_FACTOR = 1;
+ private static final int BLOCK_SIZE = 10485760; // 10 MB
+ private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+ private static final long HEARTBEAT_INTERVAL_SEC = 1;
+ private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+ private static final int LAZY_WRITER_INTERVAL_SEC = 1;
+ private static final int BUFFER_LENGTH = 4096;
+ private static TemporarySocketDirectory sockDir;
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private DFSClient client;
+ private Configuration conf;
+
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
+
+ @Before
+ public void before() {
+ Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
+ equalTo(true));
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ }
+
+ @After
+ public void shutDownCluster() throws IOException {
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ client = null;
+ }
+
+ if (cluster != null) {
+ cluster.shutdownDataNodes();
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ /**
+   * Read an in-memory block with short-circuit read.
+   * Note: the test uses a fake RAM_DISK backed by physical disk.
+ */
+ @Test (timeout=300000)
+ public void testRamDiskShortCircuitRead()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR,
+ new StorageType[]{RAM_DISK, DEFAULT},
+ 2 * BLOCK_SIZE - 1, true); // 1 replica + delta, SCR read
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final int SEED = 0xFADED;
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Verify SCR read counters.
+    FSDataInputStream fis = fs.open(path);
+    try {
+ byte[] buf = new byte[BUFFER_LENGTH];
+ fis.read(0, buf, 0, BUFFER_LENGTH);
+ HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
+ Assert.assertEquals(BUFFER_LENGTH,
+ dfsis.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(BUFFER_LENGTH,
+ dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+ } finally {
+ fis.close();
+ fis = null;
+ }
+ }
+
+ /**
+   * Eviction of lazy-persisted blocks with a short-circuit read handle open.
+   * Note: the test uses a fake RAM_DISK backed by physical disk.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+  @Test (timeout=300000)
+ public void testRamDiskEvictionWithShortCircuitReadHandle()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+ (6 * BLOCK_SIZE -1), true); // 5 replica + delta, SCR.
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+ final int SEED = 0xFADED;
+
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ // However the block replica should not be evicted from RAM_DISK yet.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ // No eviction should happen as the free ratio is below the threshold
+ FSDataInputStream fis = fs.open(path1);
+ try {
+      // Keep an open read handle to path1 while creating path2.
+ byte[] buf = new byte[BUFFER_LENGTH];
+ fis.read(0, buf, 0, BUFFER_LENGTH);
+
+ // Create the 2nd file that will trigger RAM_DISK eviction.
+ makeTestFile(path2, BLOCK_SIZE * 2, true);
+ ensureFileReplicasOnStorageType(path2, RAM_DISK);
+
+ // Ensure path1 is still readable from the open SCR handle.
+ fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
+ HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
+ Assert.assertEquals(2 * BUFFER_LENGTH,
+ dfsis.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(2 * BUFFER_LENGTH,
+ dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+ } finally {
+ IOUtils.closeQuietly(fis);
+ }
+
+ // After the open handle is closed, path1 should be evicted to DISK.
+ triggerBlockReport();
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+ }
+
+ // ---- Utility functions for all test cases -------------------------------
+
+ /**
+ * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+ * capped. If ramDiskStorageLimit < 0 then it is ignored.
+ */
+ private void startUpCluster(final int numDataNodes,
+ final StorageType[] storageTypes,
+ final long ramDiskStorageLimit,
+ final boolean useSCR)
+ throws IOException {
+
+ conf = new Configuration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+ LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+ conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ LAZY_WRITER_INTERVAL_SEC);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
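+      // Use a unique client context per test so cached short-circuit state is not shared across runs.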
+ conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+ UUID.randomUUID().toString());
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ }
+
+    REPL_FACTOR = 1; // Reset in case a test has modified the value.
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .numDataNodes(numDataNodes)
+ .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+ .build();
+ fs = cluster.getFileSystem();
+ client = fs.getClient();
+
+ // Artificially cap the storage capacity of the RAM_DISK volume.
+ if (ramDiskStorageLimit >= 0) {
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+ for (FsVolumeSpi volume : volumes) {
+ if (volume.getStorageType() == RAM_DISK) {
+ ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+ }
+ }
+ }
+
+ LOG.info("Cluster startup complete");
+ }
+
+ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
+ throws IOException {
+
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+ if (isLazyPersist) {
+ createFlags.add(LAZY_PERSIST);
+ }
+
+ FSDataOutputStream fos = null;
+ try {
+ fos =
+ fs.create(path,
+ FsPermission.getFileDefault(),
+ createFlags,
+ BUFFER_LENGTH,
+ REPL_FACTOR,
+ BLOCK_SIZE,
+ null);
+
+ // Allocate a block.
+ byte[] buffer = new byte[BUFFER_LENGTH];
+ for (int bytesWritten = 0; bytesWritten < length; ) {
+ fos.write(buffer, 0, buffer.length);
+ bytesWritten += buffer.length;
+ }
+ if (length > 0) {
+ fos.hsync();
+ }
+ } finally {
+ IOUtils.closeQuietly(fos);
+ }
+ }
+
+ private LocatedBlocks ensureFileReplicasOnStorageType(
+ Path path, StorageType storageType) throws IOException {
+    // Ensure that the returned block locations are correct.
+ LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+ assertThat(fs.exists(path), is(true));
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+ }
+ return locatedBlocks;
+ }
+
+ private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
+ long seed) throws IOException {
+ DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+ BLOCK_SIZE, REPL_FACTOR, seed, true);
+ }
+
+ private void triggerBlockReport()
+ throws IOException, InterruptedException {
+ // Trigger block report to NN
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ Thread.sleep(10 * 1000);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index a870aa9..60c6d03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -358,7 +358,7 @@
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
@@ -376,7 +376,7 @@
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
@@ -386,7 +386,7 @@
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
@@ -402,7 +402,7 @@
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -418,7 +418,7 @@
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -435,7 +435,7 @@
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
- dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
}
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index a6edd80..2dae239 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -66,6 +67,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+
/**
* Test the data migration tool (for Archival Storage)
*/
@@ -336,10 +339,10 @@
Assert.assertTrue(fileStatus.getFullName(parent.toString())
+ " with policy " + policy + " has non-empty overlap: " + diff
+ ", the corresponding block is " + lb.getBlock().getLocalBlock(),
- diff.removeOverlap());
+ diff.removeOverlap(true));
}
}
-
+
Replication getReplication(Path file) throws IOException {
return getOrVerifyReplication(file, null);
}
@@ -407,17 +410,29 @@
}
private static StorageType[][] genStorageTypes(int numDataNodes) {
- return genStorageTypes(numDataNodes, 0, 0);
+ return genStorageTypes(numDataNodes, 0, 0, 0);
}
private static StorageType[][] genStorageTypes(int numDataNodes,
int numAllDisk, int numAllArchive) {
+ return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0);
+ }
+
+ private static StorageType[][] genStorageTypes(int numDataNodes,
+ int numAllDisk, int numAllArchive, int numRamDisk) {
+ Preconditions.checkArgument(
+ (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes);
+
StorageType[][] types = new StorageType[numDataNodes][];
int i = 0;
- for (; i < numAllDisk; i++) {
+ for (; i < numRamDisk; i++) {
+ types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
+ }
+ for (; i < numRamDisk + numAllDisk; i++) {
types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
}
- for (; i < numAllDisk + numAllArchive; i++) {
+ for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
}
for (; i < types.length; i++) {
@@ -425,15 +440,19 @@
}
return types;
}
-
+
private static long[][] genCapacities(int nDatanodes, int numAllDisk,
- int numAllArchive, long diskCapacity, long archiveCapacity) {
+ int numAllArchive, int numRamDisk, long diskCapacity,
+ long archiveCapacity, long ramDiskCapacity) {
final long[][] capacities = new long[nDatanodes][];
int i = 0;
- for (; i < numAllDisk; i++) {
+ for (; i < numRamDisk; i++) {
+ capacities[i] = new long[]{ramDiskCapacity, diskCapacity};
+ }
+ for (; i < numRamDisk + numAllDisk; i++) {
capacities[i] = new long[]{diskCapacity, diskCapacity};
}
- for (; i < numAllDisk + numAllArchive; i++) {
+ for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
capacities[i] = new long[]{archiveCapacity, archiveCapacity};
}
for(; i < capacities.length; i++) {
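The new four-argument genStorageTypes() overload above orders the per-datanode volume pairs so that RAM_DISK-backed nodes come first, then all-DISK nodes, then all-ARCHIVE nodes, and genCapacities() lays out the matching capacities in the same order. A standalone sketch of that partitioning follows; the final default branch falls outside the hunk shown, so a placeholder pair is used there.

import java.util.Arrays;

import org.apache.hadoop.hdfs.StorageType;

public class GenStorageTypesSketch {
  static StorageType[][] genStorageTypes(int numDataNodes, int numAllDisk,
      int numAllArchive, int numRamDisk) {
    StorageType[][] types = new StorageType[numDataNodes][];
    int i = 0;
    for (; i < numRamDisk; i++) {
      types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
    }
    for (; i < numRamDisk + numAllDisk; i++) {
      types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
    }
    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
      types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
    }
    for (; i < types.length; i++) {
      // Placeholder for the method's pre-existing default, which is not
      // visible in the hunk above.
      types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
    }
    return types;
  }

  public static void main(String[] args) {
    // 5 datanodes: 2 RAM_DISK-backed, 1 all-DISK, 1 all-ARCHIVE, 1 default.
    System.out.println(Arrays.deepToString(genStorageTypes(5, 1, 1, 2)));
  }
}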
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
index 94b139b..3f96c0c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
@@ -82,7 +82,7 @@
}
final INodeFile inode = new INodeFile(inodeId.nextValue(), null,
- p, 0L, 0L, blocks, replication, blockSize, (byte)0);
+ p, 0L, 0L, blocks, replication, blockSize);
inode.toUnderConstruction("", "");
// Append path to filename with information about blockIDs
@@ -97,7 +97,7 @@
editLog.logMkDir(currentDir, dirInode);
}
INodeFile fileUc = new INodeFile(inodeId.nextValue(), null,
- p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize, (byte)0);
+ p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
fileUc.toUnderConstruction("", "");
editLog.logOpenFile(filePath, fileUc, false, false);
editLog.logCloseFile(filePath, inode);
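This hunk, and the matching ones in TestEditLog.java, TestFSPermissionChecker.java and TestINodeFile.java below, drop the trailing (byte)0 storage-policy argument from the test-side INodeFile constructor calls. A rough sketch of the resulting call shape, placed in the same package as those tests; the permission setup is illustrative and only the constructor call mirrors the hunks.

package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;

public class INodeFileCtorSketch {
  static INodeFile newEmptyFile(short replication, long preferredBlockSize) {
    PermissionStatus perm = new PermissionStatus("user", "group",
        FsPermission.getDefault());
    // No trailing storage-policy byte after this merge.
    return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
        BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
  }
}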
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index cf37a54..6098ebf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 8070a5f..7b62242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -194,7 +194,7 @@
for (int i = 0; i < numTransactions; i++) {
INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
- p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize, (byte)0);
+ p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
inode.toUnderConstruction("", "");
editLog.logOpenFile("/filename" + (startIndex + i), inode, false, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
index 9bee4a9b..91931aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
@@ -432,7 +432,7 @@
FsPermission.createImmutable(perm));
INodeFile inodeFile = new INodeFile(INodeId.GRANDFATHER_INODE_ID,
name.getBytes("UTF-8"), permStatus, 0L, 0L, null, REPLICATION,
- PREFERRED_BLOCK_SIZE, (byte)0);
+ PREFERRED_BLOCK_SIZE);
parent.addChild(inodeFile);
return inodeFile;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index 8d298ae..e16df16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -1018,8 +1018,8 @@
byte storagePolicy = 0;
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
- blockSize, modTime, accessTime, perms, owner, group, symlink, path,
- fileId, numChildren, null, storagePolicy);
+ blockSize, modTime, accessTime, perms, owner, group, symlink,
+ path, fileId, numChildren, null, storagePolicy);
Result res = new Result(conf);
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index 26d9a96..4221ad5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -82,7 +82,7 @@
INodeFile createINodeFile(short replication, long preferredBlockSize) {
return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
- null, replication, preferredBlockSize, (byte)0);
+ null, replication, preferredBlockSize);
}
private static INodeFile createINodeFile(byte storagePolicyID) {
@@ -283,7 +283,7 @@
INodeFile[] iNodes = new INodeFile[nCount];
for (int i = 0; i < nCount; i++) {
iNodes[i] = new INodeFile(i, null, perm, 0L, 0L, null, replication,
- preferredBlockSize, (byte)0);
+ preferredBlockSize);
iNodes[i].setLocalName(DFSUtil.string2Bytes(fileNamePrefix + i));
BlockInfo newblock = new BlockInfo(replication);
iNodes[i].addBlock(newblock);
@@ -341,7 +341,7 @@
{//cast from INodeFileUnderConstruction
final INode from = new INodeFile(
INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication,
- 1024L, (byte)0);
+ 1024L);
from.asFile().toUnderConstruction("client", "machine");
//cast to INodeFile, should success
@@ -1104,7 +1104,7 @@
public void testFileUnderConstruction() {
replication = 3;
final INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
- perm, 0L, 0L, null, replication, 1024L, (byte)0);
+ perm, 0L, 0L, null, replication, 1024L);
assertFalse(file.isUnderConstruction());
final String clientName = "client";