/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

/**
 * This class tests if a balancer schedules tasks correctly.
 */
public class TestBalancer extends TestCase {
  private static final long CAPACITY = 500L;
  private static final String RACK0 = "/rack0";
  private static final String RACK1 = "/rack1";
  private static final String RACK2 = "/rack2";
  private static final String fileName = "/tmp.txt";
  private static final Path filePath = new Path(fileName);

  private MiniDFSCluster cluster;
  ClientProtocol client;
  static final int DEFAULT_BLOCK_SIZE = 10;
  private Balancer balancer;
  private Random r = new Random();

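  // Shorten the balancer's wait between block-move status checks so the
  // tests converge quickly.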
  static {
    Balancer.setBlockMoveWaitTime(1000L);
  }

  private void initConf(Configuration conf) {
    // Use tiny blocks and a 1-second heartbeat so the cluster reaches a
    // steady, observable state quickly.
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
    conf.setLong("dfs.heartbeat.interval", 1L);
    // Simulated datanode storage keeps the test independent of real disks.
    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    // Shrink the window in which the balancer considers a block recently moved.
    conf.setLong("dfs.balancer.movedWinWidth", 2000L);
  }

  /* Create a file with a length of <code>fileLen</code>. */
  private void createFile(long fileLen, short replicationFactor)
      throws IOException {
    FileSystem fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, filePath, fileLen,
        replicationFactor, r.nextLong());
    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
  }

  /* Fill up a cluster with <code>numNodes</code> datanodes
   * whose total used space is <code>size</code>.
   */
  private Block[] generateBlocks(Configuration conf, long size,
      short numNodes) throws IOException {
    cluster = new MiniDFSCluster(conf, numNodes, true, null);
    try {
      cluster.waitActive();
      client = DFSClient.createNamenode(conf);

      short replicationFactor = (short)(numNodes - 1);
      long fileLen = size / replicationFactor;
      createFile(fileLen, replicationFactor);

      List<LocatedBlock> locatedBlocks = client.
          getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();

      int numOfBlocks = locatedBlocks.size();
      Block[] blocks = new Block[numOfBlocks];
      for (int i = 0; i < numOfBlocks; i++) {
        Block b = locatedBlocks.get(i).getBlock();
        blocks[i] = new Block(b.getBlockId(), b.getNumBytes(),
            b.getGenerationStamp());
      }

      return blocks;
    } finally {
      cluster.shutdown();
    }
  }

  /* Distribute all blocks according to the given distribution. */
  Block[][] distributeBlocks(Block[] blocks, short replicationFactor,
      final long[] distribution) {
    // make a copy
    long[] usedSpace = new long[distribution.length];
    System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);

    List<List<Block>> blockReports =
        new ArrayList<List<Block>>(usedSpace.length);
    Block[][] results = new Block[usedSpace.length][];
    for (int i = 0; i < usedSpace.length; i++) {
      blockReports.add(new ArrayList<Block>());
    }
    // Assign each replica to a randomly chosen node that still has
    // space remaining in its target distribution.
    for (int i = 0; i < blocks.length; i++) {
      for (int j = 0; j < replicationFactor; j++) {
        boolean notChosen = true;
        while (notChosen) {
          int chosenIndex = r.nextInt(usedSpace.length);
          if (usedSpace[chosenIndex] > 0) {
            notChosen = false;
            blockReports.get(chosenIndex).add(blocks[i]);
            usedSpace[chosenIndex] -= blocks[i].getNumBytes();
          }
        }
      }
    }
    for (int i = 0; i < usedSpace.length; i++) {
      List<Block> nodeBlockList = blockReports.get(i);
      results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
    }
    return results;
  }

  /* First start a cluster and fill it up to a certain size.
   * Then redistribute the blocks according to the required distribution.
   * Finally run the balancer and verify that the cluster is balanced.
   */
  private void testUnevenDistribution(Configuration conf,
      long distribution[], long capacities[], String[] racks) throws Exception {
    int numDatanodes = distribution.length;
    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
      throw new IllegalArgumentException("Array lengths are not the same");
    }

    // calculate total space that needs to be filled
    long totalUsedSpace = 0L;
    for (int i = 0; i < distribution.length; i++) {
      totalUsedSpace += distribution[i];
    }

    // fill the cluster
    Block[] blocks = generateBlocks(conf, totalUsedSpace, (short)numDatanodes);

    // redistribute blocks
    Block[][] blocksDN = distributeBlocks(
        blocks, (short)(numDatanodes - 1), distribution);

    // restart the cluster: do NOT format the cluster
    conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
    cluster = new MiniDFSCluster(0, conf, numDatanodes,
        false, true, null, racks, capacities);
    cluster.waitActive();
    client = DFSClient.createNamenode(conf);

    for (int i = 0; i < blocksDN.length; i++) {
      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
    }

    long totalCapacity = 0L;
    for (long capacity : capacities) {
      totalCapacity += capacity;
    }
    runBalancer(conf, totalUsedSpace, totalCapacity);
    cluster.shutdown();
  }

  /* Wait until the namenode's heartbeats report the expected total capacity
   * and used space. getStats() returns the capacity at index 0 and the
   * DFS used space at index 1.
   */
  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
      throws IOException {
    long[] status = client.getStats();
    while (status[0] != expectedTotalSpace || status[1] != expectedUsedSpace) {
      try {
        Thread.sleep(100L);
      } catch (InterruptedException ignored) {
      }
      status = client.getStats();
    }
  }

  /* Start a cluster with the given capacities and racks, fill it to be
   * 30% full, then add an empty node and start balancing.
   * @param conf the configuration to use
   * @param capacities the capacities of the initial datanodes
   * @param racks the racks the initial datanodes are placed on
   * @param newCapacity the new node's capacity
   * @param newRack the rack the new node is placed on
   */
  private void test(Configuration conf, long[] capacities, String[] racks,
      long newCapacity, String newRack) throws Exception {
    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    cluster = new MiniDFSCluster(0, conf, capacities.length, true, true, null,
        racks, capacities);
    try {
      cluster.waitActive();
      client = DFSClient.createNamenode(conf);

      long totalCapacity = 0L;
      for (long capacity : capacities) {
        totalCapacity += capacity;
      }
      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      createFile(totalUsedSpace / numOfDatanodes, (short)numOfDatanodes);

      // start up an empty node with the given capacity and rack
      cluster.startDataNodes(conf, 1, true, null,
          new String[]{newRack}, new long[]{newCapacity});
      totalCapacity += newCapacity;

      // run balancer and validate results
      runBalancer(conf, totalUsedSpace, totalCapacity);
    } finally {
      cluster.shutdown();
    }
  }

  /* Start balancer and check if the cluster is balanced after the run. */
  private void runBalancer(Configuration conf, long totalUsedSpace,
      long totalCapacity) throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity);

    // start rebalancing
    balancer = new Balancer(conf);
    balancer.run(new String[0]);

    waitForHeartBeat(totalUsedSpace, totalCapacity);
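    // Poll the datanode report until every node's utilization is within
    // 10 percentage points of the cluster average, which matches the
    // balancer's default threshold.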
    boolean balanced;
    do {
      DatanodeInfo[] datanodeReport =
          client.getDatanodeReport(DatanodeReportType.ALL);
      assertEquals(cluster.getDataNodes().size(), datanodeReport.length);
      balanced = true;
      double avgUtilization = ((double)totalUsedSpace) / totalCapacity * 100;
      for (DatanodeInfo datanode : datanodeReport) {
        if (Math.abs(avgUtilization -
            ((double)datanode.getDfsUsed()) / datanode.getCapacity() * 100) > 10) {
          balanced = false;
          try {
            Thread.sleep(100);
          } catch (InterruptedException ignored) {
          }
          break;
        }
      }
    } while (!balanced);
  }

  /* Same as runBalancer(), but exercises the Balancer default constructor
   * followed by setConf() instead of the Configuration constructor.
   */
  private void runBalancerDefaultConstructor(Configuration conf,
      long totalUsedSpace, long totalCapacity) throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity);

    // start rebalancing
    balancer = new Balancer();
    balancer.setConf(conf);
    balancer.run(new String[0]);

    waitForHeartBeat(totalUsedSpace, totalCapacity);
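    // Same convergence check as in runBalancer().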
    boolean balanced;
    do {
      DatanodeInfo[] datanodeReport = client
          .getDatanodeReport(DatanodeReportType.ALL);
      assertEquals(cluster.getDataNodes().size(), datanodeReport.length);
      balanced = true;
      double avgUtilization = ((double)totalUsedSpace) / totalCapacity * 100;
      for (DatanodeInfo datanode : datanodeReport) {
        if (Math.abs(avgUtilization - ((double)datanode.getDfsUsed())
            / datanode.getCapacity() * 100) > 10) {
          balanced = false;
          try {
            Thread.sleep(100);
          } catch (InterruptedException ignored) {
          }
          break;
        }
      }
    } while (!balanced);
  }

  /** one-node cluster test */
  private void oneNodeTest(Configuration conf) throws Exception {
    // add an empty node with half of the CAPACITY & the same rack
    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY / 2, RACK0);
  }

  /** two-node cluster test */
  private void twoNodeTest(Configuration conf) throws Exception {
    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2);
  }

  /** test using a user-supplied conf */
  public void integrationTest(Configuration conf) throws Exception {
    initConf(conf);
    oneNodeTest(conf);
  }

  /** Test a cluster with even distribution,
   * then add a new empty node to the cluster. */
  public void testBalancer0() throws Exception {
    Configuration conf = new HdfsConfiguration();
    initConf(conf);
    oneNodeTest(conf);
    twoNodeTest(conf);
  }

  /** Test an unevenly distributed cluster. */
  public void testBalancer1() throws Exception {
    Configuration conf = new HdfsConfiguration();
    initConf(conf);
    testUnevenDistribution(conf,
        new long[]{50 * CAPACITY / 100, 10 * CAPACITY / 100},
        new long[]{CAPACITY, CAPACITY},
        new String[]{RACK0, RACK1});
  }

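  /** Test the balancer when it is created with the default constructor
   * and configured via setConf(). */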
  public void testBalancer2() throws Exception {
    Configuration conf = new HdfsConfiguration();
    initConf(conf);
    testBalancerDefaultConstructor(conf, new long[]{CAPACITY, CAPACITY},
        new String[]{RACK0, RACK1}, CAPACITY, RACK2);
  }

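  /* Same scenario as test(), but balances with the default-constructed
   * Balancer via runBalancerDefaultConstructor(). */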
  private void testBalancerDefaultConstructor(Configuration conf,
      long[] capacities, String[] racks, long newCapacity, String newRack)
      throws Exception {
    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    cluster = new MiniDFSCluster(0, conf, capacities.length, true, true, null,
        racks, capacities);
    try {
      cluster.waitActive();
      client = DFSClient.createNamenode(conf);

      long totalCapacity = 0L;
      for (long capacity : capacities) {
        totalCapacity += capacity;
      }
      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      createFile(totalUsedSpace / numOfDatanodes, (short)numOfDatanodes);

      // start up an empty node with the given capacity and rack
      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
          new long[]{newCapacity});
      totalCapacity += newCapacity;

      // run balancer and validate results
      runBalancerDefaultConstructor(conf, totalUsedSpace, totalCapacity);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Run the balancer tests as a standalone program.
   * @param args ignored
   */
  public static void main(String[] args) throws Exception {
    TestBalancer balancerTest = new TestBalancer();
    balancerTest.testBalancer0();
    balancerTest.testBalancer1();
    balancerTest.testBalancer2();
  }
}