| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs; |
| |
| import java.io.BufferedReader; |
| import java.io.DataInputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.PrintStream; |
| import java.util.Date; |
| import java.util.StringTokenizer; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.SequenceFile.CompressionType; |
| import org.apache.hadoop.mapred.*; |
| |
| /** |
| * Distributed I/O benchmark of libhdfs (the HDFS C API). |
| * <p> |
| * This test writes into or reads from a specified number of files. |
| * File size is specified as a parameter to the test. |
| * Each file is accessed in a separate map task. |
| * <p> |
| * The reducer collects the following statistics: |
| * <ul> |
| * <li>number of tasks completed</li> |
| * <li>number of bytes written/read</li> |
| * <li>execution time</li> |
| * <li>I/O rate</li> |
| * <li>I/O rate squared</li> |
| * </ul> |
| * |
| * Finally, the following information is appended to a local file: |
| * <ul> |
| * <li>read or write test</li> |
| * <li>date and time the test finished</li> |
| * <li>number of files</li> |
| * <li>total number of bytes processed</li> |
| * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li> |
| * <li>average I/O rate in mb/sec per file</li> |
| * <li>standard deviation of the I/O rate</li> |
| * </ul> |
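| * <p> |
| * A typical invocation looks like the following (illustrative; the |
| * accepted options are listed in the usage string printed by {@link #main}): |
| * <pre> |
| * hadoop org.apache.hadoop.fs.DFSCIOTest -write -nrFiles 10 -fileSize 10 |
| * hadoop org.apache.hadoop.fs.DFSCIOTest -read -nrFiles 10 -fileSize 10 |
| * hadoop org.apache.hadoop.fs.DFSCIOTest -clean |
| * </pre> |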
| */ |
| public class DFSCIOTest extends TestCase { |
| // Constants |
| private static final Log LOG = LogFactory.getLog(DFSCIOTest.class); |
| private static final int TEST_TYPE_READ = 0; |
| private static final int TEST_TYPE_WRITE = 1; |
| private static final int TEST_TYPE_CLEANUP = 2; |
| private static final int DEFAULT_BUFFER_SIZE = 1000000; |
| private static final String BASE_FILE_NAME = "test_io_"; |
| private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log"; |
| |
| private static Configuration fsConfig = new Configuration(); |
| private static final long MEGA = 0x100000; |
| private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest"); |
| private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); |
| private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); |
| private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); |
| private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); |
| |
| private static Path HDFS_TEST_DIR = new Path("/tmp/DFSCIOTest"); |
| private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1"); |
| private static final String CHMOD = "chmod"; |
| private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." + HDFS_LIB_VERSION); |
| private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read"); |
| private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write"); |
| |
| /** |
| * Run the test with default parameters. |
| * |
| * @throws Exception |
| */ |
| public void testIOs() throws Exception { |
| testIOs(10, 10); |
| } |
| |
| /** |
| * Run the test with the specified parameters. |
| * |
| * @param fileSize file size in MB |
| * @param nrFiles number of files |
| * @throws IOException |
| */ |
| public static void testIOs(int fileSize, int nrFiles) |
| throws IOException { |
| |
| FileSystem fs = FileSystem.get(fsConfig); |
| |
| createControlFile(fs, fileSize, nrFiles); |
| writeTest(fs); |
| readTest(fs); |
| } |
| |
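| /** |
| * Create one control file per map task. Each control file contains a |
| * single (file name, size in MB) record that the corresponding mapper |
| * consumes. |
| */ |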
| private static void createControlFile( |
| FileSystem fs, |
| int fileSize, // in MB |
| int nrFiles |
| ) throws IOException { |
| LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); |
| |
| fs.delete(CONTROL_DIR, true); |
| |
| for(int i=0; i < nrFiles; i++) { |
| String name = getFileName(i); |
| Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); |
| SequenceFile.Writer writer = null; |
| try { |
| writer = SequenceFile.createWriter(fs, fsConfig, controlFile, |
| Text.class, LongWritable.class, |
| CompressionType.NONE); |
| writer.append(new Text(name), new LongWritable(fileSize)); |
| } catch(Exception e) { |
| throw new IOException("Failed to create control file " + controlFile, e); |
| } finally { |
| if (writer != null) { |
| writer.close(); |
| } |
| } |
| } |
| LOG.info("created control files for: "+nrFiles+" files"); |
| } |
| |
| private static String getFileName(int fIdx) { |
| return BASE_FILE_NAME + Integer.toString(fIdx); |
| } |
| |
| /** |
| * Write/Read mapper base class. |
| * <p> |
| * Collects the following statistics per task: |
| * <ul> |
| * <li>number of tasks completed</li> |
| * <li>number of bytes written/read</li> |
| * <li>execution time</li> |
| * <li>I/O rate</li> |
| * <li>I/O rate squared</li> |
| * </ul> |
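| * <p> |
| * For example (illustrative numbers), a task that processed 10 MB in |
| * 2000 msec would emit, keyed by the {@link AccumulatingReducer} type |
| * prefixes: |
| * <pre> |
| * VALUE_TYPE_LONG  + "tasks"  : 1 |
| * VALUE_TYPE_LONG  + "size"   : 10485760 |
| * VALUE_TYPE_LONG  + "time"   : 2000 |
| * VALUE_TYPE_FLOAT + "rate"   : 5000.0   (5 MB/sec, scaled by 1000) |
| * VALUE_TYPE_FLOAT + "sqrate" : 25000.0  (25 (MB/sec)^2, scaled by 1000) |
| * </pre> |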
| */ |
| private abstract static class IOStatMapper extends IOMapperBase<Long> { |
| IOStatMapper() { |
| } |
| |
| void collectStats(OutputCollector<Text, Text> output, |
| String name, |
| long execTime, |
| Long objSize) throws IOException { |
| long totalSize = objSize.longValue(); |
| // execTime is in msec: multiplying by 1000 gives bytes/sec, and |
| // dividing by MEGA converts bytes/sec to MB/sec |
| float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); |
| LOG.info("Number of bytes processed = " + totalSize); |
| LOG.info("Exec time = " + execTime); |
| LOG.info("IO rate = " + ioRateMbSec); |
| |
| output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), |
| new Text(String.valueOf(1))); |
| output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), |
| new Text(String.valueOf(totalSize))); |
| output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), |
| new Text(String.valueOf(execTime))); |
| output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), |
| new Text(String.valueOf(ioRateMbSec*1000))); |
| output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), |
| new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); |
| } |
| } |
| |
| /** |
| * Write mapper class: copies the libhdfs shared library and the native |
| * hdfs_write executable out of DFS, then shells out to hdfs_write to |
| * create one file of the requested size. |
| */ |
| public static class WriteMapper extends IOStatMapper { |
| |
| public WriteMapper() { |
| super(); |
| for(int i=0; i < bufferSize; i++) |
| buffer[i] = (byte)('0' + i % 50); |
| } |
| |
| public Long doIO(Reporter reporter, |
| String name, |
| long totalSize |
| ) throws IOException { |
| // create file |
| totalSize *= MEGA; |
| |
| // create instance of local filesystem |
| FileSystem localFS = FileSystem.getLocal(fsConfig); |
| |
| try { |
| // native runtime |
| Runtime runTime = Runtime.getRuntime(); |
| |
| // copy the dso and executable from dfs and chmod them |
| synchronized (this) { |
| localFS.delete(HDFS_TEST_DIR, true); |
| if (!(localFS.mkdirs(HDFS_TEST_DIR))) { |
| throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); |
| } |
| } |
| |
| synchronized (this) { |
| if (!localFS.exists(HDFS_SHLIB)) { |
| if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) { |
| throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem"); |
| } |
| |
| String chmodCmd = CHMOD + " a+x " + HDFS_SHLIB; |
| Process process = runTime.exec(chmodCmd); |
| int exitStatus = process.waitFor(); |
| if (exitStatus != 0) { |
| throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); |
| } |
| } |
| } |
| |
| synchronized (this) { |
| if (!localFS.exists(HDFS_WRITE)) { |
| if (!FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig)) { |
| throw new IOException("Failed to copy " + HDFS_WRITE + " to local filesystem"); |
| } |
| |
| String chmodCmd = CHMOD + " a+x " + HDFS_WRITE; |
| Process process = runTime.exec(chmodCmd); |
| int exitStatus = process.waitFor(); |
| if (exitStatus != 0) { |
| throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); |
| } |
| } |
| } |
| |
| // exec the C program |
| Path outFile = new Path(DATA_DIR, name); |
| String writeCmd = HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize; |
| Process process = runTime.exec(writeCmd, null, new File(HDFS_TEST_DIR.toString())); |
| int exitStatus = process.waitFor(); |
| if (exitStatus != 0) { |
| throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus); |
| } |
| } catch (InterruptedException interruptedException) { |
| reporter.setStatus(interruptedException.toString()); |
| } finally { |
| localFS.close(); |
| } |
| return Long.valueOf(totalSize); |
| } |
| } |
| |
| private static void writeTest(FileSystem fs) |
| throws IOException { |
| |
| fs.delete(DATA_DIR, true); |
| fs.delete(WRITE_DIR, true); |
| |
| runIOTest(WriteMapper.class, WRITE_DIR); |
| } |
| |
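| /** |
| * Configure and launch the MapReduce job: one map task per control file |
| * in CONTROL_DIR, with all statistics funneled through a single |
| * {@link AccumulatingReducer}. |
| */ |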
| private static void runIOTest( Class<? extends Mapper> mapperClass, |
| Path outputDir |
| ) throws IOException { |
| JobConf job = new JobConf(fsConfig, DFSCIOTest.class); |
| |
| FileInputFormat.setInputPaths(job, CONTROL_DIR); |
| job.setInputFormat(SequenceFileInputFormat.class); |
| |
| job.setMapperClass(mapperClass); |
| job.setReducerClass(AccumulatingReducer.class); |
| |
| FileOutputFormat.setOutputPath(job, outputDir); |
| job.setOutputKeyClass(Text.class); |
| job.setOutputValueClass(Text.class); |
| job.setNumReduceTasks(1); |
| JobClient.runJob(job); |
| } |
| |
| /** |
| * Read mapper class: copies the libhdfs shared library and the native |
| * hdfs_read executable out of DFS, then shells out to hdfs_read to read |
| * back one file of the expected size. |
| */ |
| public static class ReadMapper extends IOStatMapper { |
| |
| public ReadMapper() { |
| super(); |
| } |
| |
| public Long doIO(Reporter reporter, |
| String name, |
| long totalSize |
| ) throws IOException { |
| totalSize *= MEGA; |
| |
| // create instance of local filesystem |
| FileSystem localFS = FileSystem.getLocal(fsConfig); |
| |
| try { |
| // native runtime |
| Runtime runTime = Runtime.getRuntime(); |
| |
| // copy the dso and executable from dfs |
| synchronized (this) { |
| localFS.delete(HDFS_TEST_DIR, true); |
| if (!(localFS.mkdirs(HDFS_TEST_DIR))) { |
| throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); |
| } |
| } |
| |
| synchronized (this) { |
| if (!localFS.exists(HDFS_SHLIB)) { |
| if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) { |
| throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem"); |
| } |
| |
| String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); |
| Process process = runTime.exec(chmodCmd); |
| int exitStatus = process.waitFor(); |
| if (exitStatus != 0) { |
| throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); |
| } |
| } |
| } |
| |
| synchronized (this) { |
| if (!localFS.exists(HDFS_READ)) { |
| if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) { |
| throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem"); |
| } |
| |
| String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ); |
| Process process = runTime.exec(chmodCmd); |
| int exitStatus = process.waitFor(); |
| |
| if (exitStatus != 0) { |
| throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); |
| } |
| } |
| } |
| |
| // exec the C program |
| Path inFile = new Path(DATA_DIR, name); |
| String readCmd = HDFS_READ + " " + inFile + " " + totalSize + " " + bufferSize; |
| Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString())); |
| int exitStatus = process.waitFor(); |
| |
| if (exitStatus != 0) { |
| throw new IOException(readCmd + ": Failed with exitStatus: " + exitStatus); |
| } |
| } catch (InterruptedException interruptedException) { |
| reporter.setStatus(interruptedException.toString()); |
| } finally { |
| localFS.close(); |
| } |
| return Long.valueOf(totalSize); |
| } |
| } |
| |
| private static void readTest(FileSystem fs) throws IOException { |
| fs.delete(READ_DIR, true); |
| runIOTest(ReadMapper.class, READ_DIR); |
| } |
| |
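| /** |
| * Run the chosen mapper nrFiles times in the local JVM, bypassing the |
| * MapReduce framework (triggered by the -seq command-line flag). The |
| * mappers run against Reporter.NULL and no per-task statistics are |
| * collected; the caller reports only the overall execution time. |
| */ |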
| private static void sequentialTest( |
| FileSystem fs, |
| int testType, |
| int fileSize, |
| int nrFiles |
| ) throws Exception { |
| IOStatMapper ioer = null; |
| if (testType == TEST_TYPE_READ) |
| ioer = new ReadMapper(); |
| else if (testType == TEST_TYPE_WRITE) |
| ioer = new WriteMapper(); |
| else |
| return; |
| for(int i=0; i < nrFiles; i++) |
| ioer.doIO(Reporter.NULL, |
| BASE_FILE_NAME+Integer.toString(i), |
| MEGA*fileSize); |
| } |
| |
| public static void main(String[] args) { |
| int testType = TEST_TYPE_READ; |
| int bufferSize = DEFAULT_BUFFER_SIZE; |
| int fileSize = 1; |
| int nrFiles = 1; |
| String resFileName = DEFAULT_RES_FILE_NAME; |
| boolean isSequential = false; |
| |
| String version="DFSCIOTest.0.0.1"; |
| String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] "; |
| |
| System.out.println(version); |
| if (args.length == 0) { |
| System.err.println(usage); |
| System.exit(-1); |
| } |
| for (int i = 0; i < args.length; i++) { // parse command line |
| if (args[i].startsWith("-r")) { |
| testType = TEST_TYPE_READ; |
| } else if (args[i].startsWith("-w")) { |
| testType = TEST_TYPE_WRITE; |
| } else if (args[i].startsWith("-clean")) { |
| testType = TEST_TYPE_CLEANUP; |
| } else if (args[i].startsWith("-seq")) { |
| isSequential = true; |
| } else if (args[i].equals("-nrFiles")) { |
| nrFiles = Integer.parseInt(args[++i]); |
| } else if (args[i].equals("-fileSize")) { |
| fileSize = Integer.parseInt(args[++i]); |
| } else if (args[i].equals("-bufferSize")) { |
| bufferSize = Integer.parseInt(args[++i]); |
| } else if (args[i].equals("-resFile")) { |
| resFileName = args[++i]; |
| } |
| } |
| |
| LOG.info("nrFiles = " + nrFiles); |
| LOG.info("fileSize (MB) = " + fileSize); |
| LOG.info("bufferSize = " + bufferSize); |
| |
| try { |
| fsConfig.setInt("test.io.file.buffer.size", bufferSize); |
| FileSystem fs = FileSystem.get(fsConfig); |
| |
| if (testType != TEST_TYPE_CLEANUP) { |
| fs.delete(HDFS_TEST_DIR, true); |
| if (!fs.mkdirs(HDFS_TEST_DIR)) { |
| throw new IOException("Mkdirs failed to create " + |
| HDFS_TEST_DIR.toString()); |
| } |
| |
| //Copy the executables over to the remote filesystem |
| String hadoopHome = System.getenv("HADOOP_HOME"); |
| if (hadoopHome == null) { |
| throw new IOException("HADOOP_HOME environment variable is not set"); |
| } |
| fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION), |
| HDFS_SHLIB); |
| fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ); |
| fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE); |
| } |
| |
| if (isSequential) { |
| long tStart = System.currentTimeMillis(); |
| sequentialTest(fs, testType, fileSize, nrFiles); |
| long execTime = System.currentTimeMillis() - tStart; |
| String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; |
| LOG.info(resultLine); |
| return; |
| } |
| if (testType == TEST_TYPE_CLEANUP) { |
| cleanup(fs); |
| return; |
| } |
| createControlFile(fs, fileSize, nrFiles); |
| long tStart = System.currentTimeMillis(); |
| if (testType == TEST_TYPE_WRITE) |
| writeTest(fs); |
| if (testType == TEST_TYPE_READ) |
| readTest(fs); |
| long execTime = System.currentTimeMillis() - tStart; |
| |
| analyzeResult(fs, testType, execTime, resFileName); |
| } catch(Exception e) { |
| System.err.println(e.getLocalizedMessage()); |
| System.exit(-1); |
| } |
| } |
| |
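| /** |
| * Parse the single reduce output file and append the summary lines |
| * (throughput, mean I/O rate, standard deviation) to resFileName. |
| */ |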
| private static void analyzeResult( FileSystem fs, |
| int testType, |
| long execTime, |
| String resFileName |
| ) throws IOException { |
| Path reduceFile; |
| if (testType == TEST_TYPE_WRITE) |
| reduceFile = new Path(WRITE_DIR, "part-00000"); |
| else |
| reduceFile = new Path(READ_DIR, "part-00000"); |
| DataInputStream in = new DataInputStream(fs.open(reduceFile)); |
| BufferedReader lines = new BufferedReader(new InputStreamReader(in)); |
| long tasks = 0; |
| long size = 0; |
| long time = 0; |
| float rate = 0; |
| float sqrate = 0; |
| String line; |
| while((line = lines.readLine()) != null) { |
| StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); |
| String attr = tokens.nextToken(); |
| if (attr.endsWith(":tasks")) |
| tasks = Long.parseLong(tokens.nextToken()); |
| else if (attr.endsWith(":size")) |
| size = Long.parseLong(tokens.nextToken()); |
| else if (attr.endsWith(":time")) |
| time = Long.parseLong(tokens.nextToken()); |
| else if (attr.endsWith(":rate")) |
| rate = Float.parseFloat(tokens.nextToken()); |
| else if (attr.endsWith(":sqrate")) |
| sqrate = Float.parseFloat(tokens.nextToken()); |
| } |
| lines.close(); |
| |
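| // rate and sqrate are sums over all tasks of the (x1000-scaled) per-task |
| // I/O rates and their squares, so after rescaling med = E[x] and |
| // stdDev = sqrt(E[x^2] - E[x]^2); despite its name, med is the mean |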
| double med = rate / 1000 / tasks; |
| double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); |
| String[] resultLines = { |
| "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : |
| (testType == TEST_TYPE_READ) ? "read" : |
| "unknown"), |
| " Date & time: " + new Date(System.currentTimeMillis()), |
| " Number of files: " + tasks, |
| "Total MBytes processed: " + size/MEGA, |
| " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), |
| "Average IO rate mb/sec: " + med, |
| " Std IO rate deviation: " + stdDev, |
| " Test exec time sec: " + (float)execTime / 1000, |
| "" }; |
| |
| PrintStream res = new PrintStream( |
| new FileOutputStream(new File(resFileName), true)); |
| try { |
| for(int i = 0; i < resultLines.length; i++) { |
| LOG.info(resultLines[i]); |
| res.println(resultLines[i]); |
| } |
| } finally { |
| res.close(); |
| } |
| } |
| |
| private static void cleanup(FileSystem fs) throws Exception { |
| LOG.info("Cleaning up test files"); |
| fs.delete(new Path(TEST_ROOT_DIR), true); |
| fs.delete(HDFS_TEST_DIR, true); |
| } |
| } |