blob: 86b3a1553966400fb76504d204b83fde1cde25df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import junit.framework.TestCase;
import junit.framework.AssertionFailedError;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.NameNode;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
/**
* Test DFS logging
* make sure that any namespace mutations are logged.
*/
public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConstants {
/**
* Logger used by msg(). It is looked up under the name
* "org.apache.hadoop.dfs.ClusterTestDFS" rather than this class's own name.
*/
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.dfs.ClusterTestDFS");
/**
* Configuration shared by the whole class; it is modified by setUp(),
* configureDFS() and startDFS().
*/
private static Configuration conf = new Configuration();
/**
* All DFS test files go under this base directory, taken from the
* "test.dfs.data" property with "/tmp/test-dfs" as the fallback.
*/
private static String baseDirSpecified=conf.get("test.dfs.data", "/tmp/test-dfs");;
/**
* The base directory as a File.
*/
private static File baseDir=new File(baseDirSpecified);
/**
* Name node port, taken from "dfs.namenode.port" with 9020 as the fallback.
*/
int nameNodePort = conf.getInt("dfs.namenode.port", 9020);
/** DFS client, datanodes, and namenode.
* They are created in startDFS() and shut down in shutdownDFS().
*/
DFSClient dfsClient;
ArrayList<DataNode> dataNodeDaemons = new ArrayList<DataNode>();
NameNode nameNodeDaemon;
/** Log header lengths: the number of leading characters assertHasLogged()
* skips in a log line before it compares the expected message.
* NOTE(review): presumably the width of the logger's line prefix -- confirm
* against the log layout in use.
*/
private static final int DIR_LOG_HEADER_LEN = 30;
private static final int BLOCK_LOG_HEADER_LEN = 32;
/** DFS block size in bytes (32 * 1024 * 1024); configureDFS() stores it
* under "dfs.block.size".
*/
private static final int BLOCK_SIZE = 32*1024*1024;
/** Size in bytes of the write buffer allocated by createFile().
*/
private static final int BUFFER_SIZE = 4096;
/** Reader over the log file; opened in testFsPseudoDistributed(int) and read
* line by line in assertHasLogged().
*/
private BufferedReader logfh;
/** Path of the log file to scan.
* NOTE(review): no assignment to this field is visible in this file --
* confirm where it is meant to be set.
*/
private String logFile;
/**
 * Per-test fixture hook: runs the base class set-up and then switches on the
 * "test.dfs.same.host.targets.allowed" flag in the shared configuration.
 *
 * @throws Exception if the base class set-up fails
 */
@Override
protected void setUp() throws Exception {
  super.setUp();
  final String sameHostTargetsKey = "test.dfs.same.host.targets.allowed";
  conf.setBoolean(sameHostTargetsKey, true);
}
/**
 * Remove old files from the temp area used by this test case and make sure
 * the base temp directory exists afterwards.
 *
 * Clearing stale content is best-effort: a failure is logged and the method
 * carries on, because the directory check at the end is what decides whether
 * the test can proceed.
 *
 * @throws RuntimeException if the base path is not a directory after the
 *         attempt to create it
 */
protected void prepareTempFileSpace() {
  if (baseDir.exists()) {
    try { // start from a blank state
      FileUtil.fullyDelete(baseDir);
    } catch (Exception e) {
      // deliberately not fatal, but leave a trace instead of hiding it
      msg("ignoring exception while clearing " + baseDirSpecified + ", e=" + e);
    }
  }
  baseDir.mkdirs();
  if (!baseDir.isDirectory()) {
    // note the trailing space: the two literals are joined into one sentence
    throw new RuntimeException("Value of root directory property "
        + "test.dfs.data for dfs test is not a directory: "
        + baseDirSpecified);
  }
}
/**
 * Pseudo-distributed file system test.
 * All the necessary daemons run inside this one process; the scenario is
 * executed against a small cluster of three data nodes.
 *
 * @throws Exception
 */
public void testFsPseudoDistributed() throws Exception {
  final int smallClusterDataNodes = 3;
  testFsPseudoDistributed(smallClusterDataNodes);
}
/**
 * Starts an in-process DFS with the given number of datanodes, performs a
 * fixed sequence of namespace operations (mkdirs, create, delete, rename)
 * and checks after each one that the expected messages show up in the log
 * file. The cluster is shut down and the log reader is closed when the
 * method finishes, whatever the outcome.
 *
 * NOTE(review): anything other than an AssertionFailedError that escapes the
 * scenario is only logged and printed by the catch-all at the end, so such a
 * problem does not fail the test -- confirm that this is intended.
 *
 * @param datanodeNum number of datanodes to start
 * @throws Exception per the declared signature
 */
private void testFsPseudoDistributed(int datanodeNum) throws Exception {
  try {
    prepareTempFileSpace();
    configureDFS();
    startDFS(datanodeNum);
    if (logfh == null) {
      try {
        logfh = new BufferedReader(new FileReader(logFile));
      } catch (FileNotFoundException e) {
        // without a log file there is nothing to verify
        throw new AssertionFailedError("Log file does not exist: "+logFile);
      }
    }
    // create a directory
    try {
      assertTrue(dfsClient.mkdirs(new UTF8("/data")));
      assertMkdirs("/data", false);
    } catch (IOException ioe) {
      ioe.printStackTrace();
    }
    // same call with the path "data"; the failure messages are expected
    try {
      assertTrue(dfsClient.mkdirs(new UTF8("data")));
      assertMkdirs("data", true);
    } catch (IOException ioe) {
      ioe.printStackTrace();
    }
    //
    // create a file with 1 data block
    try {
      createFile("/data/xx", 1);
      assertCreate("/data/xx", 1, false);
    } catch(IOException ioe) {
      assertCreate("/data/xx", 1, true);
    }
    // create a file with 2 data blocks
    try {
      createFile("/data/yy", BLOCK_SIZE+1);
      assertCreate("/data/yy", BLOCK_SIZE+1, false);
    } catch(IOException ioe) {
      assertCreate("/data/yy", BLOCK_SIZE+1, true);
    }
    // create an existing file
    try {
      createFile("/data/xx", 2);
      assertCreate("/data/xx", 2, false);
    } catch(IOException ioe) {
      assertCreate("/data/xx", 2, true);
    }
    // delete the file
    try {
      dfsClient.delete(new UTF8("/data/yy"));
      assertDelete("/data/yy", false);
    } catch(IOException ioe) {
      ioe.printStackTrace();
    }
    // rename the file
    try {
      dfsClient.rename(new UTF8("/data/xx"), new UTF8("/data/yy"));
      assertRename("/data/xx", "/data/yy", false);
    } catch(IOException ioe) {
      ioe.printStackTrace();
    }
    // the source path was just renamed, so deleting it is expected to fail
    try {
      dfsClient.delete(new UTF8("/data/xx"));
      assertDelete("/data/xx", true);
    } catch(IOException ioe) {
      ioe.printStackTrace();
    }
    // ... and so is renaming it a second time
    try {
      dfsClient.rename(new UTF8("/data/xx"), new UTF8("/data/yy"));
      assertRename("/data/xx", "/data/yy", true);
    } catch(IOException ioe) {
      ioe.printStackTrace();
    }
  } catch (AssertionFailedError afe) {
    afe.printStackTrace();
    throw afe;
  } catch (Throwable t) {
    msg("Unexpected exception_a: " + t);
    t.printStackTrace();
  } finally {
    try {
      shutdownDFS();
    } finally {
      // release the file handle held by the log reader
      if (logfh != null) {
        try {
          logfh.close();
        } catch (IOException e) {
          msg("ignoring exception while closing log file, e=" + e);
        }
        logfh = null;
      }
    }
  }
}
/**
 * Creates the named DFS file and fills it with exactly fileSize bytes, each
 * holding the character 'a'.
 *
 * Data is written in chunks of at most BUFFER_SIZE bytes; the final chunk is
 * shortened so that the total never exceeds fileSize. The stream is closed
 * even when flushing it fails.
 *
 * @param filename path of the file to create
 * @param fileSize number of bytes to write; nothing is written when it is
 *        zero or negative
 * @throws IOException if the file cannot be created, written, flushed or
 *         closed
 */
private void createFile(String filename, long fileSize) throws IOException {
  // fill once up front: every chunk carries the same content
  byte[] buffer = new byte[BUFFER_SIZE];
  for (int i = 0; i < buffer.length; i++) {
    buffer[i] = 'a';
  }
  UTF8 testFileName = new UTF8(filename);
  OutputStream nos = dfsClient.create(testFileName, false);
  try {
    long remaining = fileSize;
    while (remaining > 0) {
      // only the part of the buffer that is still owed goes out
      int chunk = (int) Math.min((long) buffer.length, remaining);
      nos.write(buffer, 0, chunk);
      remaining -= chunk;
    }
  } finally {
    try {
      nos.flush();
    } finally {
      nos.close();
    }
  }
}
/**
 * Checks that a mkdirs request for fileName left its trace in the log: the
 * NameNode entry, then the NameSystem entry, then the FSDirectory entry that
 * reports either success or failure.
 *
 * @param fileName directory path that was passed to mkdirs
 * @param failed true when the "failed to create directory" entry is
 *        expected, false when the "created directory" entry is expected
 */
private void assertMkdirs(String fileName, boolean failed) {
  assertHasLogged("NameNode.mkdirs: " + fileName, DIR_LOG_HEADER_LEN + 1);
  assertHasLogged("NameSystem.mkdirs: " + fileName, DIR_LOG_HEADER_LEN);
  final String outcomeEntry = failed
      ? "FSDirectory.mkdirs: failed to create directory " + fileName
      : "FSDirectory.mkdirs: created directory " + fileName;
  assertHasLogged(outcomeEntry, DIR_LOG_HEADER_LEN);
}
/**
 * Checks the log entries produced by creating fileName.
 *
 * Both outcomes start with the NameNode.create and NameSystem.startFile
 * entries. A failed create is then expected to log the "failed to create
 * file" entry and nothing more. A successful create is expected to log one
 * block allocation, one addBlock / getAdditionalBlock / allocateBlock round
 * for every further block, and finally the completion entries.
 *
 * @param fileName path of the file that was created
 * @param filesize number of bytes requested; used to derive the expected
 *        block count from BLOCK_SIZE
 * @param failed true when the create is expected to have failed
 */
private void assertCreate(String fileName, int filesize, boolean failed) {
  assertHasLogged("NameNode.create: file " + fileName, DIR_LOG_HEADER_LEN + 1);
  assertHasLogged("NameSystem.startFile: file " + fileName, DIR_LOG_HEADER_LEN);
  if (failed) {
    assertHasLogged("NameSystem.startFile: failed to create file " + fileName,
        DIR_LOG_HEADER_LEN);
    return;
  }

  // one allocation entry is expected before any addBlock round
  assertHasLogged("NameSystem.allocateBlock: " + fileName, BLOCK_LOG_HEADER_LEN);

  // whole blocks, plus one more when a remainder is left over
  int blockNum = filesize / BLOCK_SIZE;
  if (filesize % BLOCK_SIZE != 0) {
    blockNum++;
  }

  for (int pending = blockNum - 1; pending > 0; pending--) {
    assertHasLogged("NameNode.addBlock: file " + fileName, BLOCK_LOG_HEADER_LEN + 1);
    assertHasLogged("NameSystem.getAdditionalBlock: file " + fileName, BLOCK_LOG_HEADER_LEN);
    assertHasLogged("NameSystem.allocateBlock: " + fileName, BLOCK_LOG_HEADER_LEN);
  }

  assertHasLogged("NameNode.complete: " + fileName, DIR_LOG_HEADER_LEN + 1);
  assertHasLogged("NameSystem.completeFile: " + fileName, DIR_LOG_HEADER_LEN);
  final String addedEntry = "FSDirectory.addFile: " + fileName + " with "
      + blockNum + " blocks is added to the file system";
  assertHasLogged(addedEntry, DIR_LOG_HEADER_LEN);
  final String removedEntry = "NameSystem.completeFile: " + fileName
      + " is removed from pendingCreates";
  assertHasLogged(removedEntry, DIR_LOG_HEADER_LEN);
}
/**
 * Checks that a delete request for fileName was logged by NameNode,
 * NameSystem and FSDirectory, followed by the unprotectedDelete entry that
 * reports the outcome.
 *
 * @param fileName path that was passed to delete
 * @param failed true when the "failed to remove" entry is expected, false
 *        when the "is removed" entry is expected
 */
private void assertDelete(String fileName, boolean failed) {
  assertHasLogged("NameNode.delete: " + fileName, DIR_LOG_HEADER_LEN + 1);
  assertHasLogged("NameSystem.delete: " + fileName, DIR_LOG_HEADER_LEN);
  assertHasLogged("FSDirectory.delete: " + fileName, DIR_LOG_HEADER_LEN);
  final String outcomeEntry = failed
      ? "FSDirectory.unprotectedDelete: failed to remove " + fileName
      : "FSDirectory.unprotectedDelete: " + fileName + " is removed";
  assertHasLogged(outcomeEntry, DIR_LOG_HEADER_LEN);
}
/**
 * Checks that a rename request from src to dst was logged by NameNode,
 * NameSystem and FSDirectory, followed by the unprotectedRenameTo entry that
 * reports the outcome.
 *
 * @param src path the rename started from
 * @param dst path the rename was aimed at
 * @param failed true when the "failed to rename" entry is expected, false
 *        when the "is renamed to" entry is expected
 */
private void assertRename(String src, String dst, boolean failed) {
  final String fromTo = src + " to " + dst;
  assertHasLogged("NameNode.rename: " + fromTo, DIR_LOG_HEADER_LEN + 1);
  assertHasLogged("NameSystem.renameTo: " + fromTo, DIR_LOG_HEADER_LEN);
  assertHasLogged("FSDirectory.renameTo: " + fromTo, DIR_LOG_HEADER_LEN);
  final String outcomeEntry;
  if (failed) {
    outcomeEntry = "FSDirectory.unprotectedRenameTo: failed to rename " + fromTo;
  } else {
    outcomeEntry = "FSDirectory.unprotectedRenameTo: " + src + " is renamed to " + dst;
  }
  assertHasLogged(outcomeEntry, DIR_LOG_HEADER_LEN);
}
/**
 * Reads forward through the log until a line is found that is longer than
 * headerLen characters and carries target starting at offset headerLen.
 *
 * The reader is shared between calls, so every call resumes where the
 * previous one stopped; lines that were skipped are not looked at again.
 * Expected entries must therefore be asserted in the order they were logged.
 *
 * @param target text expected right after the line header
 * @param headerLen number of leading characters to skip on each line
 * @throws AssertionFailedError if the end of the log is reached without a
 *         match, or if reading the log fails (the I/O error is attached as
 *         the cause)
 */
private void assertHasLogged(String target, int headerLen) {
  try {
    String line;
    while ((line = logfh.readLine()) != null) {
      if (line.length() > headerLen && line.startsWith(target, headerLen)) {
        return;
      }
    }
  } catch (java.io.IOException e) {
    AssertionFailedError failure =
        new AssertionFailedError("error reading the log file");
    // keep the underlying I/O error visible in the test report
    failure.initCause(e);
    throw failure;
  }
  throw new AssertionFailedError(target+" not logged");
}
/**
 * Adjusts the shared configuration for this test: sets the block size and
 * checks that it took effect, scales a few DFS settings down to save
 * resources, and sets the log directory and log file limits.
 *
 * @throws IOException per the declared signature
 */
private void configureDFS() throws IOException {
  // override whatever block size other config sources provided
  conf.setInt("dfs.block.size", BLOCK_SIZE);
  // read it back; the fallback of 2 is an intentionally wrong block size so
  // that a lost override cannot pass unnoticed
  final int effectiveBlockSize = conf.getInt("dfs.block.size", 2);
  assertTrue(BLOCK_SIZE == effectiveBlockSize);

  // downsize for testing (just to save resources)
  conf.setInt("dfs.namenode.handler.count", 3);
  conf.setLong("dfs.blockreport.intervalMsec", 50000L);
  conf.setLong("dfs.datanode.startupMsec", 15000L);
  conf.setInt("dfs.replication", 2);

  // logging: directory under the test base dir, file count and size limit
  final String logDir = baseDirSpecified + "/logs";
  System.setProperty("hadoop.log.dir", logDir);
  conf.setInt("hadoop.logfile.count", 1);
  conf.setInt("hadoop.logfile.size", 1000000000);
}
/**
 * Brings up the in-process cluster: formats and starts a name node on
 * localhost, starts the requested number of datanodes (each with its own
 * storage directory and its own thread), waits for them to settle and then
 * creates the DFS client.
 *
 * @param dataNodeNum number of datanodes that must come up
 * @throws IOException per the declared signature
 */
private void startDFS(int dataNodeNum) throws IOException {
  //
  // name node
  final String nameNodeAddress = "localhost:" + nameNodePort;
  final String nameStorageDir = baseDirSpecified + "/name";
  conf.set("fs.default.name", nameNodeAddress);
  conf.set("dfs.name.dir", nameStorageDir);
  NameNode.format(conf);
  nameNodeDaemon = new NameNode("localhost", nameNodePort, conf);

  //
  // data nodes
  for (int index = 0; index < dataNodeNum; index++) {
    // every datanode gets its own real fs path for data storage
    final String storagePath = baseDirSpecified + "/datanode" + index;
    final String[] storageDirs = { storagePath };
    conf.set("dfs.data.dir", storagePath);
    final DataNode node = DataNode.makeInstance(storageDirs, conf);
    if (node == null) {
      continue;
    }
    dataNodeDaemons.add(node);
    final Thread runner = new Thread(node, "DataNode" + index + ": " + storagePath);
    runner.start();
  }
  final boolean allStarted = dataNodeDaemons.size() == dataNodeNum;
  assertTrue("incorrect datanodes for test to continue", allStarted);

  //
  // give the datanodes time to report in
  try {
    awaitQuiescence();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

  // talk to the name node through a socket address, as if it were a remote process
  final InetSocketAddress nameNodeSocket =
      new InetSocketAddress("localhost", nameNodePort);
  dfsClient = new DFSClient(nameNodeSocket, conf);
}
/**
 * Tears the in-process cluster down in three steps: the client (if one was
 * created), every datanode, and finally the name node. An exception raised
 * by any single step is logged or ignored so that the remaining steps still
 * run.
 */
private void shutdownDFS() {
  // client first
  if (dfsClient != null) {
    try {
      msg("close down subthreads of DFSClient");
      dfsClient.close();
    } catch (Exception ignored) {
      // best-effort close; nothing to do about a failure here
    }
    msg("finished close down of DFSClient");
  }

  // datanodes next (possible because they live in this same process)
  msg("begin shutdown of all datanode daemons");
  for (DataNode daemon : dataNodeDaemons) {
    try {
      daemon.shutdown();
    } catch (Exception e) {
      msg("ignoring exception during (all) datanode shutdown, e=" + e);
    }
  }
  msg("finished shutdown of all datanode daemons");

  // name node last
  msg("begin shutdown of namenode daemon");
  try {
    nameNodeDaemon.stop();
  } catch (Exception e) {
    msg("ignoring namenode shutdown exception=" + e);
  }
  msg("finished shutdown of namenode daemon");
}
/**
 * Waits for the DFS datanodes to become quiescent.
 * For now this is a fixed pause of 30 seconds; a better implementation
 * would detect when the distributed operations have actually completed.
 *
 * @throws InterruptedException
 */
private void awaitQuiescence() throws InterruptedException {
  // ToDo: replace the static sleep with an observer. Doug suggested making
  // the block report interval shorter and watching for a report as the
  // signal that an operation has completed (quiescence detection).
  final int settleMillis = 30000;
  sleepAtLeast(settleMillis);
}
/**
 * Sends a progress message to this test's logger at INFO level.
 *
 * @param s text to log
 */
private void msg(String s) {
  LOG.info(s);
}
/**
 * Blocks the calling thread for at least the given number of milliseconds.
 *
 * An interrupt does not cut the wait short: the method goes back to sleep
 * for the time that is still owed. The interrupt is not lost either -- once
 * the full time has passed, the thread's interrupt status is set again so
 * the caller can react to it.
 *
 * Elapsed time is measured with System.nanoTime(), so changes to the wall
 * clock cannot shorten or lengthen the wait.
 *
 * @param tmsec minimum time to sleep, in milliseconds; zero or a negative
 *        value returns immediately
 */
public static void sleepAtLeast(int tmsec) {
  final long nanosPerMilli = 1000000L;
  final long deadline = System.nanoTime() + tmsec * nanosPerMilli;
  boolean interrupted = false;
  try {
    long remainingNanos = deadline - System.nanoTime();
    while (remainingNanos > 0) {
      try {
        // round up to whole milliseconds so we never wake before the deadline
        Thread.sleep((remainingNanos + nanosPerMilli - 1) / nanosPerMilli);
      } catch (InterruptedException ie) {
        // remember it; re-asserting it now would make the next sleep fail at once
        interrupted = true;
      }
      remainingNanos = deadline - System.nanoTime();
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Command-line entry point: runs this test class with the JUnit text runner.
 *
 * The class name handed to the runner is taken from the class itself, so it
 * cannot drift from the real name.
 *
 * @param args must be empty; otherwise the usage line is printed to stderr
 *        and the JVM exits with status -1
 * @throws Exception per the declared signature
 */
public static void main(String[] args) throws Exception {
  if (args.length != 0) {
    System.err.println("Usage: ClusterTestDFSNamespaceLogging (no args)");
    System.exit(-1);
  }
  String[] testargs = { ClusterTestDFSNamespaceLogging.class.getName() };
  junit.textui.TestRunner.main(testargs);
}
}