/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.*;
import java.util.*;
import junit.framework.TestCase;
import org.apache.commons.logging.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.conf.*;
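/**
 * Distributed filesystem stress test. Three MapReduce jobs exercise a
 * FileSystem implementation: a write job creates files of random sizes
 * filled with pseudo-random data, a read job re-reads them sequentially
 * and verifies every byte, and a seek job reads single bytes at random
 * offsets and verifies them. Each file's name is the decimal string of
 * the Random seed used to generate its contents, so any task can
 * regenerate the expected bytes independently.
 *
 * Can also be run standalone, for example via the hadoop script
 * (flags as parsed in main() below):
 *
 *   bin/hadoop org.apache.hadoop.fs.TestFileSystem \
 *       -files 100 -megaBytes 10 [-noread] [-nowrite] [-noseek] [-fastcheck]
 */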
public class TestFileSystem extends TestCase {
private static final Log LOG = InputFormatBase.LOG;
private static Configuration conf = new Configuration();
private static final int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
private static final long MEGA = 1024 * 1024;
private static final int SEEKS_PER_FILE = 4;
private static String ROOT = System.getProperty("test.build.data","fs_test");
private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
private static Path WRITE_DIR = new Path(ROOT, "fs_write");
private static Path READ_DIR = new Path(ROOT, "fs_read");
private static Path DATA_DIR = new Path(ROOT, "fs_data");
public void testFs() throws Exception {
testFs(10 * MEGA, 100, 0);
}
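/**
 * Runs the write, read and seek tests in sequence, then cleans up.
 * Note: despite its name, <code>megaBytes</code> is a total size in
 * bytes (callers multiply by MEGA); a <code>seed</code> of 0 selects a
 * random seed.
 */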
public static void testFs(long megaBytes, int numFiles, long seed)
throws Exception {
FileSystem fs = FileSystem.get(conf);
if (seed == 0)
seed = new Random().nextLong();
LOG.info("seed = "+seed);
createControlFile(fs, megaBytes, numFiles, seed);
writeTest(fs, false);
readTest(fs, false);
seekTest(fs, false);
fs.delete(CONTROL_DIR);
fs.delete(DATA_DIR);
fs.delete(WRITE_DIR);
fs.delete(READ_DIR);
}
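/**
 * Writes the control file: a SequenceFile of (UTF8 name, LongWritable
 * size) records, one per file to create. Names are random longs that
 * double as the data-generation seed for the corresponding file.
 */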
public static void createControlFile(FileSystem fs,
long megaBytes, int numFiles,
long seed) throws Exception {
LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files");
Path controlFile = new Path(CONTROL_DIR, "files");
fs.delete(controlFile);
Random random = new Random(seed);
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, controlFile,
UTF8.class, LongWritable.class, CompressionType.NONE);
long totalSize = 0;
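// cap each file at about twice the average size, so that uniformly
// drawn sizes sum to roughly the requested total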
long maxSize = ((megaBytes / numFiles) * 2) + 1;
try {
while (totalSize < megaBytes) {
UTF8 name = new UTF8(Long.toString(random.nextLong()));
// take a non-negative size below maxSize; masking the sign bit
// avoids the negation edge case where -Long.MIN_VALUE is still negative
long size = (random.nextLong() & Long.MAX_VALUE) % maxSize;
//LOG.info(" adding: name="+name+" size="+size);
writer.append(name, new LongWritable(size));
totalSize += size;
}
} finally {
writer.close();
}
LOG.info("created control file for: "+totalSize+" bytes");
}
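/**
 * Writes one file per control record, filling it with bytes from a
 * Random seeded by the file name. With fastCheck each buffer holds a
 * single repeated byte (cheap to regenerate); otherwise the buffer is
 * fully pseudo-random.
 */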
public static class WriteMapper extends Configured implements Mapper {
private Random random = new Random();
private byte[] buffer = new byte[BUFFER_SIZE];
private FileSystem fs;
private boolean fastCheck;
// a random suffix per task
private String suffix = "-"+random.nextLong();
{
try {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public WriteMapper() { super(null); }
public WriteMapper(Configuration conf) { super(conf); }
public void configure(JobConf job) {
setConf(job);
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
public void map(WritableComparable key, Writable value,
OutputCollector collector, Reporter reporter)
throws IOException {
String name = ((UTF8)key).toString();
long size = ((LongWritable)value).get();
long seed = Long.parseLong(name);
random.setSeed(seed);
reporter.setStatus("creating " + name);
// write to temp file initially to permit parallel execution
Path tempFile = new Path(DATA_DIR, name+suffix);
OutputStream out = fs.create(tempFile);
long written = 0;
try {
while (written < size) {
if (fastCheck) {
Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
} else {
random.nextBytes(buffer);
}
long remains = size - written;
int length = (remains<=buffer.length) ? (int)remains : buffer.length;
out.write(buffer, 0, length);
written += length;
reporter.setStatus("writing "+name+"@"+written+"/"+size);
}
} finally {
out.close();
}
// rename to final location
fs.rename(tempFile, new Path(DATA_DIR, name));
collector.collect(new UTF8("bytes"), new LongWritable(written));
reporter.setStatus("wrote " + name);
}
public void close() {
}
}
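/**
 * Runs the write job: WriteMapper creates the data files and
 * LongSumReducer totals the bytes written into WRITE_DIR.
 */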
public static void writeTest(FileSystem fs, boolean fastCheck)
throws Exception {
fs.delete(DATA_DIR);
fs.delete(WRITE_DIR);
JobConf job = new JobConf(conf, TestFileSystem.class);
job.setBoolean("fs.test.fastCheck", fastCheck);
job.setInputPath(CONTROL_DIR);
job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(LongWritable.class);
job.setMapperClass(WriteMapper.class);
job.setReducerClass(LongSumReducer.class);
job.setOutputPath(WRITE_DIR);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
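/**
 * Re-reads each file sequentially, regenerating the expected bytes
 * from the name-derived seed and asserting buffer-for-buffer equality.
 */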
public static class ReadMapper extends Configured implements Mapper {
private Random random = new Random();
private byte[] buffer = new byte[BUFFER_SIZE];
private byte[] check = new byte[BUFFER_SIZE];
private FileSystem fs;
private boolean fastCheck;
{
try {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public ReadMapper() { super(null); }
public ReadMapper(Configuration conf) { super(conf); }
public void configure(JobConf job) {
setConf(job);
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
public void map(WritableComparable key, Writable value,
OutputCollector collector, Reporter reporter)
throws IOException {
String name = ((UTF8)key).toString();
long size = ((LongWritable)value).get();
long seed = Long.parseLong(name);
random.setSeed(seed);
reporter.setStatus("opening " + name);
DataInputStream in =
new DataInputStream(fs.open(new Path(DATA_DIR, name)));
long read = 0;
try {
while (read < size) {
long remains = size - read;
int n = (remains<=buffer.length) ? (int)remains : buffer.length;
in.readFully(buffer, 0, n);
read += n;
if (fastCheck) {
Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE));
} else {
random.nextBytes(check);
}
if (n != buffer.length) {
Arrays.fill(buffer, n, buffer.length, (byte)0);
Arrays.fill(check, n, check.length, (byte)0);
}
assertTrue(Arrays.equals(buffer, check));
reporter.setStatus("reading "+name+"@"+read+"/"+size);
}
} finally {
in.close();
}
collector.collect(new UTF8("bytes"), new LongWritable(read));
reporter.setStatus("read " + name);
}
public void close() {
}
}
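/**
 * Runs the read job: ReadMapper verifies file contents and
 * LongSumReducer totals the bytes read into READ_DIR.
 */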
public static void readTest(FileSystem fs, boolean fastCheck)
throws Exception {
fs.delete(READ_DIR);
JobConf job = new JobConf(conf, TestFileSystem.class);
job.setBoolean("fs.test.fastCheck", fastCheck);
job.setInputPath(CONTROL_DIR);
job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(LongWritable.class);
job.setMapperClass(ReadMapper.class);
job.setReducerClass(LongSumReducer.class);
job.setOutputPath(READ_DIR);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
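/**
 * Seeks to SEEKS_PER_FILE random offsets in each file, reads one byte,
 * and verifies it by replaying the seeded Random stream from the start
 * of the file up to the buffer containing that offset.
 */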
public static class SeekMapper extends Configured implements Mapper {
private Random random = new Random();
private byte[] check = new byte[BUFFER_SIZE];
private FileSystem fs;
private boolean fastCheck;
{
try {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public SeekMapper() { super(null); }
public SeekMapper(Configuration conf) { super(conf); }
public void configure(JobConf job) {
setConf(job);
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
public void map(WritableComparable key, Writable value,
OutputCollector collector, Reporter reporter)
throws IOException {
String name = ((UTF8)key).toString();
long size = ((LongWritable)value).get();
long seed = Long.parseLong(name);
if (size == 0) return;
reporter.setStatus("opening " + name);
FSDataInputStream in = fs.open(new Path(DATA_DIR, name));
try {
for (int i = 0; i < SEEKS_PER_FILE; i++) {
// generate a random non-negative position in the file (masking the
// sign bit, since Math.abs(Long.MIN_VALUE) is itself negative)
long position = (random.nextLong() & Long.MAX_VALUE) % size;
// seek file to that position
reporter.setStatus("seeking " + name);
in.seek(position);
byte b = in.readByte();
// check that byte matches
byte checkByte = 0;
// advance random state to that position
random.setSeed(seed);
for (int p = 0; p <= position; p+= check.length) {
reporter.setStatus("generating data for " + name);
if (fastCheck) {
checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
} else {
random.nextBytes(check);
checkByte = check[(int)(position % check.length)];
}
}
assertEquals(checkByte, b); // expected value first, per JUnit convention
}
} finally {
in.close();
}
}
public void close() {
}
}
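/**
 * Runs the seek job, reusing READ_DIR for its output.
 */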
public static void seekTest(FileSystem fs, boolean fastCheck)
throws Exception {
fs.delete(READ_DIR);
JobConf job = new JobConf(conf, TestFileSystem.class);
job.setBoolean("fs.test.fastCheck", fastCheck);
job.setInputPath(CONTROL_DIR);
job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(LongWritable.class);
job.setMapperClass(SeekMapper.class);
job.setReducerClass(LongSumReducer.class);
job.setOutputPath(READ_DIR);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
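/**
 * Command-line entry point; see the usage string below for flags.
 * -nowrite skips both the control file creation and the write job, so
 * it assumes data from a previous run is still in place.
 */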
public static void main(String[] args) throws Exception {
int megaBytes = 10;
int files = 100;
boolean noRead = false;
boolean noWrite = false;
boolean noSeek = false;
boolean fastCheck = false;
long seed = new Random().nextLong();
String usage = "Usage: TestFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]";
if (args.length == 0) {
System.err.println(usage);
System.exit(-1);
}
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equals("-files")) {
files = Integer.parseInt(args[++i]);
} else if (args[i].equals("-megaBytes")) {
megaBytes = Integer.parseInt(args[++i]);
} else if (args[i].equals("-noread")) {
noRead = true;
} else if (args[i].equals("-nowrite")) {
noWrite = true;
} else if (args[i].equals("-noseek")) {
noSeek = true;
} else if (args[i].equals("-fastcheck")) {
fastCheck = true;
}
}
LOG.info("seed = "+seed);
LOG.info("files = " + files);
LOG.info("megaBytes = " + megaBytes);
FileSystem fs = FileSystem.get(conf);
if (!noWrite) {
createControlFile(fs, megaBytes*MEGA, files, seed);
writeTest(fs, fastCheck);
}
if (!noRead) {
readTest(fs, fastCheck);
}
if (!noSeek) {
seekTest(fs, fastCheck);
}
}
}