/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import junit.framework.TestCase;
import junit.framework.AssertionFailedError;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.conf.Configuration;
import java.io.File;
import java.io.FilenameFilter;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.Random;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
/**
* Test DFS.
* ClusterTestDFS is a JUnit test for DFS using "pseudo multiprocessing" (or,
* more strictly, pseudo distributed), meaning all daemons run in one process
* and sockets are used to communicate between daemons. The test permutes
* various block sizes, number of files, file sizes, and number of
* datanodes. After creating 1 or more files and filling them with random
* data, one datanode is shut down, and then the files are verified.
* Next, all the random test files are deleted and we test for leakage
* (non-deletion) by directly checking the real directories corresponding
* to the datanodes still running.
* <p>
* Usage notes: TEST_PERMUTATION_MAX can be adjusted to perform more or
* less testing of permutations. The ceiling of useful permutations is
* TEST_PERMUTATION_MAX_CEILING.
* <p>
* DFSClient emits many messages that can be ignored, such as:
* "Failed to connect to *:7000:java.net.ConnectException: Connection refused: connect",
* because a datanode is forced to close during testing.
* <p>
* Warnings about "Zero targets found" can be ignored (these are naggingly
* emitted even though it is not possible to achieve the desired replication
* level with the number of active datanodes.)
* <p>
* Possible Extensions:
* <p>Bring a datanode down and restart it to verify reconnection to namenode.
* <p>Simulate running out of disk space on one datanode only.
* <p>Bring the namenode down and restart it to verify that datanodes reconnect.
* <p>
* <p>For another approach to filesystem testing, see the high level
* (HadoopFS level) test {@link org.apache.hadoop.fs.TestFileSystem}.
*/
public class ClusterTestDFS extends TestCase implements FSConstants {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.dfs.ClusterTestDFS");
private static Configuration conf = new Configuration();
private static int BUFFER_SIZE =
conf.getInt("io.file.buffer.size", 4096);
private static int testCycleNumber = 0;
/**
* all DFS test files go under this base directory
*/
private static String baseDirSpecified;
/**
* base dir as File
*/
private static File baseDir;
/** DFS block sizes to permute over in multiple test cycles
* (array length should be prime).
*/
private static final int[] BLOCK_SIZES = {100000, 4096};
/** DFS file sizes to permute over in multiple test cycles
* (array length should be prime).
*/
private static final int[] FILE_SIZES =
{100000, 100001, 4095, 4096, 4097, 1000000, 1000001};
/** DFS file counts to permute over in multiple test cycles
* (array length should be prime).
*/
private static final int[] FILE_COUNTS = {1, 10, 100};
/** Number of useful permutations or test cycles.
* (The factor of 2 reflects alternating between starting 2 and 3
* datanodes.)
*/
private static final int TEST_PERMUTATION_MAX_CEILING =
BLOCK_SIZES.length * FILE_SIZES.length * FILE_COUNTS.length * 2;
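// With the arrays above: 2 block sizes * 7 file sizes * 3 file counts * 2 = 84 permutations.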
/** Number of permutations of DFS test parameters to perform.
* If this is greater than TEST_PERMUTATION_MAX_CEILING, the ceiling value
* is used.
*/
private static final int TEST_PERMUTATION_MAX = 3;
private Constructor<?> randomDataGeneratorCtor = null;
static {
baseDirSpecified = System.getProperty("test.dfs.data", "/tmp/dfs_test");
baseDir = new File(baseDirSpecified);
}
protected void setUp() throws Exception {
super.setUp();
conf.setBoolean("test.dfs.same.host.targets.allowed", true);
}
/**
* Remove old files from the temp area used by this test case and make sure
* the base temp directory can be created.
*/
protected void prepareTempFileSpace() {
if (baseDir.exists()) {
try { // start from a blank slate
FileUtil.fullyDelete(baseDir);
} catch (Exception ignored) {
}
}
baseDir.mkdirs();
if (!baseDir.isDirectory()) {
throw new RuntimeException("Value of root directory property test.dfs.data for dfs test is not a directory: "
+ baseDirSpecified);
}
}
/**
* Pseudo Distributed FS Test.
* Test DFS by running all the necessary daemons in one process.
* Test various block sizes, number of files, disk space consumption,
* and leakage.
*
* @throws Exception
*/
public void testFsPseudoDistributed()
throws Exception {
while (testCycleNumber < TEST_PERMUTATION_MAX &&
testCycleNumber < TEST_PERMUTATION_MAX_CEILING) {
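// Note: testCycleNumber is advanced inside testFsPseudoDistributed(...) below
// (where the namenode port is chosen), so this loop terminates.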
int blockSize = BLOCK_SIZES[testCycleNumber % BLOCK_SIZES.length];
int numFiles = FILE_COUNTS[testCycleNumber % FILE_COUNTS.length];
int fileSize = FILE_SIZES[testCycleNumber % FILE_SIZES.length];
prepareTempFileSpace();
testFsPseudoDistributed(fileSize, numFiles, blockSize,
(testCycleNumber % 2) + 2);
}
}
/**
* Pseudo Distributed FS Testing.
* Do one test cycle with given parameters.
*
* @param nBytes number of bytes to write to each file.
* @param numFiles number of files to create.
* @param blockSize block size to use for this test cycle.
* @param initialDNcount number of datanodes to create
* @throws Exception
*/
public void testFsPseudoDistributed(long nBytes, int numFiles,
int blockSize, int initialDNcount)
throws Exception {
long startTime = System.currentTimeMillis();
int bufferSize = Math.min(BUFFER_SIZE, blockSize);
boolean checkDataDirsEmpty = false;
int iDatanodeClosed = 0;
Random randomDataGenerator = makeRandomDataGenerator();
final int currentTestCycleNumber = testCycleNumber;
msg("using randomDataGenerator=" + randomDataGenerator.getClass().getName());
//
// modify config for test
//
// set given config param to override other config settings
conf.setInt("test.dfs.block_size", blockSize);
// verify that config changed
assertTrue(blockSize == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
// downsize for testing (just to save resources)
conf.setInt("dfs.namenode.handler.count", 3);
if (false) { // use MersenneTwister, if present
conf.set("hadoop.random.class",
"org.apache.hadoop.util.MersenneTwister");
}
conf.setLong("dfs.blockreport.intervalMsec", 50*1000L);
conf.setLong("dfs.datanode.startupMsec", 15*1000L);
String nameFSDir = baseDirSpecified + "/name";
msg("----Start Test Cycle=" + currentTestCycleNumber +
" test.dfs.block_size=" + blockSize +
" nBytes=" + nBytes +
" numFiles=" + numFiles +
" initialDNcount=" + initialDNcount);
//
// start a NameNode
int nameNodePort = 9000 + testCycleNumber++; // ToDo: settable base port
String nameNodeSocketAddr = "localhost:" + nameNodePort;
conf.set("dfs.name.dir", nameFSDir);
NameNode nameNodeDaemon = new NameNode("localhost", nameNodePort, conf);
DFSClient dfsClient = null;
try {
//
// start some DataNodes
//
ArrayList<DataNode> listOfDataNodeDaemons = new ArrayList<DataNode>();
conf.set("fs.default.name", nameNodeSocketAddr);
for (int i = 0; i < initialDNcount; i++) {
// uniquely config real fs path for data storage for this datanode
String dataDirs[] = new String[1];
dataDirs[0] = baseDirSpecified + "/datanode" + i;
conf.set("dfs.data.dir", dataDirs[0]);
DataNode dn = DataNode.makeInstance(dataDirs, conf);
if (dn != null) {
listOfDataNodeDaemons.add(dn);
(new Thread(dn, "DataNode" + i + ": " + dataDirs[0])).start();
}
}
try {
assertTrue("insufficient datanodes for test to continue",
(listOfDataNodeDaemons.size() >= 2));
//
// wait for datanodes to report in
awaitQuiescence();
// act as if namenode is a remote process
dfsClient = new DFSClient(new InetSocketAddress("localhost", nameNodePort), conf);
//
// write nBytes of data using randomDataGenerator to numFiles
//
ArrayList<UTF8> testfilesList = new ArrayList<UTF8>();
byte[] buffer = new byte[bufferSize];
UTF8 testFileName = null;
for (int iFileNumber = 0; iFileNumber < numFiles; iFileNumber++) {
testFileName = new UTF8("/f" + iFileNumber);
testfilesList.add(testFileName);
OutputStream nos = dfsClient.create(testFileName, false);
try {
for (long nBytesWritten = 0L;
nBytesWritten < nBytes;
nBytesWritten += buffer.length) {
if ((nBytesWritten + buffer.length) > nBytes) {
// calculate byte count needed to exactly hit nBytes in length
// to keep randomDataGenerator in sync during the verify step
int pb = (int) (nBytes - nBytesWritten);
byte[] bufferPartial = new byte[pb];
randomDataGenerator.nextBytes(bufferPartial);
nos.write(bufferPartial);
} else {
randomDataGenerator.nextBytes(buffer);
nos.write(buffer);
}
}
} finally {
nos.flush();
nos.close();
}
}
//
// No need to wait for blocks to be replicated because replication
// is supposed to be complete when the file is closed.
//
//
// take one datanode down
iDatanodeClosed =
currentTestCycleNumber % listOfDataNodeDaemons.size();
DataNode dn = (DataNode) listOfDataNodeDaemons.get(iDatanodeClosed);
msg("shutdown datanode daemon " + iDatanodeClosed +
" dn=" + dn.data);
try {
dn.shutdown();
} catch (Exception e) {
msg("ignoring datanode shutdown exception=" + e);
}
//
// verify data against a "rewound" randomDataGenerator
// that all of the data is intact
long lastLong = randomDataGenerator.nextLong();
randomDataGenerator = makeRandomDataGenerator(); // restart (make new) PRNG
ListIterator<UTF8> li = testfilesList.listIterator();
while (li.hasNext()) {
testFileName = (UTF8) li.next();
FSInputStream nis = dfsClient.open(testFileName);
byte[] bufferGolden = new byte[bufferSize];
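// m starts at an arbitrary value != -1 so the read loop below is entered;
// read() returns -1 at end of stream.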
int m = 42;
try {
while (m != -1) {
m = nis.read(buffer);
if (m == buffer.length) {
randomDataGenerator.nextBytes(bufferGolden);
assertBytesEqual(buffer, bufferGolden, buffer.length);
} else if (m > 0) {
byte[] bufferGoldenPartial = new byte[m];
randomDataGenerator.nextBytes(bufferGoldenPartial);
assertBytesEqual(buffer, bufferGoldenPartial, bufferGoldenPartial.length);
}
}
} finally {
nis.close();
}
}
// verify last randomDataGenerator rand val to ensure last file length was checked
long lastLongAgain = randomDataGenerator.nextLong();
assertEquals(lastLong, lastLongAgain);
msg("Finished validating all file contents");
//
// now delete all the created files
msg("Delete all random test files under DFS via remaining datanodes");
li = testfilesList.listIterator();
while (li.hasNext()) {
testFileName = (UTF8) li.next();
assertTrue(dfsClient.delete(testFileName));
}
//
// wait for delete to be propagated
// (unlike writing files, delete is lazy)
msg("Test thread sleeping while datanodes propagate delete...");
awaitQuiescence();
msg("Test thread awakens to verify file contents");
//
// check that the datanode's block directory is empty
// (except for datanode that had forced shutdown)
checkDataDirsEmpty = true; // do it during finally clause
} catch (AssertionFailedError afe) {
throw afe;
} catch (Throwable t) {
msg("Unexpected exception_b: " + t);
t.printStackTrace();
} finally {
//
// shut down datanode daemons (this takes advantage of being same-process)
msg("begin shutdown of all datanode daemons for test cycle " +
currentTestCycleNumber);
for (int i = 0; i < listOfDataNodeDaemons.size(); i++) {
DataNode dataNode = (DataNode) listOfDataNodeDaemons.get(i);
if (i != iDatanodeClosed) {
try {
if (checkDataDirsEmpty) {
assertNoBlocks(dataNode);
}
dataNode.shutdown();
} catch (Exception e) {
msg("ignoring exception during (all) datanode shutdown, e=" + e);
}
}
}
}
msg("finished shutdown of all datanode daemons for test cycle " +
currentTestCycleNumber);
if (dfsClient != null) {
try {
msg("close down subthreads of DFSClient");
dfsClient.close();
} catch (Exception ignored) { }
msg("finished close down of DFSClient");
}
} catch (AssertionFailedError afe) {
throw afe;
} catch (Throwable t) {
msg("Unexpected exception_a: " + t);
t.printStackTrace();
} finally {
// shut down namenode daemon (this takes advantage of being same-process)
msg("begin shutdown of namenode daemon for test cycle " +
currentTestCycleNumber);
try {
nameNodeDaemon.stop();
} catch (Exception e) {
msg("ignoring namenode shutdown exception=" + e);
}
msg("finished shutdown of namenode daemon for test cycle " +
currentTestCycleNumber);
}
msg("test cycle " + currentTestCycleNumber + " elapsed time=" +
(System.currentTimeMillis() - startTime) / 1000. + "sec");
msg("threads still running (look for stragglers): ");
msg(summarizeThreadGroup());
}
private void assertNoBlocks(DataNode dn) {
Block[] blocks = dn.data.getBlockReport();
// if this fails, the delete did not propagate because either
// awaitQuiescence() returned before the disk images were removed
// or a real failure was detected.
assertTrue(" data dir not empty: " + dn.data.volumes,
blocks.length==0);
}
/**
* Make a data generator.
* Allows optional use of a higher-quality PRNG by setting the property
* hadoop.random.class to the fully-qualified name of a subclass of
* java.util.Random, such as "...util.MersenneTwister".
* The property test.dfs.random.seed can supply a seed for reproducible
* testing (a default is used here if the property is not set.)
*/
private Random makeRandomDataGenerator() {
long seed = conf.getLong("test.dfs.random.seed", 0xB437EF);
try {
if (randomDataGeneratorCtor == null) {
// lazy init
String rndDataGenClassname =
conf.get("hadoop.random.class", "java.util.Random");
Class<?> clazz = Class.forName(rndDataGenClassname);
randomDataGeneratorCtor = clazz.getConstructor(Long.TYPE);
}
if (randomDataGeneratorCtor != null) {
Object arg[] = {new Long(seed)};
return (Random) randomDataGeneratorCtor.newInstance(arg);
}
} catch (ClassNotFoundException absorb) {
} catch (NoSuchMethodException absorb) {
} catch (SecurityException absorb) {
} catch (InstantiationException absorb) {
} catch (IllegalAccessException absorb) {
} catch (IllegalArgumentException absorb) {
} catch (InvocationTargetException absorb) {
}
// last resort
return new java.util.Random(seed);
}
/** Wait for the DFS datanodes to become quiescent.
* The initial implementation simply sleeps for a fixed amount of time;
* a better implementation would actually detect when the distributed
* operations have completed.
* @throws InterruptedException
*/
private void awaitQuiescence() throws InterruptedException {
// ToDo: Need observer pattern, not static sleep
// Doug suggested that the block report interval could be made shorter
// and then observing that would be a good way to know when an operation
// was complete (quiescence detect).
sleepAtLeast(60000);
}
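/**
 * Illustrative sketch only (not called by this test): one possible way to
 * replace the fixed sleep above, per the ToDo note, would be to poll the
 * live datanodes' block reports until the total block count stops changing
 * between consecutive polls. The method name, parameters, and poll interval
 * here are assumptions for illustration, not part of the original test;
 * a real implementation would also bound the total wait time.
 */
private void awaitQuiescenceByPolling(ArrayList<DataNode> datanodes,
                                      long pollIntervalMsec)
  throws InterruptedException {
  int previousBlockCount = -1;
  while (true) {
    // Sum the blocks currently reported by each datanode
    // (same call assertNoBlocks() uses).
    int blockCount = 0;
    for (int i = 0; i < datanodes.size(); i++) {
      DataNode dn = datanodes.get(i);
      blockCount += dn.data.getBlockReport().length;
    }
    if (blockCount == previousBlockCount) {
      return; // stable across two polls; assume the datanodes are quiescent
    }
    previousBlockCount = blockCount;
    Thread.sleep(pollIntervalMsec);
  }
}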
private void assertBytesEqual(byte[] buffer, byte[] bufferGolden, int len) {
for (int i = 0; i < len; i++) {
assertEquals(buffer[i], bufferGolden[i]);
}
}
private void msg(String s) {
//System.out.println(s);
LOG.info(s);
}
public static void sleepAtLeast(int tmsec) {
long t0 = System.currentTimeMillis();
long t1 = t0;
long tslept = t1 - t0;
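// Keep sleeping until at least tmsec of wall-clock time has elapsed,
// even if Thread.sleep() is interrupted early.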
while (tmsec > tslept) {
try {
long tsleep = tmsec - tslept;
Thread.sleep(tsleep);
t1 = System.currentTimeMillis();
} catch (InterruptedException ie) {
t1 = System.currentTimeMillis();
}
tslept = t1 - t0;
}
}
public static String summarizeThreadGroup() {
int n = 10;
int k = 0;
Thread[] tarray = null;
StringBuffer sb = new StringBuffer(500);
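// Thread.enumerate() silently truncates when the array is too small, so
// retry with a larger array until every live thread fits.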
do {
n = n * 10;
tarray = new Thread[n];
k = Thread.enumerate(tarray);
} while (k == n); // while array is too small...
for (int i = 0; i < k; i++) {
Thread thread = tarray[i];
sb.append(thread.toString());
sb.append("\n");
}
return sb.toString();
}
public static void main(String[] args) throws Exception {
String usage = "Usage: ClusterTestDFS (no args)";
if (args.length != 0) {
System.err.println(usage);
System.exit(-1);
}
String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFS"};
junit.textui.TestRunner.main(testargs);
}
}