| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs; |
| |
| import java.io.*; |
| import java.util.*; |
| import junit.framework.TestCase; |
| |
| import org.apache.commons.logging.*; |
| |
| import org.apache.hadoop.mapred.*; |
| import org.apache.hadoop.mapred.lib.*; |
| import org.apache.hadoop.io.*; |
| import org.apache.hadoop.io.SequenceFile.CompressionType; |
| import org.apache.hadoop.conf.*; |
| |
| public class TestFileSystem extends TestCase { |
| private static final Log LOG = InputFormatBase.LOG; |
| |
| private static Configuration conf = new Configuration(); |
| private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096); |
| |
| private static final long MEGA = 1024 * 1024; |
| private static final int SEEKS_PER_FILE = 4; |
| |
| private static String ROOT = System.getProperty("test.build.data","fs_test"); |
| private static Path CONTROL_DIR = new Path(ROOT, "fs_control"); |
| private static Path WRITE_DIR = new Path(ROOT, "fs_write"); |
| private static Path READ_DIR = new Path(ROOT, "fs_read"); |
| private static Path DATA_DIR = new Path(ROOT, "fs_data"); |
| |
| public void testFs() throws Exception { |
| testFs(10 * MEGA, 100, 0); |
| } |
| |
| public static void testFs(long megaBytes, int numFiles, long seed) |
| throws Exception { |
| |
| FileSystem fs = FileSystem.get(conf); |
| |
| if (seed == 0) |
| seed = new Random().nextLong(); |
| |
| LOG.info("seed = "+seed); |
| |
| createControlFile(fs, megaBytes, numFiles, seed); |
| writeTest(fs, false); |
| readTest(fs, false); |
| seekTest(fs, false); |
| fs.delete(CONTROL_DIR); |
| fs.delete(DATA_DIR); |
| fs.delete(WRITE_DIR); |
| fs.delete(READ_DIR); |
| } |
| |
| public static void createControlFile(FileSystem fs, |
| long megaBytes, int numFiles, |
| long seed) throws Exception { |
| |
| LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files"); |
| |
| Path controlFile = new Path(CONTROL_DIR, "files"); |
| fs.delete(controlFile); |
| Random random = new Random(seed); |
| |
| SequenceFile.Writer writer = |
| SequenceFile.createWriter(fs, conf, controlFile, |
| UTF8.class, LongWritable.class, CompressionType.NONE); |
| |
| long totalSize = 0; |
| long maxSize = ((megaBytes / numFiles) * 2) + 1; |
| try { |
| while (totalSize < megaBytes) { |
| UTF8 name = new UTF8(Long.toString(random.nextLong())); |
| |
| long size = random.nextLong(); |
| if (size < 0) |
| size = -size; |
| size = size % maxSize; |
| |
| //LOG.info(" adding: name="+name+" size="+size); |
| |
| writer.append(name, new LongWritable(size)); |
| |
| totalSize += size; |
| } |
| } finally { |
| writer.close(); |
| } |
| LOG.info("created control file for: "+totalSize+" bytes"); |
| } |
| |
| public static class WriteMapper extends Configured implements Mapper { |
| private Random random = new Random(); |
| private byte[] buffer = new byte[BUFFER_SIZE]; |
| private FileSystem fs; |
| private boolean fastCheck; |
| |
| // a random suffix per task |
| private String suffix = "-"+random.nextLong(); |
| |
| { |
| try { |
| fs = FileSystem.get(conf); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public WriteMapper() { super(null); } |
| |
| public WriteMapper(Configuration conf) { super(conf); } |
| |
| public void configure(JobConf job) { |
| setConf(job); |
| fastCheck = job.getBoolean("fs.test.fastCheck", false); |
| } |
| |
| public void map(WritableComparable key, Writable value, |
| OutputCollector collector, Reporter reporter) |
| throws IOException { |
| String name = ((UTF8)key).toString(); |
| long size = ((LongWritable)value).get(); |
| long seed = Long.parseLong(name); |
| |
| random.setSeed(seed); |
| reporter.setStatus("creating " + name); |
| |
| // write to temp file initially to permit parallel execution |
| Path tempFile = new Path(DATA_DIR, name+suffix); |
| OutputStream out = fs.create(tempFile); |
| |
| long written = 0; |
| try { |
| while (written < size) { |
| if (fastCheck) { |
| Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE)); |
| } else { |
| random.nextBytes(buffer); |
| } |
| long remains = size - written; |
| int length = (remains<=buffer.length) ? (int)remains : buffer.length; |
| out.write(buffer, 0, length); |
| written += length; |
| reporter.setStatus("writing "+name+"@"+written+"/"+size); |
| } |
| } finally { |
| out.close(); |
| } |
| // rename to final location |
| fs.rename(tempFile, new Path(DATA_DIR, name)); |
| |
| collector.collect(new UTF8("bytes"), new LongWritable(written)); |
| |
| reporter.setStatus("wrote " + name); |
| } |
| |
| public void close() { |
| } |
| |
| } |
| |
| public static void writeTest(FileSystem fs, boolean fastCheck) |
| throws Exception { |
| |
| fs.delete(DATA_DIR); |
| fs.delete(WRITE_DIR); |
| |
| JobConf job = new JobConf(conf, TestFileSystem.class); |
| job.setBoolean("fs.test.fastCheck", fastCheck); |
| |
| job.setInputPath(CONTROL_DIR); |
| job.setInputFormat(SequenceFileInputFormat.class); |
| job.setInputKeyClass(UTF8.class); |
| job.setInputValueClass(LongWritable.class); |
| |
| job.setMapperClass(WriteMapper.class); |
| job.setReducerClass(LongSumReducer.class); |
| |
| job.setOutputPath(WRITE_DIR); |
| job.setOutputKeyClass(UTF8.class); |
| job.setOutputValueClass(LongWritable.class); |
| job.setNumReduceTasks(1); |
| JobClient.runJob(job); |
| } |
| |
| public static class ReadMapper extends Configured implements Mapper { |
| private Random random = new Random(); |
| private byte[] buffer = new byte[BUFFER_SIZE]; |
| private byte[] check = new byte[BUFFER_SIZE]; |
| private FileSystem fs; |
| private boolean fastCheck; |
| |
| { |
| try { |
| fs = FileSystem.get(conf); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public ReadMapper() { super(null); } |
| |
| public ReadMapper(Configuration conf) { super(conf); } |
| |
| public void configure(JobConf job) { |
| setConf(job); |
| fastCheck = job.getBoolean("fs.test.fastCheck", false); |
| } |
| |
| public void map(WritableComparable key, Writable value, |
| OutputCollector collector, Reporter reporter) |
| throws IOException { |
| String name = ((UTF8)key).toString(); |
| long size = ((LongWritable)value).get(); |
| long seed = Long.parseLong(name); |
| |
| random.setSeed(seed); |
| reporter.setStatus("opening " + name); |
| |
| DataInputStream in = |
| new DataInputStream(fs.open(new Path(DATA_DIR, name))); |
| |
| long read = 0; |
| try { |
| while (read < size) { |
| long remains = size - read; |
| int n = (remains<=buffer.length) ? (int)remains : buffer.length; |
| in.readFully(buffer, 0, n); |
| read += n; |
| if (fastCheck) { |
| Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE)); |
| } else { |
| random.nextBytes(check); |
| } |
| if (n != buffer.length) { |
| Arrays.fill(buffer, n, buffer.length, (byte)0); |
| Arrays.fill(check, n, check.length, (byte)0); |
| } |
| assertTrue(Arrays.equals(buffer, check)); |
| |
| reporter.setStatus("reading "+name+"@"+read+"/"+size); |
| |
| } |
| } finally { |
| in.close(); |
| } |
| |
| collector.collect(new UTF8("bytes"), new LongWritable(read)); |
| |
| reporter.setStatus("read " + name); |
| } |
| |
| public void close() { |
| } |
| |
| } |
| |
| public static void readTest(FileSystem fs, boolean fastCheck) |
| throws Exception { |
| |
| fs.delete(READ_DIR); |
| |
| JobConf job = new JobConf(conf, TestFileSystem.class); |
| job.setBoolean("fs.test.fastCheck", fastCheck); |
| |
| |
| job.setInputPath(CONTROL_DIR); |
| job.setInputFormat(SequenceFileInputFormat.class); |
| job.setInputKeyClass(UTF8.class); |
| job.setInputValueClass(LongWritable.class); |
| |
| job.setMapperClass(ReadMapper.class); |
| job.setReducerClass(LongSumReducer.class); |
| |
| job.setOutputPath(READ_DIR); |
| job.setOutputKeyClass(UTF8.class); |
| job.setOutputValueClass(LongWritable.class); |
| job.setNumReduceTasks(1); |
| JobClient.runJob(job); |
| } |
| |
| |
| public static class SeekMapper extends Configured implements Mapper { |
| private Random random = new Random(); |
| private byte[] check = new byte[BUFFER_SIZE]; |
| private FileSystem fs; |
| private boolean fastCheck; |
| |
| { |
| try { |
| fs = FileSystem.get(conf); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public SeekMapper() { super(null); } |
| |
| public SeekMapper(Configuration conf) { super(conf); } |
| |
| public void configure(JobConf job) { |
| setConf(job); |
| fastCheck = job.getBoolean("fs.test.fastCheck", false); |
| } |
| |
| public void map(WritableComparable key, Writable value, |
| OutputCollector collector, Reporter reporter) |
| throws IOException { |
| String name = ((UTF8)key).toString(); |
| long size = ((LongWritable)value).get(); |
| long seed = Long.parseLong(name); |
| |
| if (size == 0) return; |
| |
| reporter.setStatus("opening " + name); |
| |
| FSDataInputStream in = fs.open(new Path(DATA_DIR, name)); |
| |
| try { |
| for (int i = 0; i < SEEKS_PER_FILE; i++) { |
| // generate a random position |
| long position = Math.abs(random.nextLong()) % size; |
| |
| // seek file to that position |
| reporter.setStatus("seeking " + name); |
| in.seek(position); |
| byte b = in.readByte(); |
| |
| // check that byte matches |
| byte checkByte = 0; |
| // advance random state to that position |
| random.setSeed(seed); |
| for (int p = 0; p <= position; p+= check.length) { |
| reporter.setStatus("generating data for " + name); |
| if (fastCheck) { |
| checkByte = (byte)random.nextInt(Byte.MAX_VALUE); |
| } else { |
| random.nextBytes(check); |
| checkByte = check[(int)(position % check.length)]; |
| } |
| } |
| assertEquals(b, checkByte); |
| } |
| } finally { |
| in.close(); |
| } |
| } |
| |
| public void close() { |
| } |
| |
| } |
| |
| public static void seekTest(FileSystem fs, boolean fastCheck) |
| throws Exception { |
| |
| fs.delete(READ_DIR); |
| |
| JobConf job = new JobConf(conf, TestFileSystem.class); |
| job.setBoolean("fs.test.fastCheck", fastCheck); |
| |
| job.setInputPath(CONTROL_DIR); |
| job.setInputFormat(SequenceFileInputFormat.class); |
| job.setInputKeyClass(UTF8.class); |
| job.setInputValueClass(LongWritable.class); |
| |
| job.setMapperClass(SeekMapper.class); |
| job.setReducerClass(LongSumReducer.class); |
| |
| job.setOutputPath(READ_DIR); |
| job.setOutputKeyClass(UTF8.class); |
| job.setOutputValueClass(LongWritable.class); |
| job.setNumReduceTasks(1); |
| JobClient.runJob(job); |
| } |
| |
| |
| public static void main(String[] args) throws Exception { |
| int megaBytes = 10; |
| int files = 100; |
| boolean noRead = false; |
| boolean noWrite = false; |
| boolean noSeek = false; |
| boolean fastCheck = false; |
| long seed = new Random().nextLong(); |
| |
| String usage = "Usage: TestFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]"; |
| |
| if (args.length == 0) { |
| System.err.println(usage); |
| System.exit(-1); |
| } |
| for (int i = 0; i < args.length; i++) { // parse command line |
| if (args[i].equals("-files")) { |
| files = Integer.parseInt(args[++i]); |
| } else if (args[i].equals("-megaBytes")) { |
| megaBytes = Integer.parseInt(args[++i]); |
| } else if (args[i].equals("-noread")) { |
| noRead = true; |
| } else if (args[i].equals("-nowrite")) { |
| noWrite = true; |
| } else if (args[i].equals("-noseek")) { |
| noSeek = true; |
| } else if (args[i].equals("-fastcheck")) { |
| fastCheck = true; |
| } |
| } |
| |
| LOG.info("seed = "+seed); |
| LOG.info("files = " + files); |
| LOG.info("megaBytes = " + megaBytes); |
| |
| FileSystem fs = FileSystem.get(conf); |
| |
| if (!noWrite) { |
| createControlFile(fs, megaBytes*MEGA, files, seed); |
| writeTest(fs, fastCheck); |
| } |
| if (!noRead) { |
| readTest(fs, fastCheck); |
| } |
| if (!noSeek) { |
| seekTest(fs, fastCheck); |
| } |
| } |
| |
| } |