/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
/**
* Test balancer with multiple NameNodes
*/
public class TestBalancerWithMultipleNameNodes {
static final Log LOG = Balancer.LOG;
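  // Raise balancer and NameNode log levels so that balancer activity is
  // fully visible in the test output.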
{
GenericTestUtils.setLogLevel(LOG, Level.ALL);
DFSTestUtil.setNameNodeLogLevel(Level.ALL);
}
private static final long CAPACITY = 500L;
private static final String RACK0 = "/rack0";
private static final String RACK1 = "/rack1";
private static final String RACK2 = "/rack2";
private static final String FILE_NAME = "/tmp.txt";
private static final Path FILE_PATH = new Path(FILE_NAME);
private static final Random RANDOM = new Random();
static {
TestBalancer.initTestSetup();
}
/** Common objects used in various methods. */
private static class Suite {
final Configuration conf;
final MiniDFSCluster cluster;
final ClientProtocol[] clients;
final short replication;
final BalancerParameters parameters;
Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
BalancerParameters parameters, Configuration conf) throws IOException {
this.conf = conf;
this.cluster = cluster;
clients = new ClientProtocol[nNameNodes];
for(int i = 0; i < nNameNodes; i++) {
clients[i] = cluster.getNameNode(i).getRpcServer();
}
// hard coding replication factor to 1 so logical and raw HDFS size are
// equal
replication = 1;
this.parameters = parameters;
}
}
  /* create a file with a length of <code>len</code> */
private static void createFile(Suite s, int index, long len
) throws IOException, InterruptedException, TimeoutException {
final FileSystem fs = s.cluster.getFileSystem(index);
DFSTestUtil.createFile(fs, FILE_PATH, len, s.replication, RANDOM.nextLong());
DFSTestUtil.waitReplication(fs, FILE_PATH, s.replication);
}
  /* fill up the cluster: under each NameNode, create a file that uses
   * <code>size</code> bytes and collect its blocks
   */
private static ExtendedBlock[][] generateBlocks(Suite s, long size
) throws IOException, InterruptedException, TimeoutException {
final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
for(int n = 0; n < s.clients.length; n++) {
createFile(s, n, size);
final List<LocatedBlock> locatedBlocks =
s.clients[n].getBlockLocations(FILE_NAME, 0, size).getLocatedBlocks();
final int numOfBlocks = locatedBlocks.size();
blocks[n] = new ExtendedBlock[numOfBlocks];
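      // copy each block so that the returned blocks remain usable after the
      // cluster that created them is shut down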
for(int i = 0; i < numOfBlocks; i++) {
final ExtendedBlock b = locatedBlocks.get(i).getBlock();
blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
b.getNumBytes(), b.getGenerationStamp());
}
}
return blocks;
}
  /* wait until every NameNode reports the expected used and total space */
static void wait(final ClientProtocol[] clients,
long expectedUsedSpace, long expectedTotalSpace) throws IOException {
LOG.info("WAIT expectedUsedSpace=" + expectedUsedSpace
+ ", expectedTotalSpace=" + expectedTotalSpace);
for(int n = 0; n < clients.length; n++) {
int i = 0;
for(boolean done = false; !done; ) {
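        // ClientProtocol#getStats() returns [0]=capacity, [1]=dfs used,
        // [2]=remaining, ...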
final long[] s = clients[n].getStats();
done = s[0] == expectedTotalSpace && s[1] == expectedUsedSpace;
if (!done) {
sleep(100L);
if (++i % 100 == 0) {
LOG.warn("WAIT i=" + i + ", s=[" + s[0] + ", " + s[1] + "]");
}
}
}
}
}
static void runBalancer(Suite s,
final long totalUsed, final long totalCapacity) throws Exception {
final double avg = totalUsed*100.0/totalCapacity;
LOG.info("BALANCER 0: totalUsed=" + totalUsed
+ ", totalCapacity=" + totalCapacity
+ ", avg=" + avg);
wait(s.clients, totalUsed, totalCapacity);
LOG.info("BALANCER 1");
// get storage reports for relevant blockpools so that we can compare
// blockpool usages after balancer has run
Map<Integer, DatanodeStorageReport[]> preBalancerPoolUsages =
getStorageReports(s);
// start rebalancing
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(s.conf);
final int r = Balancer.run(namenodes, s.parameters, s.conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
LOG.info("BALANCER 2");
wait(s.clients, totalUsed, totalCapacity);
LOG.info("BALANCER 3");
int i = 0;
for(boolean balanced = false; !balanced; i++) {
final long[] used = new long[s.cluster.getDataNodes().size()];
final long[] cap = new long[used.length];
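      // Collect per-datanode usage from the first NameNode's report and
      // verify that every other NameNode reports identical values.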
for(int n = 0; n < s.clients.length; n++) {
final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport(
DatanodeReportType.ALL);
Assert.assertEquals(datanodes.length, used.length);
for(int d = 0; d < datanodes.length; d++) {
if (n == 0) {
used[d] = datanodes[d].getDfsUsed();
cap[d] = datanodes[d].getCapacity();
if (i % 100 == 0) {
LOG.warn("datanodes[" + d
+ "]: getDfsUsed()=" + datanodes[d].getDfsUsed()
+ ", getCapacity()=" + datanodes[d].getCapacity());
}
} else {
Assert.assertEquals(used[d], datanodes[d].getDfsUsed());
Assert.assertEquals(cap[d], datanodes[d].getCapacity());
}
}
}
balanced = true;
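      // A datanode is balanced once its utilization does not exceed the
      // cluster average by more than the threshold.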
for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d];
balanced = p <= avg + s.parameters.getThreshold();
if (!balanced) {
if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: "
+ "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
LOG.warn("TestBalancer.sum(used)=" + TestBalancer.sum(used)
+ ", TestBalancer.sum(cap)=" + TestBalancer.sum(cap));
}
sleep(100);
break;
}
}
}
LOG.info("BALANCER 6");
// cluster is balanced, verify that only selected blockpools were touched
Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
getStorageReports(s);
Assert.assertEquals(preBalancerPoolUsages.size(),
postBalancerPoolUsages.size());
for (Map.Entry<Integer, DatanodeStorageReport[]> entry
: preBalancerPoolUsages.entrySet()) {
compareTotalPoolUsage(entry.getValue(),
postBalancerPoolUsages.get(entry.getKey()));
}
}
/**
   * Compare the total blockpool usage on each datanode to ensure that the
   * blockpool was not balanced.
   *
   * @param preReports storage reports from before the balancer ran
   * @param postReports storage reports from after the balancer ran
*/
private static void compareTotalPoolUsage(DatanodeStorageReport[] preReports,
DatanodeStorageReport[] postReports) {
Assert.assertNotNull(preReports);
Assert.assertNotNull(postReports);
Assert.assertEquals(preReports.length, postReports.length);
for (DatanodeStorageReport preReport : preReports) {
String dnUuid = preReport.getDatanodeInfo().getDatanodeUuid();
for(DatanodeStorageReport postReport : postReports) {
if(postReport.getDatanodeInfo().getDatanodeUuid().equals(dnUuid)) {
Assert.assertEquals(getTotalPoolUsage(preReport),
getTotalPoolUsage(postReport));
LOG.info("Comparision of datanode pool usage pre/post balancer run. "
+ "PrePoolUsage: " + getTotalPoolUsage(preReport)
+ ", PostPoolUsage: " + getTotalPoolUsage(postReport));
break;
}
}
}
}
private static long getTotalPoolUsage(DatanodeStorageReport report) {
long usage = 0L;
for (StorageReport sr : report.getStorageReports()) {
usage += sr.getBlockPoolUsed();
}
return usage;
}
/**
* Get the storage reports for all blockpools that were not specified by the
   * balancer blockpool parameter. If no blockpools were specified, the
   * parameter was not set and no reports are returned.
*
* @param s suite for the test
* @return a map of storage reports where the key is the blockpool index
* @throws IOException
*/
private static Map<Integer,
DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
Map<Integer, DatanodeStorageReport[]> reports =
new HashMap<Integer, DatanodeStorageReport[]>();
    if (s.parameters.getBlockPools().isEmpty()) {
// the blockpools parameter was not set, so we don't need to track any
// blockpools.
return Collections.emptyMap();
}
for (int i = 0; i < s.clients.length; i++) {
if (s.parameters.getBlockPools().contains(
s.cluster.getNamesystem(i)
.getBlockPoolId())) {
// we want to ensure that blockpools not specified by the balancer
// parameters were left alone. Therefore, if the pool was specified,
// skip it. Note: this code assumes the clients in the suite are ordered
// the same way that they are indexed via cluster#getNamesystem(index).
continue;
} else {
LOG.info("Tracking usage of blockpool id: "
+ s.cluster.getNamesystem(i).getBlockPoolId());
reports.put(i,
s.clients[i].getDatanodeStorageReport(DatanodeReportType.LIVE));
}
}
LOG.info("Tracking " + reports.size()
+ " blockpool(s) for pre/post balancer usage.");
return reports;
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch(InterruptedException e) {
LOG.error(e);
}
}
private static Configuration createConf() {
final Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
return conf;
}
/**
* First start a cluster and fill the cluster up to a certain size. Then
   * redistribute blocks according to the required distribution. Finally, balance
* the cluster.
*
* @param nNameNodes Number of NameNodes
* @param nNameNodesToBalance Number of NameNodes to run the balancer on
* @param distributionPerNN The distribution for each NameNode.
* @param capacities Capacities of the datanodes
* @param racks Rack names
* @param conf Configuration
*/
private void unevenDistribution(final int nNameNodes,
      final int nNameNodesToBalance, long[] distributionPerNN,
      long[] capacities, String[] racks, Configuration conf) throws Exception {
LOG.info("UNEVEN 0");
final int nDataNodes = distributionPerNN.length;
if (capacities.length != nDataNodes || racks.length != nDataNodes) {
throw new IllegalArgumentException("Array length is not the same");
}
if (nNameNodesToBalance > nNameNodes) {
throw new IllegalArgumentException("Number of namenodes to balance is "
+ "greater than the number of namenodes.");
}
    // calculate the total space that needs to be filled
final long usedSpacePerNN = TestBalancer.sum(distributionPerNN);
// fill the cluster
final ExtendedBlock[][] blocks;
{
LOG.info("UNEVEN 1");
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new Configuration(conf))
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(capacities)
.build();
LOG.info("UNEVEN 2");
try {
cluster.waitActive();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
LOG.info("UNEVEN 3");
final Suite s = new Suite(cluster, nNameNodes, nDataNodes, null, conf);
blocks = generateBlocks(s, usedSpacePerNN);
LOG.info("UNEVEN 4");
} finally {
cluster.shutdown();
}
}
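    // Let the NameNodes leave safemode immediately on restart; the blocks
    // they expect will be injected manually below.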
conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
    // Scale up the capacity of each DN since blocks for all nNameNodes
    // blockpools will be injected in the following operations.
long[] newCapacities = new long[nDataNodes];
for (int i = 0; i < nDataNodes; i++) {
newCapacities[i] = capacities[i] * nNameNodes;
}
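    // Restart the cluster on the same storage (format=false) and inject the
    // redistributed blocks into each blockpool.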
{
LOG.info("UNEVEN 10");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(newCapacities)
.format(false)
.build();
LOG.info("UNEVEN 11");
try {
cluster.waitActive();
LOG.info("UNEVEN 12");
Set<String> blockpools = new HashSet<String>();
for (int i = 0; i < nNameNodesToBalance; i++) {
blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
}
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBlockpools(blockpools);
BalancerParameters params = b.build();
final Suite s =
new Suite(cluster, nNameNodes, nDataNodes, params, conf);
for(int n = 0; n < nNameNodes; n++) {
// redistribute blocks
final Block[][] blocksDN = TestBalancer.distributeBlocks(
blocks[n], s.replication, distributionPerNN);
        for(int d = 0; d < blocksDN.length; d++) {
          cluster.injectBlocks(n, d, Arrays.asList(blocksDN[d]));
        }
LOG.info("UNEVEN 13: n=" + n);
}
final long totalCapacity = TestBalancer.sum(newCapacities);
final long totalUsed = nNameNodes*usedSpacePerNN;
LOG.info("UNEVEN 14");
runBalancer(s, totalUsed, totalCapacity);
LOG.info("UNEVEN 15");
} finally {
cluster.shutdown();
}
LOG.info("UNEVEN 16");
}
}
/**
   * This test starts a cluster and fills the DataNodes to be 30% full;
   * it then adds an empty node and starts balancing.
*
* @param nNameNodes Number of NameNodes
* @param capacities Capacities of the datanodes
* @param racks Rack names
* @param newCapacity the capacity of the new DataNode
* @param newRack the rack for the new DataNode
* @param conf Configuration
*/
private void runTest(final int nNameNodes, long[] capacities, String[] racks,
long newCapacity, String newRack, Configuration conf) throws Exception {
final int nDataNodes = capacities.length;
LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
Assert.assertEquals(nDataNodes, racks.length);
LOG.info("RUN_TEST -1");
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new Configuration(conf))
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(capacities)
.build();
LOG.info("RUN_TEST 0");
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
cluster.waitActive();
LOG.info("RUN_TEST 1");
final Suite s =
new Suite(cluster, nNameNodes, nDataNodes,
BalancerParameters.DEFAULT, conf);
long totalCapacity = TestBalancer.sum(capacities);
LOG.info("RUN_TEST 2");
// fill up the cluster to be 30% full
final long totalUsed = totalCapacity*3/10;
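      // each NameNode writes an equal share; replication is 1, so the file
      // length equals the raw space used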
final long size = (totalUsed/nNameNodes)/s.replication;
for(int n = 0; n < nNameNodes; n++) {
createFile(s, n, size);
}
LOG.info("RUN_TEST 3");
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null,
new String[]{newRack}, new long[]{newCapacity});
totalCapacity += newCapacity;
LOG.info("RUN_TEST 4");
      // run the balancer and validate the results
runBalancer(s, totalUsed, totalCapacity);
LOG.info("RUN_TEST 5");
} finally {
cluster.shutdown();
}
LOG.info("RUN_TEST 6");
}
  /** Test a cluster with an even distribution;
   * then a new empty node is added to the cluster.
   */
@Test
public void testBalancer() throws Exception {
final Configuration conf = createConf();
runTest(2, new long[]{CAPACITY}, new String[]{RACK0},
CAPACITY/2, RACK0, conf);
}
/** Test unevenly distributed cluster */
@Test
public void testUnevenDistribution() throws Exception {
final Configuration conf = createConf();
unevenDistribution(2, 2,
new long[] {30*CAPACITY/100, 5*CAPACITY/100},
new long[]{CAPACITY, CAPACITY},
new String[] {RACK0, RACK1},
conf);
}
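  /** Test balancing only one of the two blockpools. */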
@Test
public void testBalancing1OutOf2Blockpools() throws Exception {
final Configuration conf = createConf();
unevenDistribution(2, 1, new long[] { 30 * CAPACITY / 100,
5 * CAPACITY / 100 }, new long[] { CAPACITY, CAPACITY }, new String[] {
RACK0, RACK1 }, conf);
}
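  /** Test balancing two of the three blockpools. */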
@Test
public void testBalancing2OutOf3Blockpools() throws Exception {
final Configuration conf = createConf();
unevenDistribution(3, 2, new long[] { 30 * CAPACITY / 100,
5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY,
CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf);
}
}