/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * End-to-end test case for upgrade domain block placement.
 * The test configures upgrade domains for the datanodes via the admin
 * JSON config file and puts some nodes into the DECOMMISSIONED state.
 * It then verifies that replicas are placed on nodes that satisfy the
 * upgrade domain policy.
 *
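 * Each entry of that JSON file maps onto a DatanodeAdminProperties
 * object; a single entry looks roughly like the sketch below (the
 * port value is illustrative only, the test uses each node's real
 * transfer port):
 * <pre>{@code
 *   { "hostName": "host1", "port": 9866,
 *     "upgradeDomain": "ud5", "adminState": "DECOMMISSIONED" }
 * }</pre>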
*/
public class TestUpgradeDomainBlockPlacementPolicy {
private static final short REPLICATION_FACTOR = (short) 3;
private static final int DEFAULT_BLOCK_SIZE = 1024;
private static final int WAIT_TIMEOUT_MS = 60000;
private static final long FILE_SIZE = DEFAULT_BLOCK_SIZE * 5;
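  // Six datanodes: dn0-dn2 on /RACK1, dn3-dn5 on /RACK2, each with the
  // upgrade domain listed at the same index in upgradeDomains.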
static final String[] racks =
{ "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
static final String[] hosts =
{"host1", "host2", "host3", "host4", "host5", "host6"};
static final String[] upgradeDomains =
{"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
private MiniDFSCluster cluster = null;
private HostsFileWriter hostsFileWriter = new HostsFileWriter();
private Configuration conf = new HdfsConfiguration();
@Before
public void setup() throws IOException {
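    // StaticMapping keeps its rack map in static state; reset it so
    // assignments from other tests in the same JVM do not leak in.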
StaticMapping.resetMap();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
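    // Use the upgrade-domain-aware placement policy together with the
    // JSON-based host provider that understands upgrade domains.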
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithUpgradeDomain.class,
BlockPlacementPolicy.class);
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class);
hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
refreshDatanodeAdminProperties();
}
@After
public void teardown() throws IOException {
hostsFileWriter.cleanup();
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
/**
* Define admin properties for these datanodes as follows.
* dn0's upgrade domain is ud5.
* dn1's upgrade domain is ud2.
* dn2's upgrade domain is ud3.
* dn3's upgrade domain is ud1.
* dn4's upgrade domain is ud2.
* dn5's upgrade domain is ud4.
* dn0 and dn5 are decommissioned.
 * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
 * rack2, the remaining nodes that give three distinct upgrade domains
 * while still satisfying the rack policy are {dn1, dn2, dn3} and
 * {dn2, dn3, dn4}, so any block's replicas should be on one of those
 * two sets.
*/
private void refreshDatanodeAdminProperties()
throws IOException {
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
hosts.length];
for (int i = 0; i < hosts.length; i++) {
datanodes[i] = new DatanodeAdminProperties();
DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
      /*
       * Use host names that can be resolved
       * (InetSocketAddress#isUnresolved == false); otherwise
       * CombinedHostFileManager won't allow those hosts.
       */
datanodes[i].setHostName(datanodeID.getIpAddr());
datanodes[i].setPort(datanodeID.getXferPort());
datanodes[i].setUpgradeDomain(upgradeDomains[i]);
}
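    // Decommission dn0 and dn5; see the javadoc above for why only
    // {dn1, dn2, dn3} and {dn2, dn3, dn4} remain valid placements.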
datanodes[0].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
datanodes[5].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
hostsFileWriter.initIncludeHosts(datanodes);
cluster.getNamesystem(0).getBlockManager().getDatanodeManager().refreshNodes(conf);
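    // dn2 and dn3 appear in both valid placements, so every block is
    // expected to keep a replica on each of them.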
expectedDatanodeIDs.clear();
expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
}
/**
* Define admin properties for these datanodes as follows.
* dn0's upgrade domain is ud5.
* dn1's upgrade domain is ud2.
* dn2's upgrade domain is ud3.
* dn3's upgrade domain is ud1.
* dn4's upgrade domain is ud2.
* dn5's upgrade domain is ud4.
* dn2 and dn3 are decommissioned.
 * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
 * rack2, the sets that give three distinct upgrade domains while
 * satisfying the rack policy are now {dn0, dn1, dn5} and
 * {dn0, dn4, dn5}, so any block's replicas should be on one of those
 * two sets.
*/
private void refreshDatanodeAdminProperties2()
throws IOException {
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
hosts.length];
for (int i = 0; i < hosts.length; i++) {
datanodes[i] = new DatanodeAdminProperties();
DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
      /*
       * Use host names that can be resolved
       * (InetSocketAddress#isUnresolved == false); otherwise
       * CombinedHostFileManager won't allow those hosts.
       */
datanodes[i].setHostName(datanodeID.getIpAddr());
datanodes[i].setPort(datanodeID.getXferPort());
datanodes[i].setUpgradeDomain(upgradeDomains[i]);
}
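    // This time decommission dn2 and dn3, which shifts the valid
    // placements to {dn0, dn1, dn5} and {dn0, dn4, dn5}.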
datanodes[2].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
datanodes[3].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
hostsFileWriter.initIncludeHosts(datanodes);
cluster.getNamesystem(0).getBlockManager().getDatanodeManager().refreshNodes(conf);
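    // dn0 and dn5 appear in both valid placements.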
expectedDatanodeIDs.clear();
expectedDatanodeIDs.add(cluster.getDataNodes().get(0).getDatanodeId());
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
}
private void createFileAndWaitForReplication(final Path path,
final long fileLen)
throws Exception {
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileLen,
REPLICATION_FACTOR, 1000L);
DFSTestUtil.waitForReplication(cluster.getFileSystem(), path,
REPLICATION_FACTOR, WAIT_TIMEOUT_MS);
}
@Test
public void testPlacement() throws Exception {
final long fileSize = FILE_SIZE;
final String testFile = "/testfile";
final Path path = new Path(testFile);
    createFileAndWaitForReplication(path, fileSize);
LocatedBlocks locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
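    // For each block, gather the replicas on NORMAL (i.e. not
    // decommissioned) nodes and check that both expected datanodes
    // are among them.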
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>();
for(DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState()
.equals(DatanodeInfo.AdminStates.NORMAL)) {
locs.add(datanodeInfo);
}
}
for (DatanodeID datanodeID : expectedDatanodeIDs) {
Assert.assertTrue(locs.contains(datanodeID));
}
}
}
@Test(timeout = 300000)
public void testPlacementAfterDecommission() throws Exception {
final long fileSize = FILE_SIZE;
final String testFile = "/testfile-afterdecomm";
final Path path = new Path(testFile);
createFileAndWaitForReplication(path, fileSize);
// Decommission some nodes and wait until decommissions have finished.
refreshDatanodeAdminProperties2();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlocks locatedBlocks;
try {
locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
} catch (IOException ioe) {
return false;
}
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>();
for (DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState().equals(
DatanodeInfo.AdminStates.NORMAL)) {
locs.add(datanodeInfo);
}
}
for (DatanodeID datanodeID : expectedDatanodeIDs) {
if (!locs.contains(datanodeID)) {
return false;
}
}
}
return true;
}
}, 1000, WAIT_TIMEOUT_MS);
// Verify block placement policy of each block.
LocatedBlocks locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
BlockPlacementStatus status =
cluster.getNamesystem().getBlockManager()
.getBlockPlacementPolicy()
.verifyBlockPlacement(block.getLocations(), REPLICATION_FACTOR);
Assert.assertTrue(status.isPlacementPolicySatisfied());
}
}
}