blob: 9506ec185fdf86662df2714f2ade0145159939d7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.test.system.HDFSCluster;
import org.apache.hadoop.hdfs.test.system.NNClient;
import org.apache.hadoop.hdfs.test.system.DNClient;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
public class TestBalancer {
private static final Log LOG = LogFactory.getLog(TestBalancer.class);
// Working directory (relative to the test user's DFS home) for generated test data.
private static final String BALANCER_TEMP_DIR = "balancer-temp";
// Cluster-wide Hadoop configuration, re-created in setUp() before each test.
private Configuration hadoopConf;
// Handle to the system-test cluster controller; created/torn down per test.
private HDFSCluster dfsCluster;
// NOTE(review): nothing in this constructor can throw; the "throws Exception"
// clause appears to exist only for interface/reflection compatibility.
public TestBalancer() throws Exception {
}
/** Build a fresh configuration and bring up the shared system-test cluster. */
@Before
public void setUp() throws Exception {
  hadoopConf = new Configuration();
  dfsCluster = HDFSCluster.createCluster(hadoopConf);
  dfsCluster.setUp();
}
/** Tear the system-test cluster back down after each test. */
@After
public void tearDown() throws Exception {
  dfsCluster.tearDown();
}
// Trivial @Test
// NOTE(review): the "@Test" above is inside a line comment, so JUnit will
// NOT run this method as a test.
public void testNamenodePing() throws IOException {
  LOG.info("testing filesystem ping");
  NNClient namenode = dfsCluster.getNNClient();
  namenode.ping();
  LOG.info("done.");
}
// Trivial @Test
// NOTE(review): the "@Test" above is inside a line comment, so JUnit will
// NOT run this method as a test.
public void testNamenodeConnectDisconnect() throws IOException {
  LOG.info("connecting to namenode");
  NNClient namenode = dfsCluster.getNNClient();
  namenode.connect();
  LOG.info("done.");
  LOG.info("disconnecting from namenode");
  namenode.disconnect();
}
/**
 * The basic scenario for balancer test is as follows
 *
 * - Bring up cluster with 1 DataNode
 * - Load DataNode to &gt;50%
 * - Count files/blocks on DataNode
 * - Add new, empty DataNode to cluster
 * - Run Balancer
 * - Count files/blocks on DataNodes
 * - Blocks counts from before and after Balancer run should be consistent
 *
 * Fix: the original caught {@code Throwable} and only logged it, so this
 * test could never fail. Failures are now logged AND rethrown; the finally
 * block still performs cleanup and restarts stopped datanodes either way.
 */
@Test
public void testBalancerBasicScenario() throws IOException {
  Path balancerTempDir = null;
  try {
    List<DNClient> testnodes = reserveDatanodesForTest(2);
    DNClient testnode1 = testnodes.get(0);
    DNClient testnode2 = testnodes.get(1);
    shutdownNonTestNodes(testnodes);
    LOG.info("attempting to kill both test nodes");
    stopDatanode(testnode1);
    stopDatanode(testnode2);
    LOG.info("starting up datanode [" +
        testnode1.getHostName() +
        "] and loading it with data");
    startDatanode(testnode1);
    // mkdir balancer-temp
    balancerTempDir = makeTempDir();
    // write 2 blocks to file system
    LOG.info("generating filesystem load");
    // TODO spec blocks to generate by blockCount, blockSize, # of writers
    generateFileSystemLoad(2); // generate 2 blocks of test data
    LOG.info("measure space used on 1st node");
    long usedSpace0 = getDatanodeUsedSpace(testnode1);
    LOG.info("datanode " + testnode1.getHostName()
        + " contains " + usedSpace0 + " bytes");
    LOG.info("bring up a 2nd node and run balancer on DFS");
    startDatanode(testnode2);
    runBalancerAndVerify(testnodes);
  } catch (Throwable t) {
    LOG.info("method testBalancer failed", t);
    // Propagate so the test actually fails instead of passing silently.
    if (t instanceof IOException) {
      throw (IOException) t;
    }
    if (t instanceof RuntimeException) {
      throw (RuntimeException) t;
    }
    if (t instanceof Error) {
      throw (Error) t;
    }
    throw new RuntimeException(t);
  } finally {
    // finally block to run cleanup
    LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
    try {
      deleteTempDir(balancerTempDir);
    } catch (Exception e) {
      LOG.warn("problem cleaning up temp dir", e);
    }
    // restart killed nodes
    Iterator<DNClient> iter = dfsCluster.getDNClients().iterator();
    while (iter.hasNext()) {
      DNClient dn = iter.next();
      startDatanode(dn);
    }
  }
}
/**
 * Stop every datanode in the cluster that is not reserved for the current
 * test, so stray nodes cannot skew balancing measurements.
 *
 * Idiom fix: replaces raw {@code Set}/{@code HashSet}/{@code Iterator}
 * with typed generics and an enhanced-for loop; behavior is unchanged.
 *
 * @param testnodes datanodes to keep running
 */
private void shutdownNonTestNodes(List<DNClient> testnodes) {
  Set<DNClient> killSet = new HashSet<DNClient>(getAllDatanodes());
  killSet.removeAll(testnodes);
  LOG.info("attempting to kill/suspend all the nodes not used for this test");
  // kill may not work with some secure-HDFS configs,
  // so using our stopDatanode() method
  for (DNClient dn : killSet) {
    stopDatanode(dn);
  }
}
/**
 * Kill-list helper: pick {@code reservationCount} distinct datanodes at
 * random for the test and return them; the remaining ("die") nodes are
 * logged so the caller can shut them down.
 *
 * Fixes two defects in the original selection logic:
 * 1. It always reserved exactly TWO nodes, ignoring reservationCount
 *    (callers pass 3 and even 750).
 * 2. Its second pick looped {@code while (i != j)} — i.e. it spun until
 *    the SAME random index recurred — instead of choosing a distinct node,
 *    and could index past the end of the shrunken list.
 * Selection is now a draw-without-replacement loop: removing each pick
 * from the candidate list guarantees distinct nodes and valid indices.
 *
 * @param reservationCount number of datanodes to reserve (must be <= live nodes)
 * @return the reserved datanodes, in selection order
 */
private List<DNClient> reserveDatanodesForTest(int reservationCount) {
  LOG.info("getting collection of live data nodes");
  List<DNClient> dnList = getAllDatanodes();
  int dnCount = dnList.size();
  // check to make sure there is enough capacity on these nodes to run test
  Assert.assertTrue(
      String.format(
          "not enough datanodes available to run test,"
          + " need %d datanodes but have only %d available",
          reservationCount, dnCount),
      (dnCount >= reservationCount));
  LOG.info("selecting " + reservationCount + " nodes for test");
  List<DNClient> dieDNs = new LinkedList<DNClient>(dnList);
  List<DNClient> testDNs = new LinkedList<DNClient>();
  // Draw without replacement: each removal shrinks the candidate pool, so
  // every subsequent random index still addresses an unreserved node.
  for (int k = 0; k < reservationCount; k++) {
    DNClient testDN = dieDNs.remove(getRandom(dieDNs.size()));
    testDNs.add(testDN);
  }
  LOG.info("nodes reserved for test");
  printDatanodeList(testDNs);
  LOG.info("nodes not used in test");
  printDatanodeList(dieDNs);
  return testDNs;
}
/** Convenience accessor for every datanode client known to the cluster. */
private List<DNClient> getAllDatanodes() {
  return dfsCluster.getDNClients();
}
// Shared zero-length array handed to List.toArray to produce a typed result.
private final static DNClient[] DATANODE_ARRAY = {};
/**
 * Convert a list of datanode clients to a typed array.
 *
 * Idiom fix: {@code List.toArray(T[])} already returns a typed array, so
 * the old explicit {@code (DNClient[])} cast was redundant and is removed.
 *
 * @param datanodeList datanodes to convert; not modified
 * @return a new DNClient[] with the list's elements in order
 */
private DNClient[] toDatanodeArray(List<DNClient> datanodeList) {
  return datanodeList.toArray(DATANODE_ARRAY);
}
/**
 * Return a pseudo-random integer in the half-open range [0, n).
 *
 * NOTE(review): the original Javadoc claimed "between 0 and N inclusive",
 * but (int)(n * Math.random()) can never yield n itself — callers rely on
 * this being a valid index for a list of size n.
 *
 * @param n exclusive upper bound (expected positive)
 * @return random integer between 0 (inclusive) and n (exclusive)
 */
private int getRandom(int n) {
  return (int) (n * Math.random());
}
/**
 * Calculate whether the error between expected and observed values is
 * within tolerance.
 *
 * Fixes two defects (the method is currently unused in this file, so the
 * fix matches the method's name/contract rather than any caller):
 * 1. {@code tolerance/100} used integer division, which is 0 for any
 *    tolerance below 100 — the threshold was effectively always zero.
 * 2. The comparison was inverted: it returned true when the difference
 *    EXCEEDED the threshold, i.e. "not within tolerance".
 *
 * @param expectedValue expected value of experiment
 * @param observedValue observed value of experiment
 * @param tolerance percent tolerance for error, as an int (e.g. 10 = 10%)
 * @return true iff |observed - expected| &lt;= expected * tolerance%
 */
private boolean withinTolerance(long expectedValue,
                                long observedValue,
                                int tolerance) {
  double diff = 1.0 * Math.abs(observedValue - expectedValue);
  double thrs = expectedValue * (tolerance / 100.0);
  return diff <= thrs;
}
// emulate tolerance calculation in balancer code
public final static int DEFAULT_TOLERANCE = 10; // 10%
/** Overload: check balance using the balancer's default 10% tolerance. */
protected boolean isClusterBalanced(DNClient[] datanodes) throws IOException {
  return isClusterBalanced(datanodes, DEFAULT_TOLERANCE);
}
/**
 * Decide whether the given datanodes are balanced: each node's utilization
 * (used/capacity, in percent) must lie within {@code tolerance} percentage
 * points of the mean utilization across the given nodes.
 *
 * Cleanup: removed the unused raw {@code datanodeVolumeMap} local and
 * replaced the result-flag/break dance with an early return; the
 * computation is unchanged.
 *
 * @param datanodes nodes to examine; must be non-empty
 * @param tolerance allowed deviation from mean utilization, in percent
 * @return true if every node is within tolerance of the mean
 * @throws IOException if volume attributes cannot be fetched
 */
protected boolean isClusterBalanced(DNClient[] datanodes, int tolerance)
    throws IOException {
  Assert.assertFalse("empty datanode array specified",
      ArrayUtils.isEmpty(datanodes));
  double[] utilizationByNode = new double[datanodes.length];
  double totalUsedSpace = 0L;
  double totalCapacity = 0L;
  // accumulate per-node utilization plus cluster-wide totals in one pass
  for (int i = 0; i < datanodes.length; i++) {
    Map volumeInfoMap = getDatanodeVolumeAttributes(datanodes[i]);
    long usedSpace = (Long) volumeInfoMap.get(ATTRNAME_USED_SPACE);
    long capacity = (Long) volumeInfoMap.get(ATTRNAME_CAPACITY);
    utilizationByNode[i] = (((double) usedSpace) / capacity) * 100;
    totalUsedSpace += usedSpace;
    totalCapacity += capacity;
  }
  // here we are reusing previously fetched volume-info, for speed
  // an alternative is to get fresh values from the cluster here instead
  double avgUtilization = (totalUsedSpace / totalCapacity) * 100;
  for (int i = 0; i < datanodes.length; i++) {
    if (Math.abs(avgUtilization - utilizationByNode[i]) > tolerance) {
      return false;
    }
  }
  return true;
}
/**
 * Make a working directory for storing temporary files. If a file or
 * directory already exists at the path, it is deleted and recreated.
 *
 * @return path of the (now empty) working directory
 * @throws IOException if the directory cannot be created
 */
private Path makeTempDir() throws IOException {
  Path temp = new Path(BALANCER_TEMP_DIR);
  FileSystem srcFs = temp.getFileSystem(hadoopConf);
  FileStatus fstatus = null;
  try {
    fstatus = srcFs.getFileStatus(temp);
    if (fstatus.isDir()) {
      LOG.warn(BALANCER_TEMP_DIR + ": File exists");
    } else {
      LOG.warn(BALANCER_TEMP_DIR + " exists but is not a directory");
    }
    // whichever it is, clear it out before recreating the directory
    deleteTempDir(temp);
  } catch (FileNotFoundException fileNotFoundExc) {
    // deliberately empty: the path does not exist yet, nothing to clean up
  } finally {
    // always (re)create the directory, whether or not it existed before
    if (!srcFs.mkdirs(temp)) {
      throw new IOException("failed to create " + BALANCER_TEMP_DIR);
    }
  }
  return temp;
}
/**
 * Remove the working directory used to store temporary files.
 *
 * @param temp directory to delete recursively
 * @throws IOException if the filesystem cannot be reached
 */
private void deleteTempDir(Path temp) throws IOException {
  FileSystem srcFs = temp.getFileSystem(hadoopConf);
  LOG.info("attempting to delete path " + temp + "; this path exists? -> " + srcFs.exists(temp));
  srcFs.delete(temp, true);
}
/** Log the hostname of every datanode in the list, one per line. */
private void printDatanodeList(List<DNClient> lis) {
  for (DNClient node : lis) {
    LOG.info("\t" + node.getHostName());
  }
}
// Remote command used to stop a datanode daemon.
// NOTE(review): "yinst" is a Yahoo-internal package tool — this suite is
// environment-specific; confirm before running elsewhere.
private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin";
/** Stop the datanode daemon on the given node (via ssh). */
private void stopDatanode(DNClient dn) {
  String dnHost = dn.getHostName();
  runAndWatch(dnHost, CMD_STOP_DN);
}
// Remote command used to start a datanode daemon.
private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin";
/** Start the datanode daemon on the given node (via ssh). */
private void startDatanode(DNClient dn) {
  String dnHost = dn.getHostName();
  runAndWatch(dnHost, CMD_START_DN);
}
/* using "old" default block size of 64M */
private static final int DFS_BLOCK_SIZE = 67108864;
private static final short DEFAULT_REPLICATION = 3;
/** Generate numBlocks blocks of random data at the default replication. */
private void generateFileSystemLoad(long numBlocks) {
  generateFileSystemLoad(numBlocks, DEFAULT_REPLICATION);
}
/**
 * Write {@code numBlocks} DFS-block-sized chunks of random data into DFS
 * at the given replication factor, to create load for balancer tests.
 *
 * NOTE(review): every loop iteration calls fs.create() on the SAME path
 * ("hdfs:///user/hadoopqa/" — the per-file suffix is commented out), so
 * each block overwrites the previous one; confirm whether unique file
 * names were intended.
 *
 * Cleanup: the FileSystem handle is loop-invariant and is now fetched
 * once; the finally block uses null-safe IOUtils.closeStream (the old
 * bare close() calls could NPE, albeit swallowed).
 *
 * @param numBlocks number of blocks of random data to write
 * @param replication replication factor for the written data
 */
private void generateFileSystemLoad(long numBlocks, short replication) {
  String destfile = "hdfs:///user/hadoopqa/";// + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
  SecureRandom randgen = new SecureRandom();
  ByteArrayOutputStream dat = null;
  ByteArrayInputStream in = null;
  final int CHUNK = 4096;
  final Configuration testConf = new Configuration(hadoopConf);
  try {
    testConf.setInt("dfs.replication", replication);
    // same URI and conf every iteration — fetch the filesystem once
    FileSystem fs = FileSystem.get(URI.create(destfile), testConf);
    for (int i = 0; i < numBlocks; i++) {
      OutputStream out = fs.create(
          new Path(destfile),
          replication,
          new ProgressReporter());
      // fill one block's worth of cryptographically random bytes
      dat = new ByteArrayOutputStream(DFS_BLOCK_SIZE);
      for (int z = 0; z < DFS_BLOCK_SIZE; z += CHUNK) {
        byte[] bytes = new byte[CHUNK];
        randgen.nextBytes(bytes);
        dat.write(bytes, 0, CHUNK);
      }
      in = new ByteArrayInputStream(dat.toByteArray());
      // copyBytes(..., true) closes both streams on completion
      IOUtils.copyBytes(in, out, CHUNK, true);
      LOG.info("wrote block " + (i + 1) + " of " + numBlocks);
    }
  } catch (IOException ioExc) {
    LOG.warn("f/s loadgen failed!", ioExc);
  } finally {
    // null-safe, exception-swallowing cleanup
    IOUtils.closeStream(dat);
    IOUtils.closeStream(in);
  }
}
// TODO this should be taken from the environment
public final static String HADOOP_HOME = "/grid/0/gs/gridre/yroot.biga/share/hadoop-current";
// Absolute paths to the tools invoked on the remote host.
public final static String CMD_SSH = "/usr/bin/ssh";
public final static String CMD_KINIT = "/usr/kerberos/bin/kinit";
public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop";
// Subcommand passed to the hadoop launcher to invoke the balancer.
public final static String OPT_BALANCER = "balancer";
// Kerberos credentials used to kinit before running the balancer remotely.
public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";
// Default balancer threshold, in percent.
public final static int DEFAULT_THRESHOLD = 10;
/** Run the balancer with the default threshold. */
private int runBalancer() throws IOException {
  return runBalancer(DEFAULT_THRESHOLD);
}
/** Run the balancer with an integer threshold (stringified). */
private int runBalancer(int threshold) throws IOException {
  return runBalancer("" + threshold);
}
/*
 * TODO change the heap size balancer uses so it can run on gateways
 * i.e., 14G heap is too big for gateways
 */
/**
 * Run the balancer on the namenode host over ssh: kinit with the QA
 * keytab, then "hadoop balancer -threshold &lt;threshold&gt;".
 *
 * Fix: the original format string opened a double-quote ({@code \"}) but
 * never closed it, handing the remote shell an unterminated quoted string;
 * the closing quote is now appended so the compound command parses.
 *
 * @param threshold balancer threshold argument, passed through verbatim
 * @return exit status of the remote command (see runAndWatch)
 */
private int runBalancer(String threshold)
    throws IOException {
  String balancerCommand = String.format("\"%s -k -t %s %s; %s %s -threshold %s\"",
      CMD_KINIT,
      KERB_KEYTAB,
      KERB_PRINCIPAL,
      CMD_HADOOP,
      OPT_BALANCER,
      threshold);
  String nnHost = dfsCluster.getNNClient().getHostName();
  return runAndWatch(nnHost, balancerCommand);
}
/** Run the balancer with the default threshold and verify the nodes balance. */
private void runBalancerAndVerify(List<DNClient> testnodes)
    throws IOException {
  runBalancerAndVerify(testnodes, DEFAULT_THRESHOLD);
}
/**
 * Run the balancer with the given integer threshold and verify the test
 * nodes end up balanced.
 *
 * Fix: the original ignored its {@code threshold} parameter and always
 * forwarded DEFAULT_THRESHOLD, so threshold-variation tests (negative /
 * out-of-range values) never actually exercised those thresholds.
 *
 * @param testnodes nodes expected to be balanced afterwards
 * @param threshold balancer threshold to use, in percent
 */
private void runBalancerAndVerify(List<DNClient> testnodes, int threshold)
    throws IOException {
  runBalancerAndVerify(testnodes, "" + threshold);
}
/**
 * Run the balancer with an arbitrary (string) threshold argument, assert
 * that it exits with the SUCCESS code, then assert the test nodes are
 * balanced within the default tolerance.
 *
 * @param testnodes nodes expected to be balanced afterwards
 * @param threshold threshold argument passed through to the balancer
 */
private void runBalancerAndVerify(List<DNClient> testnodes, String threshold)
    throws IOException {
  int exitStatus = runBalancer(threshold);
  // assert balancer exits with status SUCCESS
  Assert.assertTrue(
      String.format("balancer returned non-success exit code: %d",
          exitStatus),
      (exitStatus == SUCCESS));
  DNClient[] testnodeArr = toDatanodeArray(testnodes);
  Assert.assertTrue(
      "cluster is not balanced",
      isClusterBalanced(testnodeArr));
}
/**
 * Run a command on a remote host via ssh, stream its stdout/stderr to the
 * local console, and return its exit status.
 *
 * Fix: on InterruptedException the thread's interrupt flag is now restored
 * (Thread.currentThread().interrupt()) instead of being silently cleared.
 *
 * @param remoteHost host to ssh into
 * @param remoteCommand command line to execute remotely
 * @return remote process exit status, or -1 on interrupt or I/O error
 */
private int runAndWatch(String remoteHost, String remoteCommand) {
  int exitStatus = -1;
  try {
    Process proc = new ProcessBuilder(CMD_SSH, remoteHost, remoteCommand).start();
    watchProcStream(proc.getInputStream(), System.out);
    watchProcStream(proc.getErrorStream(), System.err);
    exitStatus = proc.waitFor();
  } catch (InterruptedException intExc) {
    LOG.warn("got thread interrupt error", intExc);
    // preserve interrupt status for callers
    Thread.currentThread().interrupt();
  } catch (IOException ioExc) {
    LOG.warn("got i/o error", ioExc);
  }
  return exitStatus;
}
/** Spawn a thread that copies a child-process stream to the given sink. */
private void watchProcStream(InputStream in, PrintStream out) {
  new Thread(new StreamWatcher(in, out)).start();
}
// Daemon attribute exposing per-volume info (JSON) on the datanode.
private static final String DATANODE_VOLUME_INFO = "VolumeInfo";
private static final String ATTRNAME_USED_SPACE = "usedSpace";
private static final String ATTRNAME_FREE_SPACE = "freeSpace";
// pseudo attribute, JMX doesn't really provide this
private static final String ATTRNAME_CAPACITY = "capacity";
// TODO maybe the static methods below belong in some utility class...
/** Used space summed across all volumes of the given datanode, in bytes. */
private static long getDatanodeUsedSpace(DNClient datanode)
    throws IOException {
  return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_USED_SPACE);
}/*
// retained dead code: free-space accessor, currently unused
private static long getDatanodeFreeSpace(DNClient datanode)
throws IOException {
return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_FREE_SPACE);
}*/
/**
 * Fetch used-space and (derived) capacity for a datanode, summed across
 * all of its volumes.
 *
 * Idiom fix: the local map is now typed (was a raw HashMap); the raw
 * {@code Map} return type is kept so existing callers compile unchanged.
 *
 * @param datanode node to query
 * @return map with ATTRNAME_USED_SPACE and ATTRNAME_CAPACITY entries;
 *         capacity is a pseudo-attribute computed as used + free
 * @throws IOException if the daemon attribute cannot be read
 */
private static Map getDatanodeVolumeAttributes(DNClient datanode)
    throws IOException {
  Map<String, Long> result = new HashMap<String, Long>();
  long usedSpace = getVolumeAttribute(datanode, ATTRNAME_USED_SPACE);
  long freeSpace = getVolumeAttribute(datanode, ATTRNAME_FREE_SPACE);
  result.put(ATTRNAME_USED_SPACE, usedSpace);
  result.put(ATTRNAME_CAPACITY, usedSpace + freeSpace);
  return result;
}
/**
 * Sum a numeric attribute (e.g. usedSpace/freeSpace) across every volume
 * reported by the datanode's VolumeInfo daemon attribute.
 *
 * The attribute's string form is parsed as JSON shaped like
 * volume -> (attribute name -> number); the requested attribute is summed
 * over all volumes.
 *
 * @param datanode node to query
 * @param attribName per-volume attribute name to sum
 * @return sum of the attribute over all volumes
 * @throws IOException if the daemon attribute cannot be read
 */
private static long getVolumeAttribute(DNClient datanode,
    String attribName)
    throws IOException {
  Object volInfo = datanode.getDaemonAttribute(DATANODE_VOLUME_INFO);
  Assert
      .assertNotNull( String
          .format( "Attribute \"%s\" should be non-null",
              DATANODE_VOLUME_INFO ),
          volInfo );
  String strVolInfo = volInfo.toString();
  LOG.debug( String.format("Value of %s: %s",
      DATANODE_VOLUME_INFO,
      strVolInfo) );
  // parse the JSON text into nested maps: volume -> (attribute -> value)
  Map volInfoMap = (Map) JSON.parse(strVolInfo);
  long attrVal = 0L;
  for(Object key: volInfoMap.keySet()) {
    Map attrMap = (Map) volInfoMap.get(key);
    long val = (Long) attrMap.get(attribName);
    attrVal += val;
  }
  return attrVal;
}
/** Simple utility to pump a stream from an exec'ed process to a PrintStream. */
static class StreamWatcher implements Runnable {
  private BufferedReader reader;
  private PrintStream printer;
  StreamWatcher(InputStream in, PrintStream out) {
    reader = getReader(in);
    printer = out;
  }
  private static BufferedReader getReader(InputStream in) {
    return new BufferedReader(new InputStreamReader(in));
  }
  /**
   * Copy lines to the sink until EOF.
   *
   * Fix: the original read at most ONE line — and only if input happened
   * to be ready() at the instant run() started — so remote command output
   * was almost entirely lost, and a full pipe buffer could stall the
   * child process. We now drain the stream to EOF.
   */
  public void run() {
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        printer.println(line);
      }
    } catch (IOException ignored) {
      // best effort: the watched process may close its end at any time
    }
  }
}
/** simple utility to report progress in generating data */
static class ProgressReporter implements Progressable {
  // Accumulates one '.' per callback; reset to null after each burst.
  StringBuffer buf = null;
  // Invoked by DFS on write progress; emits a marker line every 10,000 calls.
  public void progress() {
    if (buf == null) {
      buf = new StringBuffer();
    }
    buf.append(".");
    if (buf.length() != 10000) {
      return;
    }
    // 10,000 callbacks accumulated: log the marker and start over.
    LOG.info("..........");
    buf = null;
  }
}
// A constant for SUCCESS exit code
// NOTE(review): commands run via ssh conventionally report success as exit
// status 0, not 1 — confirm the remote balancer wrapper really exits 1 on
// success, otherwise every exit-code assertion in this file is inverted.
static final int SUCCESS = 1;
/**
 * Balancer_01
 * Start balancer and check if the cluster is balanced after the run.
 * Cluster should end up in balanced state.
 */
@Test
public void testBalancerSimple() throws IOException {
  DNClient[] datanodes = toDatanodeArray( getAllDatanodes() );
  int exitStatus = runBalancer();
  // assert on successful exit code here
  Assert.assertTrue(
      String.format("balancer returned non-success exit code: %d",
          exitStatus),
      (exitStatus == SUCCESS));
  Assert.assertTrue( "cluster is not balanced", isClusterBalanced(datanodes) );
}
/**
 * Balancer_02
 * Test a cluster with even distribution, then a new empty node is
 * added to the cluster. Here, even distribution effectively means the
 * cluster is in "balanced" state, as bytes consumed for block allocation
 * are evenly distributed throughout the cluster.
 */
@Test
public void testBalancerEvenDistributionWithNewNodeAdded() throws IOException {
  // Unimplemented: throwing keeps this visibly failing rather than
  // silently passing.
  throw new UnsupportedOperationException("not implemented yet!");
  // get all nodes
  // need to get an external reserve of nodes we can boot up
  // to add to this cluster?
  // HOW?
  // IDEA try to steal some nodes from omega-M for now.....
  // hmmm also need a way to give an alternate "empty-node" config
  // to "hide" the data that may already exist on this node
}
/**
 * Balancer_03
 * Bring up a 1-node DFS cluster. Set files replication factor to be 1
 * and fill up the node to 30% full. Then add an empty datanode.
 */
@Test
public void testBalancerSingleNodeClusterWithNewNodeAdded() throws IOException {
  // empty datanode: mod config to point to non-default blocks dir.
  // limit capacity to available storage space
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_04
 * The same as _03 except that the empty new data node is on a
 * different rack.
 */
@Test
public void testBalancerSingleNodeClusterWithNewNodeAddedFromDifferentRack()
    throws IOException {
  // need rack awareness
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_05
 * The same as _03 except that the empty new data node is half the
 * capacity as the old one.
 */
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNode() {
  // how to limit node capacity?
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_06
 * Bring up a 2-node cluster and fill one node to be 60% and the
 * other to be 10% full. All nodes are on different racks.
 */
@Test
public void testBalancerTwoNodeMultiRackCluster() {
  // need rack awareness
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_07
 * Bring up a dfs cluster with nodes A and B. Set file replication
 * factor to be 3 (the Javadoc originally said 2; the code has always used
 * 3) and fill up the cluster ~30% beyond its most loaded node. Then add
 * an empty data node C. All three nodes are on the same rack.
 * (Method-name typo "Wuth" is kept so external references don't break.)
 *
 * Fix: the original target-load expression multiplied by
 * {@code (1/DFS_BLOCK_SIZE)}, which is INTEGER division and always 0 —
 * so zero blocks of load were ever generated. The intended
 * bytes-to-blocks conversion is now done in floating point.
 */
@Test
public void testBalancerTwoNodeSingleRackClusterWuthNewNodeAdded()
    throws IOException {
  final short TEST_REPLICATION_FACTOR = 3;
  List<DNClient> testnodes = reserveDatanodesForTest(3);
  DNClient dnA = testnodes.get(0);
  DNClient dnB = testnodes.get(1);
  DNClient dnC = testnodes.get(2);
  stopDatanode(dnC);
  // change test: 30% full-er (ie, 30% over pre-test capacity),
  // use most heavily loaded node as baseline; convert bytes -> blocks
  long maxUsed = Math.max(getDatanodeUsedSpace(dnA), getDatanodeUsedSpace(dnB));
  long targetLoad = (long) (0.30 * maxUsed / DFS_BLOCK_SIZE);
  generateFileSystemLoad(targetLoad, TEST_REPLICATION_FACTOR);
  startDatanode(dnC);
  runBalancerAndVerify(testnodes);
}
/**
 * Balancer_08
 * The same as _07 except that A, B and C are on different racks.
 */
@Test
public void testBalancerTwoNodeMultiRackClusterWithNewNodeAdded()
    throws IOException {
  // need rack awareness
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_09
 * The same as _07 except that interrupt balancing.
 */
@Test
public void testBalancerTwoNodeSingleRackClusterInterruptingRebalance()
    throws IOException {
  // interrupt thread
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_10
 * Restart rebalancing until it is done.
 */
@Test
public void testBalancerRestartInterruptedBalancerUntilDone()
    throws IOException {
  // need kill-restart thread
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_11
 * The same as _07 except that the namenode is shutdown while rebalancing.
 */
@Test
public void testBalancerTwoNodeSingleRackShutdownNameNodeDuringRebalance()
    throws IOException {
  // need NN shutdown thread in addition
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_12
 * The same as _05 except that FS writes occur during rebalancing.
 */
@Test
public void
testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSWrites()
    throws IOException {
  // writer thread
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_13
 * The same as _05 except that FS deletes occur during rebalancing.
 */
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletes()
    throws IOException {
  // eraser thread
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_14
 * The same as _05 except that FS deletes AND writes occur during
 * rebalancing.
 */
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletesAndWrites()
    throws IOException {
  // writer & eraser threads
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_15
 * Scalability test: Populate a 750-node cluster, then
 * 1. Run rebalancing after 3 nodes are added
 * 2. Run rebalancing after 2 racks of nodes (60 nodes) are added
 * 3. Run rebalancing after 2 racks of nodes are added and concurrently
 * executing file writing and deleting at the same time
 */
@Test
public void testBalancerScalability() throws IOException {
  /* work in progress->
   * NOTE(review): the commented-out draft asserts dnCount == (875 - 2),
   * which contradicts its own "need 2 datanodes" message — revisit before
   * enabling.
   *
  List<DNClient> dnList = getAllDatanodes();
  int dnCount = dnList.size();
  Assert.assertTrue(
      String.format(
          "not enough datanodes available to run test,"
          + " need 2 datanodes but have only %d available",
          dnCount),
      ( dnCount == (875 - 2) ));
  List<DNClient> datanodes = reserveDatanodesForTest(750);
  shutdownNonTestNodes(datanodes);
  */
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_16
 * Start balancer with a negative threshold value.
 *
 * Fix: {@code (int)(-1 * 100 * Math.random())} could evaluate to 0, which
 * is not a negative threshold and would silently test the wrong case.
 * The threshold is now drawn uniformly from [-100, -1].
 */
@Test
public void testBalancerConfiguredWithThresholdValueNegative()
    throws IOException {
  List<DNClient> testnodes = getAllDatanodes();
  final int TRIALS = 5;
  for (int i = 0; i < TRIALS; i++) {
    // getRandom(100) yields 0..99, so negThreshold is always -1..-100
    int negThreshold = -(getRandom(100) + 1);
    runBalancerAndVerify(testnodes, negThreshold);
  }
}
/**
 * Balancer_17
 * Start balancer with out-of-range threshold value
 * (e.g. -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989)
 */
@Test
public void testBalancerConfiguredWithThresholdValueOutOfRange()
    throws IOException {
  List<DNClient> testnodes = getAllDatanodes();
  final int[] THRESHOLD_OUT_OF_RANGE_DATA = {
    -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989
  };
  // drive the balancer once per out-of-range threshold value
  for (int idx = 0; idx < THRESHOLD_OUT_OF_RANGE_DATA.length; idx++) {
    runBalancerAndVerify(testnodes, THRESHOLD_OUT_OF_RANGE_DATA[idx]);
  }
}
/**
 * Balancer_18
 * Start balancer with alpha-numeric threshold value
 * (e.g., 103dsf, asd234, asfd, ASD, #$asd, 2345&amp;, $35, %34)
 */
@Test
public void testBalancerConfiguredWithThresholdValueAlphanumeric()
    throws IOException {
  List<DNClient> testnodes = getAllDatanodes();
  final String[] THRESHOLD_ALPHA_DATA = {
    "103dsf", "asd234", "asfd", "ASD", "#$asd", "2345&", "$35", "%34",
    "0x64", "0xde", "0xad", "0xbe", "0xef"
  };
  // drive the balancer once per malformed threshold string
  for (int idx = 0; idx < THRESHOLD_ALPHA_DATA.length; idx++) {
    runBalancerAndVerify(testnodes, THRESHOLD_ALPHA_DATA[idx]);
  }
}
/**
 * Balancer_19
 * Start 2 instances of balancer on the same gateway
 */
@Test
public void testBalancerRunTwoConcurrentInstancesOnSingleGateway()
    throws IOException {
  // do on gateway logic with small balancer heap
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_20
 * Start 2 instances of balancer on two different gateways
 */
@Test
public void testBalancerRunTwoConcurrentInstancesOnDistinctGateways()
    throws IOException {
  // do on gateway logic with small balancer heap
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Balancer_21
 * Start balancer when the cluster is already balanced
 */
@Test
public void testBalancerOnBalancedCluster() throws IOException {
  // balance twice back-to-back; the second run starts from a balanced state
  for (int pass = 0; pass < 2; pass++) {
    testBalancerSimple();
  }
}
/**
 * Balancer_22
 * Running the balancer with half the data nodes not running
 */
@Test
public void testBalancerWithOnlyHalfOfDataNodesRunning()
    throws IOException {
  List<DNClient> datanodes = getAllDatanodes();
  // integer division truncates exactly like the old Math.floor(size * 0.5)
  int testnodeCount = datanodes.size() / 2;
  List<DNClient> testnodes = reserveDatanodesForTest(testnodeCount);
  runBalancerAndVerify(testnodes);
}
/**
 * Balancer_23
 * Running the balancer and simultaneously simulating load on the
 * cluster with half the data nodes not running.
 */
@Test
public void testBalancerOnBusyClusterWithOnlyHalfOfDatanodesRunning()
    throws IOException {
  // load thread
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * Protocol Test Prelude
 *
 * First set up 3 node cluster with nodes NA, NB and NC, which are on
 * different racks. Then create a file with one block B with a replication
 * factor 3. Finally add a new node ND to the cluster on the same rack as NC.
 */
/**
 * ProtocolTest_01
 * Copy block B from ND to NA with del hint NC
 */
@Test
public void
testBlockReplacementProtocolFailWhenCopyBlockSourceDoesNotHaveBlockToCopy()
    throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/*
 * ProtocolTest_02
 * Copy block B from NA to NB with del hint NB
 */
@Test
public void
testBlockReplacementProtocolFailWhenCopyBlockDestinationContainsBlockCopy()
    throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * ProtocolTest_03
 * Copy block B from NA to ND with del hint NB
 */
@Test
public void testBlockReplacementProtocolCopyBlock() throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * ProtocolTest_04
 * Copy block B from NB to NC with del hint NA
 */
@Test
public void testBlockReplacementProtocolWithInvalidHint()
    throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * ThrottleTest_01
 * Create a throttler with 1MB/s bandwidth. Send 6MB data, and throttle
 * at 0.5MB, 0.75MB, and in the end [1MB/s?].
 */
/**
 * NamenodeProtocolTest_01
 * Get blocks from datanode 0 with a size of 2 blocks.
 */
@Test
public void testNamenodeProtocolGetBlocksCheckThroughput()
    throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * NamenodeProtocolTest_02
 * Get blocks from datanode 0 with a size of 1 block.
 */
@Test
public void testNamenodeProtocolGetSingleBlock()
    throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * NamenodeProtocolTest_03
 * Get blocks from datanode 0 with a size of 0.
 */
@Test
public void testNamenodeProtocolGetZeroBlocks() throws IOException {
  throw new UnsupportedOperationException("not implemented yet!");
}
/**
 * NamenodeProtocolTest_04
 * Get blocks from datanode 0 with a size of -1.
 */
@Test
public void testNamenodeProtocolGetMinusOneBlocks() throws Exception {
  // NOTE(review): empty body — this @Test passes vacuously. The sibling
  // unimplemented tests throw UnsupportedOperationException; consider the
  // same here so it cannot masquerade as a passing test.
}
/**
 * NamenodeProtocolTest_05
 * Get blocks from a non-existent datanode.
 *
 * NOTE(review): this test is incomplete — the block-location retrieval
 * and the expected failure (see TODO below) are not yet implemented, so
 * currently it only generates load and stops one node.
 */
@Test
public void testNamenodeProtocolGetBlocksFromNonexistentDatanode()
    throws IOException {
  final short replication = 1;
  Path balancerTempDir = null;
  try {
    // reserve 2 nodes for test
    List<DNClient> testnodes = reserveDatanodesForTest(2);
    shutdownNonTestNodes(testnodes);
    DNClient testnode1 = testnodes.get(0);
    // NOTE(review): testnode2 is currently unused; it is reserved for the
    // TODO block-location logic below.
    DNClient testnode2 = testnodes.get(1);
    // write some blocks with replication factor of 1
    balancerTempDir = makeTempDir();
    generateFileSystemLoad(20, replication);
    // get block locations from NN
    NNClient namenode = dfsCluster.getNNClient();
    // TODO extend namenode to get block locations
    //namenode.get
    // shutdown 1 node
    stopDatanode(testnode1);
    // attempt to retrieve blocks from the dead node
    // we should fail
  } finally {
    // cleanup
    // finally block to run cleanup
    LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
    try {
      deleteTempDir(balancerTempDir);
    } catch (Exception e) {
      LOG.warn("problem cleaning up temp dir", e);
    }
  }
}
}