blob: 49c9bcfc0b9e7964672f7dff193041051661a8eb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.net.NodeBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
import static org.junit.Assert.*;
/**
* Integration tests for the Provided implementation.
*/
public class ITestProvidedImplementation {
@Rule public TestName name = new TestName();
public static final Logger LOG =
LoggerFactory.getLogger(ITestProvidedImplementation.class);
private final Random r = new Random();
private final File fBASE = new File(MiniDFSCluster.getBaseDirectory());
private final Path pBASE = new Path(fBASE.toURI().toString());
private final Path providedPath = new Path(pBASE, "providedDir");
private final Path nnDirPath = new Path(pBASE, "nnDir");
private final String singleUser = "usr1";
private final String singleGroup = "grp1";
private final int numFiles = 10;
private final String filePrefix = "file";
private final String fileSuffix = ".dat";
private final int baseFileLen = 1024;
private long providedDataSize = 0;
private final String bpid = "BP-1234-10.1.1.1-1224";
private Configuration conf;
private MiniDFSCluster cluster;
@Before
public void setSeed() throws Exception {
if (fBASE.exists() && !FileUtil.fullyDelete(fBASE)) {
throw new IOException("Could not fully delete " + fBASE);
}
long seed = r.nextLong();
r.setSeed(seed);
System.out.println(name.getMethodName() + " seed: " + seed);
conf = new HdfsConfiguration();
conf.set(SingleUGIResolver.USER, singleUser);
conf.set(SingleUGIResolver.GROUP, singleGroup);
conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TextFileRegionAliasMap.class, BlockAliasMap.class);
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR,
nnDirPath.toString());
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE,
new Path(nnDirPath, fileNameFromBlockPoolID(bpid)).toString());
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ",");
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED,
new File(providedPath.toUri()).toString());
File imageDir = new File(providedPath.toUri());
if (!imageDir.exists()) {
LOG.info("Creating directory: " + imageDir);
imageDir.mkdirs();
}
File nnDir = new File(nnDirPath.toUri());
if (!nnDir.exists()) {
nnDir.mkdirs();
}
// create 10 random files under pBASE
for (int i=0; i < numFiles; i++) {
File newFile = new File(
new Path(providedPath, filePrefix + i + fileSuffix).toUri());
if(!newFile.exists()) {
try {
LOG.info("Creating " + newFile.toString());
newFile.createNewFile();
Writer writer = new OutputStreamWriter(
new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
for(int j=0; j < baseFileLen*i; j++) {
writer.write("0");
}
writer.flush();
writer.close();
providedDataSize += newFile.length();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@After
public void shutdown() throws Exception {
try {
if (cluster != null) {
cluster.shutdown(true, true);
}
} finally {
cluster = null;
}
}
void createImage(TreeWalk t, Path out,
Class<? extends BlockResolver> blockIdsClass) throws Exception {
createImage(t, out, blockIdsClass, "", TextFileRegionAliasMap.class);
}
void createImage(TreeWalk t, Path out,
Class<? extends BlockResolver> blockIdsClass, String clusterID,
Class<? extends BlockAliasMap> aliasMapClass) throws Exception {
ImageWriter.Options opts = ImageWriter.defaults();
opts.setConf(conf);
opts.output(out.toString())
.blocks(aliasMapClass)
.blockIds(blockIdsClass)
.clusterID(clusterID)
.blockPoolID(bpid);
try (ImageWriter w = new ImageWriter(opts)) {
for (TreePath e : t) {
w.accept(e);
}
}
}
void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
boolean doFormat) throws IOException {
startCluster(nspath, numDatanodes, storageTypes, storageTypesPerDatanode,
doFormat, null);
}
void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
boolean doFormat, String[] racks) throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
if (storageTypesPerDatanode != null) {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.storageTypes(storageTypesPerDatanode)
.racks(racks)
.build();
} else if (storageTypes != null) {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.storagesPerDatanode(storageTypes.length)
.storageTypes(storageTypes)
.racks(racks)
.build();
} else {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.racks(racks)
.build();
}
cluster.waitActive();
}
@Test(timeout=20000)
public void testLoadImage() throws Exception {
final long seed = r.nextLong();
LOG.info("providedPath: " + providedPath);
createImage(new RandomTreeWalk(seed), nnDirPath, FixedBlockResolver.class);
startCluster(nnDirPath, 0,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
false);
FileSystem fs = cluster.getFileSystem();
for (TreePath e : new RandomTreeWalk(seed)) {
FileStatus rs = e.getFileStatus();
Path hp = new Path(rs.getPath().toUri().getPath());
assertTrue(fs.exists(hp));
FileStatus hs = fs.getFileStatus(hp);
assertEquals(rs.getPath().toUri().getPath(),
hs.getPath().toUri().getPath());
assertEquals(rs.getPermission(), hs.getPermission());
assertEquals(rs.getLen(), hs.getLen());
assertEquals(singleUser, hs.getOwner());
assertEquals(singleGroup, hs.getGroup());
assertEquals(rs.getAccessTime(), hs.getAccessTime());
assertEquals(rs.getModificationTime(), hs.getModificationTime());
}
}
@Test(timeout=30000)
public void testProvidedReporting() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
SingleUGIResolver.class, UGIResolver.class);
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
int numDatanodes = 10;
startCluster(nnDirPath, numDatanodes,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
false);
long diskCapacity = 1000;
// set the DISK capacity for testing
for (DataNode dn: cluster.getDataNodes()) {
for (FsVolumeSpi ref : dn.getFSDataset().getFsVolumeReferences()) {
if (ref.getStorageType() == StorageType.DISK) {
((FsVolumeImpl) ref).setCapacityForTesting(diskCapacity);
}
}
}
// trigger heartbeats to update the capacities
cluster.triggerHeartbeats();
Thread.sleep(10000);
// verify namenode stats
FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
DatanodeStatistics dnStats = namesystem.getBlockManager()
.getDatanodeManager().getDatanodeStatistics();
// total capacity reported includes only the local volumes and
// not the provided capacity
assertEquals(diskCapacity * numDatanodes, namesystem.getTotal());
// total storage used should be equal to the totalProvidedStorage
// no capacity should be remaining!
assertEquals(providedDataSize, dnStats.getProvidedCapacity());
assertEquals(providedDataSize, namesystem.getProvidedCapacityTotal());
assertEquals(providedDataSize, dnStats.getStorageTypeStats()
.get(StorageType.PROVIDED).getCapacityTotal());
assertEquals(providedDataSize, dnStats.getStorageTypeStats()
.get(StorageType.PROVIDED).getCapacityUsed());
// verify datanode stats
for (DataNode dn: cluster.getDataNodes()) {
for (StorageReport report : dn.getFSDataset()
.getStorageReports(namesystem.getBlockPoolId())) {
if (report.getStorage().getStorageType() == StorageType.PROVIDED) {
assertEquals(providedDataSize, report.getCapacity());
assertEquals(providedDataSize, report.getDfsUsed());
assertEquals(providedDataSize, report.getBlockPoolUsed());
assertEquals(0, report.getNonDfsUsed());
assertEquals(0, report.getRemaining());
}
}
}
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0));
BlockManager bm = namesystem.getBlockManager();
for (int fileId = 0; fileId < numFiles; fileId++) {
String filename = "/" + filePrefix + fileId + fileSuffix;
LocatedBlocks locatedBlocks = client.getLocatedBlocks(
filename, 0, baseFileLen);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
BlockInfo blockInfo =
bm.getStoredBlock(locatedBlock.getBlock().getLocalBlock());
Iterator<DatanodeStorageInfo> storagesItr = blockInfo.getStorageInfos();
DatanodeStorageInfo info = storagesItr.next();
assertEquals(StorageType.PROVIDED, info.getStorageType());
DatanodeDescriptor dnDesc = info.getDatanodeDescriptor();
// check the locations that are returned by FSCK have the right name
assertEquals(ProvidedStorageMap.ProvidedDescriptor.NETWORK_LOCATION
+ PATH_SEPARATOR_STR + ProvidedStorageMap.ProvidedDescriptor.NAME,
NodeBase.getPath(dnDesc));
// no DatanodeStorageInfos should remain
assertFalse(storagesItr.hasNext());
}
}
}
@Test(timeout=500000)
public void testDefaultReplication() throws Exception {
int targetReplication = 2;
conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication);
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockMultiReplicaResolver.class);
// make the last Datanode with only DISK
startCluster(nnDirPath, 3, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
// wait for the replication to finish
Thread.sleep(50000);
FileSystem fs = cluster.getFileSystem();
int count = 0;
for (TreePath e : new FSTreeWalk(providedPath, conf)) {
FileStatus rs = e.getFileStatus();
Path hp = removePrefix(providedPath, rs.getPath());
LOG.info("path: " + hp.toUri().getPath());
e.accept(count++);
assertTrue(fs.exists(hp));
FileStatus hs = fs.getFileStatus(hp);
if (rs.isFile()) {
BlockLocation[] bl = fs.getFileBlockLocations(
hs.getPath(), 0, hs.getLen());
int i = 0;
for(; i < bl.length; i++) {
int currentRep = bl[i].getHosts().length;
assertEquals(targetReplication, currentRep);
}
}
}
}
static Path removePrefix(Path base, Path walk) {
Path wpath = new Path(walk.toUri().getPath());
Path bpath = new Path(base.toUri().getPath());
Path ret = new Path("/");
while (!(bpath.equals(wpath) || "".equals(wpath.getName()))) {
ret = "".equals(ret.getName())
? new Path("/", wpath.getName())
: new Path(new Path("/", wpath.getName()),
new Path(ret.toString().substring(1)));
wpath = wpath.getParent();
}
if (!bpath.equals(wpath)) {
throw new IllegalArgumentException(base + " not a prefix of " + walk);
}
return ret;
}
private void verifyFileSystemContents() throws Exception {
FileSystem fs = cluster.getFileSystem();
int count = 0;
// read NN metadata, verify contents match
for (TreePath e : new FSTreeWalk(providedPath, conf)) {
FileStatus rs = e.getFileStatus();
Path hp = removePrefix(providedPath, rs.getPath());
LOG.info("path: " + hp.toUri().getPath());
e.accept(count++);
assertTrue(fs.exists(hp));
FileStatus hs = fs.getFileStatus(hp);
assertEquals(hp.toUri().getPath(), hs.getPath().toUri().getPath());
assertEquals(rs.getPermission(), hs.getPermission());
assertEquals(rs.getOwner(), hs.getOwner());
assertEquals(rs.getGroup(), hs.getGroup());
if (rs.isFile()) {
assertEquals(rs.getLen(), hs.getLen());
try (ReadableByteChannel i = Channels.newChannel(
new FileInputStream(new File(rs.getPath().toUri())))) {
try (ReadableByteChannel j = Channels.newChannel(
fs.open(hs.getPath()))) {
ByteBuffer ib = ByteBuffer.allocate(4096);
ByteBuffer jb = ByteBuffer.allocate(4096);
while (true) {
int il = i.read(ib);
int jl = j.read(jb);
if (il < 0 || jl < 0) {
assertEquals(il, jl);
break;
}
ib.flip();
jb.flip();
int cmp = Math.min(ib.remaining(), jb.remaining());
for (int k = 0; k < cmp; ++k) {
assertEquals(ib.get(), jb.get());
}
ib.compact();
jb.compact();
}
}
}
}
}
}
private BlockLocation[] createFile(Path path, short replication,
long fileLen, long blockLen) throws IOException {
FileSystem fs = cluster.getFileSystem();
// create a file that is not provided
DFSTestUtil.createFile(fs, path, false, (int) blockLen,
fileLen, blockLen, replication, 0, true);
return fs.getFileBlockLocations(path, 0, fileLen);
}
@Test(timeout=30000)
public void testClusterWithEmptyImage() throws IOException {
// start a cluster with 2 datanodes without any provided storage
startCluster(nnDirPath, 2, null,
new StorageType[][] {
{StorageType.DISK},
{StorageType.DISK}},
true);
assertTrue(cluster.isClusterUp());
assertTrue(cluster.isDataNodeUp());
BlockLocation[] locations = createFile(new Path("/testFile1.dat"),
(short) 2, 1024*1024, 1024*1024);
assertEquals(1, locations.length);
assertEquals(2, locations[0].getHosts().length);
}
private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
String filename, long fileLen, long expectedBlocks, int expectedLocations)
throws IOException {
LocatedBlocks locatedBlocks = client.getLocatedBlocks(filename, 0, fileLen);
// given the start and length in the above call,
// only one LocatedBlock in LocatedBlocks
assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
DatanodeInfo[] locations =
locatedBlocks.getLocatedBlocks().get(0).getLocations();
assertEquals(expectedLocations, locations.length);
checkUniqueness(locations);
return locations;
}
/**
* verify that the given locations are all unique.
* @param locations
*/
private void checkUniqueness(DatanodeInfo[] locations) {
Set<String> set = new HashSet<>();
for (DatanodeInfo info: locations) {
assertFalse("All locations should be unique",
set.contains(info.getDatanodeUuid()));
set.add(info.getDatanodeUuid());
}
}
/**
* Tests setting replication of provided files.
* @throws Exception
*/
@Test(timeout=50000)
public void testSetReplicationForProvidedFiles() throws Exception {
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
// 10 Datanodes with both DISK and PROVIDED storage
startCluster(nnDirPath, 10,
new StorageType[]{
StorageType.PROVIDED, StorageType.DISK},
null,
false);
setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
}
private void setAndUnsetReplication(String filename) throws Exception {
Path file = new Path(filename);
FileSystem fs = cluster.getFileSystem();
// set the replication to 4, and test that the file has
// the required replication.
short newReplication = 4;
LOG.info("Setting replication of file {} to {}", filename, newReplication);
fs.setReplication(file, newReplication);
DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
file, newReplication, 10000);
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0));
getAndCheckBlockLocations(client, filename, baseFileLen, 1, newReplication);
// set the replication back to 1
newReplication = 1;
LOG.info("Setting replication of file {} back to {}",
filename, newReplication);
fs.setReplication(file, newReplication);
// defaultReplication number of replicas should be returned
int defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
file, (short) defaultReplication, 10000);
getAndCheckBlockLocations(client, filename, baseFileLen, 1,
defaultReplication);
}
@Test(timeout=30000)
public void testProvidedDatanodeFailures() throws Exception {
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
startCluster(nnDirPath, 3, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
DataNode providedDatanode1 = cluster.getDataNodes().get(0);
DataNode providedDatanode2 = cluster.getDataNodes().get(1);
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0));
DatanodeStorageInfo providedDNInfo = getProvidedDatanodeStorageInfo();
if (numFiles >= 1) {
String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
// 2 locations returned as there are 2 PROVIDED datanodes
DatanodeInfo[] dnInfos =
getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2);
// the location should be one of the provided DNs available
assertTrue(
dnInfos[0].getDatanodeUuid().equals(
providedDatanode1.getDatanodeUuid())
|| dnInfos[0].getDatanodeUuid().equals(
providedDatanode2.getDatanodeUuid()));
// stop the 1st provided datanode
MiniDFSCluster.DataNodeProperties providedDNProperties1 =
cluster.stopDataNode(0);
// make NameNode detect that datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(),
providedDatanode1.getDatanodeId().getXferAddr());
// should find the block on the 2nd provided datanode
dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
assertEquals(providedDatanode2.getDatanodeUuid(),
dnInfos[0].getDatanodeUuid());
// stop the 2nd provided datanode
MiniDFSCluster.DataNodeProperties providedDNProperties2 =
cluster.stopDataNode(0);
// make NameNode detect that datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(),
providedDatanode2.getDatanodeId().getXferAddr());
getAndCheckBlockLocations(client, filename, baseFileLen, 1, 0);
// BR count for the provided ProvidedDatanodeStorageInfo should reset to
// 0, when all DNs with PROVIDED storage fail.
assertEquals(0, providedDNInfo.getBlockReportCount());
// restart the provided datanode
cluster.restartDataNode(providedDNProperties1, true);
cluster.waitActive();
assertEquals(1, providedDNInfo.getBlockReportCount());
// should find the block on the 1st provided datanode now
dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
// not comparing UUIDs as the datanode can now have a different one.
assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
dnInfos[0].getXferAddr());
}
}
@Test(timeout=300000)
public void testTransientDeadDatanodes() throws Exception {
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
// 3 Datanodes, 2 PROVIDED and other DISK
startCluster(nnDirPath, 3, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
DataNode providedDatanode = cluster.getDataNodes().get(0);
DatanodeStorageInfo providedDNInfo = getProvidedDatanodeStorageInfo();
int initialBRCount = providedDNInfo.getBlockReportCount();
for (int i= 0; i < numFiles; i++) {
// expect to have 2 locations as we have 2 provided Datanodes.
verifyFileLocation(i, 2);
// NameNode thinks the datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(),
providedDatanode.getDatanodeId().getXferAddr());
cluster.waitActive();
cluster.triggerHeartbeats();
Thread.sleep(1000);
// the report count should just continue to increase.
assertEquals(initialBRCount + i + 1,
providedDNInfo.getBlockReportCount());
verifyFileLocation(i, 2);
}
}
private DatanodeStorageInfo getProvidedDatanodeStorageInfo() {
ProvidedStorageMap providedStorageMap =
cluster.getNamesystem().getBlockManager().getProvidedStorageMap();
return providedStorageMap.getProvidedStorageInfo();
}
@Test(timeout=30000)
public void testNamenodeRestart() throws Exception {
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
// 3 Datanodes, 2 PROVIDED and other DISK
startCluster(nnDirPath, 3, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
verifyFileLocation(numFiles - 1, 2);
cluster.restartNameNodes();
cluster.waitActive();
verifyFileLocation(numFiles - 1, 2);
}
/**
* verify that the specified file has a valid provided location.
* @param fileIndex the index of the file to verify.
* @throws Exception
*/
private void verifyFileLocation(int fileIndex, int replication)
throws Exception {
DFSClient client = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()),
cluster.getConfiguration(0));
if (fileIndex < numFiles && fileIndex >= 0) {
String filename = filePrefix + fileIndex + fileSuffix;
File file = new File(new Path(providedPath, filename).toUri());
long fileLen = file.length();
long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE,
FixedBlockResolver.BLOCKSIZE_DEFAULT);
long numLocatedBlocks =
fileLen == 0 ? 1 : (long) Math.ceil(fileLen * 1.0 / blockSize);
getAndCheckBlockLocations(client, "/" + filename, fileLen,
numLocatedBlocks, replication);
}
}
@Test(timeout=30000)
public void testSetClusterID() throws Exception {
String clusterID = "PROVIDED-CLUSTER";
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class, clusterID, TextFileRegionAliasMap.class);
// 2 Datanodes, 1 PROVIDED and other DISK
startCluster(nnDirPath, 2, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
NameNode nn = cluster.getNameNode();
assertEquals(clusterID, nn.getNamesystem().getClusterId());
}
@Test(timeout=30000)
public void testNumberOfProvidedLocations() throws Exception {
// set default replication to 4
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
// start with 4 PROVIDED location
startCluster(nnDirPath, 4,
new StorageType[]{
StorageType.PROVIDED, StorageType.DISK},
null,
false);
int expectedLocations = 4;
for (int i = 0; i < numFiles; i++) {
verifyFileLocation(i, expectedLocations);
}
// stop 2 datanodes, one after the other and verify number of locations.
for (int i = 1; i <= 2; i++) {
DataNode dn = cluster.getDataNodes().get(0);
cluster.stopDataNode(0);
// make NameNode detect that datanode is down
BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(),
dn.getDatanodeId().getXferAddr());
expectedLocations = 4 - i;
for (int j = 0; j < numFiles; j++) {
verifyFileLocation(j, expectedLocations);
}
}
}
@Test(timeout=30000)
public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
// increase number of blocks per file to at least 10 blocks per file
conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen/10);
// set default replication to 4
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
// start with 4 PROVIDED location
startCluster(nnDirPath, 4,
new StorageType[]{
StorageType.PROVIDED, StorageType.DISK},
null,
false);
int expectedLocations = 4;
for (int i = 0; i < numFiles; i++) {
verifyFileLocation(i, expectedLocations);
}
}
@Test
public void testInMemoryAliasMap() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
FsUGIResolver.class, UGIResolver.class);
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class);
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:32445");
File tempDirectory =
Files.createTempDirectory("in-memory-alias-map").toFile();
File leveDBPath = new File(tempDirectory, bpid);
leveDBPath.mkdirs();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDirectory.getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
createImage(new FSTreeWalk(providedPath, conf),
nnDirPath,
FixedBlockResolver.class, "",
InMemoryLevelDBAliasMapClient.class);
levelDBAliasMapServer.close();
// start cluster with two datanodes,
// each with 1 PROVIDED volume and other DISK volume
startCluster(nnDirPath, 2,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false);
verifyFileSystemContents();
FileUtils.deleteDirectory(tempDirectory);
}
private DatanodeDescriptor getDatanodeDescriptor(DatanodeManager dnm,
int dnIndex) throws Exception {
return dnm.getDatanode(cluster.getDataNodes().get(dnIndex).getDatanodeId());
}
private void startDecommission(FSNamesystem namesystem, DatanodeManager dnm,
int dnIndex) throws Exception {
namesystem.writeLock();
DatanodeDescriptor dnDesc = getDatanodeDescriptor(dnm, dnIndex);
dnm.getDatanodeAdminManager().startDecommission(dnDesc);
namesystem.writeUnlock();
}
private void startMaintenance(FSNamesystem namesystem, DatanodeManager dnm,
int dnIndex) throws Exception {
namesystem.writeLock();
DatanodeDescriptor dnDesc = getDatanodeDescriptor(dnm, dnIndex);
dnm.getDatanodeAdminManager().startMaintenance(dnDesc, Long.MAX_VALUE);
namesystem.writeUnlock();
}
private void stopMaintenance(FSNamesystem namesystem, DatanodeManager dnm,
int dnIndex) throws Exception {
namesystem.writeLock();
DatanodeDescriptor dnDesc = getDatanodeDescriptor(dnm, dnIndex);
dnm.getDatanodeAdminManager().stopMaintenance(dnDesc);
namesystem.writeUnlock();
}
@Test
public void testDatanodeLifeCycle() throws Exception {
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
startCluster(nnDirPath, 3,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false);
int fileIndex = numFiles - 1;
final BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final DatanodeManager dnm = blockManager.getDatanodeManager();
// to start, all 3 DNs are live in ProvidedDatanodeDescriptor.
verifyFileLocation(fileIndex, 3);
// de-commision first DN; still get 3 replicas.
startDecommission(cluster.getNamesystem(), dnm, 0);
verifyFileLocation(fileIndex, 3);
// remains the same even after heartbeats.
cluster.triggerHeartbeats();
verifyFileLocation(fileIndex, 3);
// start maintenance for 2nd DN; still get 3 replicas.
startMaintenance(cluster.getNamesystem(), dnm, 1);
verifyFileLocation(fileIndex, 3);
DataNode dn1 = cluster.getDataNodes().get(0);
DataNode dn2 = cluster.getDataNodes().get(1);
// stop the 1st DN while being decomissioned.
MiniDFSCluster.DataNodeProperties dn1Properties = cluster.stopDataNode(0);
BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(),
dn1.getDatanodeId().getXferAddr());
// get 2 locations
verifyFileLocation(fileIndex, 2);
// stop dn2 while in maintenance.
MiniDFSCluster.DataNodeProperties dn2Properties = cluster.stopDataNode(1);
BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(),
dn2.getDatanodeId().getXferAddr());
// 2 valid locations will be found as blocks on nodes that die during
// maintenance are not marked for removal.
verifyFileLocation(fileIndex, 2);
// stop the maintenance; get only 1 replicas
stopMaintenance(cluster.getNamesystem(), dnm, 0);
verifyFileLocation(fileIndex, 1);
// restart the stopped DN.
cluster.restartDataNode(dn1Properties, true);
cluster.waitActive();
// reports all 3 replicas
verifyFileLocation(fileIndex, 2);
cluster.restartDataNode(dn2Properties, true);
cluster.waitActive();
// reports all 3 replicas
verifyFileLocation(fileIndex, 3);
}
@Test
public void testProvidedWithHierarchicalTopology() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
UGIResolver.class);
String packageName = "org.apache.hadoop.hdfs.server.blockmanagement";
String[] policies = new String[] {
"BlockPlacementPolicyDefault",
"BlockPlacementPolicyRackFaultTolerant",
"BlockPlacementPolicyWithNodeGroup",
"BlockPlacementPolicyWithUpgradeDomain"};
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
FixedBlockResolver.class);
String[] racks =
{"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1",
"/pod1/rack0", "/pod1/rack0", "/pod1/rack1", "/pod1/rack1" };
for (String policy: policies) {
LOG.info("Using policy: " + packageName + "." + policy);
conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy);
startCluster(nnDirPath, racks.length,
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
null, false, racks);
verifyFileSystemContents();
setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
cluster.shutdown();
}
}
}