blob: e47cbf0c879e2c6c199e2fe05c5c231c6bb09619 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test erasure coding block placement with skewed # nodes per rack.
*/
public class TestErasureCodingMultipleRacks {
public static final Logger LOG =
LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class);
static {
GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG,
Level.TRACE);
GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG);
}
@Rule
public Timeout globalTimeout = new Timeout(300000);
public ErasureCodingPolicy getPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
}
private MiniDFSCluster cluster;
private ErasureCodingPolicy ecPolicy;
private Configuration conf;
private DistributedFileSystem dfs;
@Before
public void setup() {
ecPolicy = getPolicy();
conf = new HdfsConfiguration();
// disable load consideration to test placement only.
conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
}
/**
* Setup cluster with desired number of DN, racks, and specified number of
* rack that only has 1 DN. Other racks will be evenly setup with the number
* of DNs.
* <p>
* This is not done as a {@link Before}, so test cases can setup differently.
*
* @param numDatanodes number of total Datanodes.
* @param numRacks number of total racks
* @param numSingleDnRacks number of racks that only has 1 DN
* @throws Exception
*/
public void setupCluster(final int numDatanodes, final int numRacks,
final int numSingleDnRacks) throws Exception {
cluster = DFSTestUtil
.setupCluster(conf, numDatanodes, numRacks, numSingleDnRacks);
dfs = cluster.getFileSystem();
dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
}
@After
public void teardown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
// Extreme case.
@Test
public void testSkewedRack1() throws Exception {
final int dataUnits = ecPolicy.getNumDataUnits();
final int parityUnits = ecPolicy.getNumParityUnits();
setupCluster(dataUnits + parityUnits, 2, 1);
final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
byte[] contents = new byte[filesize];
final Path path = new Path("/testfile");
LOG.info("Writing file " + path);
DFSTestUtil.writeFile(dfs, path, contents);
BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
blocks[0].getHosts().length);
}
// 1 rack has many nodes, other racks have single node. Extreme case.
@Test
public void testSkewedRack2() throws Exception {
final int dataUnits = ecPolicy.getNumDataUnits();
final int parityUnits = ecPolicy.getNumParityUnits();
setupCluster(dataUnits + parityUnits * 2, dataUnits, dataUnits - 1);
final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
byte[] contents = new byte[filesize];
final Path path = new Path("/testfile");
LOG.info("Writing file " + path);
DFSTestUtil.writeFile(dfs, path, contents);
BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
blocks[0].getHosts().length);
}
// 2 racks have sufficient nodes, other racks has 1. Should be able to
// tolerate 1 rack failure.
@Test
public void testSkewedRack3() throws Exception {
final int dataUnits = ecPolicy.getNumDataUnits();
final int parityUnits = ecPolicy.getNumParityUnits();
// Create enough extra DNs on the 2 racks to test even placement.
// Desired placement is parityUnits replicas on the 2 racks, and 1 replica
// on the rest of the racks (which only have 1 DN)
int numRacks = dataUnits - parityUnits + 2;
setupCluster(dataUnits + parityUnits * 4, numRacks,
dataUnits - parityUnits);
final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
byte[] contents = new byte[filesize];
for (int i = 0; i < 10; ++i) {
final Path path = new Path("/testfile" + i);
LOG.info("Writing file " + path);
DFSTestUtil.writeFile(dfs, path, contents);
ExtendedBlock extendedBlock = DFSTestUtil.getFirstBlock(dfs, path);
// Wait for replication to finish before testing
DFSTestUtil.waitForReplication(cluster, extendedBlock, numRacks,
ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(), 0);
BlockLocation[] blocks =
dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
blocks[0].getHosts().length);
assertRackFailureTolerated(blocks[0].getTopologyPaths());
}
}
// Verifies that no more than numParityUnits is placed on a rack.
private void assertRackFailureTolerated(final String[] topologies) {
final Map<String, Integer> racksCount = new HashMap<>();
for (String t : topologies) {
final Integer count = racksCount.get(getRackName(t));
if (count == null) {
racksCount.put(getRackName(t), 1);
} else {
racksCount.put(getRackName(t), count + 1);
}
}
LOG.info("Rack count map is: {}", racksCount);
for (Integer count : racksCount.values()) {
assertTrue(count <= ecPolicy.getNumParityUnits());
}
}
private String getRackName(final String topology) {
assert topology.indexOf('/', 1) > 0;
return topology.substring(0, topology.indexOf('/', 1));
}
}