/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import com.google.common.collect.HashMultiset;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
public class TestCombineFileInputFormat {
private static final String[] rack1 = new String[] { "/r1" };
private static final String[] hosts1 = new String[] { "host1.rack1.com" };
private static final String[] rack2 = new String[] { "/r2" };
private static final String[] hosts2 = new String[] { "host2.rack2.com" };
private static final String[] rack3 = new String[] { "/r3" };
private static final String[] hosts3 = new String[] { "host3.rack3.com" };
final Path inDir = new Path("/racktesting");
final Path outputPath = new Path("/output");
final Path dir1 = new Path(inDir, "/dir1");
final Path dir2 = new Path(inDir, "/dir2");
final Path dir3 = new Path(inDir, "/dir3");
final Path dir4 = new Path(inDir, "/dir4");
final Path dir5 = new Path(inDir, "/dir5");
static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE];
@Mock
private List<String> mockList;
@Before
public void initMocks() {
MockitoAnnotations.initMocks(this);
}
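// The mocked List records the first location of each split as the tests
// iterate over the generated splits; Mockito's verify() can then assert
// that particular hosts appeared at least once, independent of the
// order in which splits were produced.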
private static final String DUMMY_FS_URI = "dummyfs:///";
/** Dummy class to extend CombineFileInputFormat. */
private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
@Override
public RecordReader<Text,Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return null;
}
}
/** Dummy class to extend CombineFileInputFormat. It allows
* non-existent files to be passed into CombineFileInputFormat, allowing
* for easy testing without having to create real files.
*/
private class DummyInputFormat1 extends DummyInputFormat {
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
Path[] files = getInputPaths(job);
List<FileStatus> results = new ArrayList<FileStatus>();
for (int i = 0; i < files.length; i++) {
Path p = files[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
results.add(fs.getFileStatus(p));
}
return results;
}
}
/** Dummy class to extend CombineFileInputFormat. It allows
* testing with files that have missing blocks without actually removing
* replicas.
*/
public static class MissingBlockFileSystem extends DistributedFileSystem {
String fileWithMissingBlocks;
@Override
public void initialize(URI name, Configuration conf) throws IOException {
fileWithMissingBlocks = "";
super.initialize(name, conf);
}
@Override
public BlockLocation[] getFileBlockLocations(
FileStatus stat, long start, long len) throws IOException {
if (stat.isDirectory()) {
return null;
}
System.out.println("File " + stat.getPath());
String name = stat.getPath().toUri().getPath();
BlockLocation[] locs =
super.getFileBlockLocations(stat, start, len);
if (name.equals(fileWithMissingBlocks)) {
System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
new String[0], locs[0].getOffset(), locs[0].getLength()), null);
}
return locs;
}
public void setFileWithMissingBlocks(String f) {
fileWithMissingBlocks = f;
}
}
private static final String DUMMY_KEY = "dummy.rr.key";
private static class DummyRecordReader extends RecordReader<Text, Text> {
private TaskAttemptContext context;
private CombineFileSplit s;
private int idx;
private boolean used;
public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context,
Integer i) {
this.context = context;
this.idx = i;
this.s = split;
this.used = true;
}
/** @return a value specified in the context to check whether the
* context is properly updated by the initialize() method.
*/
public String getDummyConfVal() {
return this.context.getConfiguration().get(DUMMY_KEY);
}
public void initialize(InputSplit split, TaskAttemptContext context) {
this.context = context;
this.s = (CombineFileSplit) split;
// By setting used to true in the c'tor, but false in initialize,
// we can check that initialize() is always called before use
// (e.g., in testReinit()).
this.used = false;
}
public boolean nextKeyValue() {
boolean ret = !used;
this.used = true;
return ret;
}
public Text getCurrentKey() {
return new Text(this.context.getConfiguration().get(DUMMY_KEY));
}
public Text getCurrentValue() {
return new Text(this.s.getPath(idx).toString());
}
public float getProgress() {
return used ? 1.0f : 0.0f;
}
public void close() {
}
}
/** Extend CFIF to use CFRR with DummyRecordReader */
private class ChildRRInputFormat extends CombineFileInputFormat<Text, Text> {
@SuppressWarnings("unchecked")
@Override
public RecordReader<Text,Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return new CombineFileRecordReader((CombineFileSplit) split, context,
(Class) DummyRecordReader.class);
}
}
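// Note: CombineFileRecordReader constructs the child reader through
// reflection, which is why DummyRecordReader above declares a
// constructor taking (CombineFileSplit, TaskAttemptContext, Integer),
// the signature CombineFileRecordReader expects for per-chunk readers.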
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
// Test that we properly initialize the child recordreader when
// CombineFileInputFormat and CombineFileRecordReader are used.
TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
Configuration conf1 = new Configuration();
conf1.set(DUMMY_KEY, "STATE1");
TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);
// This will create a CombineFileRecordReader that itself contains a
// DummyRecordReader.
InputFormat inputFormat = new ChildRRInputFormat();
Path [] files = { new Path("file1") };
long [] lengths = { 1 };
CombineFileSplit split = new CombineFileSplit(files, lengths);
RecordReader rr = inputFormat.createRecordReader(split, context1);
assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
// Verify that the initial configuration is the one being used.
// Right after construction the dummy key should have value "STATE1"
assertEquals("Invalid initial dummy key value", "STATE1",
rr.getCurrentKey().toString());
// Switch the active context for the RecordReader...
Configuration conf2 = new Configuration();
conf2.set(DUMMY_KEY, "STATE2");
TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
rr.initialize(split, context2);
// And verify that the new context is updated into the child record reader.
assertEquals("Invalid secondary dummy key value", "STATE2",
rr.getCurrentKey().toString());
}
@Test
public void testReinit() throws Exception {
// Test that a split containing multiple files works correctly,
// with the child RecordReader getting its initialize() method
// called a second time.
TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
Configuration conf = new Configuration();
TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
// This will create a CombineFileRecordReader that itself contains a
// DummyRecordReader.
InputFormat inputFormat = new ChildRRInputFormat();
Path [] files = { new Path("file1"), new Path("file2") };
long [] lengths = { 1, 1 };
CombineFileSplit split = new CombineFileSplit(files, lengths);
RecordReader rr = inputFormat.createRecordReader(split, context);
assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
// first initialize() call comes from MapTask. We'll do it here.
rr.initialize(split, context);
// First value is first filename.
assertTrue(rr.nextKeyValue());
assertEquals("file1", rr.getCurrentValue().toString());
// The inner RR will return false, because it only emits one (k, v) pair.
// But there's another sub-split to process. This returns true to us.
assertTrue(rr.nextKeyValue());
// And the 2nd rr will have its initialize method called correctly.
assertEquals("file2", rr.getCurrentValue().toString());
// But after both child RR's have returned their singleton (k, v), this
// should also return false.
assertFalse(rr.nextKeyValue());
}
/**
* For testing each split has the expected name, length, and offset.
*/
private final class Split {
private String name;
private long length;
private long offset;
public Split(String name, long length, long offset) {
this.name = name;
this.length = length;
this.offset = offset;
}
public String getName() {
return name;
}
public long getLength() {
return length;
}
public long getOffset() {
return offset;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Split) {
Split split = ((Split) obj);
return split.name.equals(name) && split.length == length
&& split.offset == offset;
}
return false;
}
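// equals() is overridden above, so a matching hashCode() is provided to
// keep the equals/hashCode contract for the HashSet<Split> instances
// used below. This is a defensive addition; the assertions themselves
// only rely on List.contains, which uses equals().
@Override
public int hashCode() {
return java.util.Objects.hash(name, length, offset);
}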
}
/**
* The test suppresses the unchecked warning raised by
* {@link org.mockito.Mockito#reset}. Although calling the method is
* considered bad practice, we do so instead of splitting the test
* (which would require restarting the MiniDFSCluster) to save time.
*/
@Test
@SuppressWarnings("unchecked")
public void testSplitPlacement() throws Exception {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
try {
/* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
* 1) file1 and file5, just after starting the datanode on r1, with
* a replication factor of 1, and,
* 2) file2, just after starting the datanode on r2, with
* a replication factor of 2, and,
* 3) file3 and file4, after starting all three datanodes, with a
* replication factor of 3.
* At the end, file1 and file5 will be present only on datanode1, file2
* will be present on datanode1 and datanode2, and file3 and file4 will
* be present on all datanodes.
*/
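// Resulting block placement for reference (r = replication factor):
//   file1 (r=1): host1            file5 (r=1): host1
//   file2 (r=2): host1, host2
//   file3 (r=3): host1, host2, host3
//   file4 (r=3, created later in this test): host1, host2, host3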
Configuration conf = new Configuration();
conf.setBoolean("dfs.replication.considerLoad", false);
dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
.build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
if (!fileSys.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
Path file1 = new Path(dir1 + "/file1");
writeFile(conf, file1, (short) 1, 1);
// create another file on the same datanode
Path file5 = new Path(dir5 + "/file5");
writeFile(conf, file5, (short) 1, 1);
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
List<InputSplit> splits = inFormat.getSplits(job);
System.out.println("Made splits(Test0): " + splits.size());
for (InputSplit split : splits) {
System.out.println("File split(Test0): " + split);
}
assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(file5.getName(), fileSplit.getPath(1).getName());
assertEquals(0, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
// create file on two datanodes.
Path file2 = new Path(dir2 + "/file2");
writeFile(conf, file2, (short) 2, 2);
// split it using a CombinedFile input format
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
splits = inFormat.getSplits(job);
System.out.println("Made splits(Test1): " + splits.size());
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1. Otherwise create two splits.
*/
if (splits.size() == 2) {
// first split is on rack2, contains file2
if (split.equals(splits.get(0))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(file2.getName(), fileSplit.getPath(1).getName());
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
// second split is on rack1, contains file1
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 1) {
// first split is on rack1, contains file1 and file2.
assertEquals(3, fileSplit.getNumPaths());
Set<Split> expected = new HashSet<>();
expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
List<Split> actual = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
assertTrue(actual.containsAll(expected));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} else {
fail("Expected split size is 1 or 2, but actual size is "
+ splits.size());
}
}
// create another file on 3 datanodes and 3 racks.
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
dfs.waitActive();
Path file3 = new Path(dir3 + "/file3");
writeFile(conf, file3, (short) 3, 3);
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test2): " + split);
}
Set<Split> expected = new HashSet<>();
expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
List<Split> actual = new ArrayList<>();
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1.
* If rack2 or rack3 is processed first and rack1 is processed second,
* create one split on rack2 or rack3 and the other split is on rack1.
* Otherwise create 3 splits for each rack.
*/
if (splits.size() == 3) {
// first split is on rack3, contains file3
if (split.equals(splits.get(0))) {
assertEquals(3, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file3.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(file3.getName(), fileSplit.getPath(1).getName());
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(file3.getName(), fileSplit.getPath(2).getName());
assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
assertEquals(BLOCKSIZE, fileSplit.getLength(2));
assertEquals(hosts3[0], fileSplit.getLocations()[0]);
}
// second split is on rack2, contains file2
if (split.equals(splits.get(1))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(file2.getName(), fileSplit.getPath(1).getName());
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
// third split is on rack1, contains file1
if (split.equals(splits.get(2))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 2) {
// first split is on rack2 or rack3, contains one or two files.
if (split.equals(splits.get(0))) {
assertEquals(1, fileSplit.getLocations().length);
if (fileSplit.getLocations()[0].equals(hosts2[0])) {
assertEquals(2, fileSplit.getNumPaths());
} else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
assertEquals(3, fileSplit.getNumPaths());
} else {
fail("First split should be on rack2 or rack3.");
}
}
// second split is on rack1, contains the remaining files.
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 1) {
// first split is rack1, contains all three files.
assertEquals(1, fileSplit.getLocations().length);
assertEquals(6, fileSplit.getNumPaths());
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} else {
fail("Split size should be 1, 2, or 3.");
}
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
}
assertEquals(6, actual.size());
assertTrue(actual.containsAll(expected));
// create file4 on all three racks
Path file4 = new Path(dir4 + "/file4");
writeFile(conf, file4, (short)3, 3);
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test3): " + split);
}
expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
actual.clear();
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1.
* If rack2 or rack3 is processed first and rack1 is processed second,
* create one split on rack2 or rack3 and the other split is on rack1.
* Otherwise create 3 splits for each rack.
*/
if (splits.size() == 3) {
// first split is on rack3, contains file3 and file4
if (split.equals(splits.get(0))) {
assertEquals(6, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts3[0], fileSplit.getLocations()[0]);
}
// second split is on rack2, contains file2
if (split.equals(splits.get(1))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(file2.getName(), fileSplit.getPath(1).getName());
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
// third split is on rack1, contains file1
if (split.equals(splits.get(2))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 2) {
// first split is on rack2 or rack3, contains two or three files.
if (split.equals(splits.get(0))) {
assertEquals(1, fileSplit.getLocations().length);
if (fileSplit.getLocations()[0].equals(hosts2[0])) {
assertEquals(5, fileSplit.getNumPaths());
} else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
assertEquals(6, fileSplit.getNumPaths());
} else {
fail("First split should be on rack2 or rack3.");
}
}
// second split is on rack1, contains the remaining files.
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 1) {
// first split is rack1, contains all four files.
assertEquals(1, fileSplit.getLocations().length);
assertEquals(9, fileSplit.getNumPaths());
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} else {
fail("Split size should be 1, 2, or 3.");
}
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
}
assertEquals(9, actual.size());
assertTrue(actual.containsAll(expected));
// maximum split size is 2 blocks
inFormat = new DummyInputFormat();
inFormat.setMinSplitSizeNode(BLOCKSIZE);
inFormat.setMaxSplitSize(2*BLOCKSIZE);
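// setMaxSplitSize caps how much data is combined into a single split;
// setMinSplitSizeNode and setMinSplitSizeRack set the minimum amount of
// data needed to form a node-local or rack-local split. Leftover blocks
// below a minimum move up to the rack level, and finally to a global
// pass that combines blocks across racks.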
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test4): " + split);
}
assertEquals(5, splits.size());
actual.clear();
reset(mockList);
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(9, actual.size());
assertTrue(actual.containsAll(expected));
// verify the splits are on all the racks
verify(mockList, atLeastOnce()).add(hosts1[0]);
verify(mockList, atLeastOnce()).add(hosts2[0]);
verify(mockList, atLeastOnce()).add(hosts3[0]);
// maximum split size is 3 blocks
inFormat = new DummyInputFormat();
inFormat.setMinSplitSizeNode(BLOCKSIZE);
inFormat.setMaxSplitSize(3*BLOCKSIZE);
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test5): " + split);
}
assertEquals(3, splits.size());
actual.clear();
reset(mockList);
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(9, actual.size());
assertTrue(actual.containsAll(expected));
verify(mockList, atLeastOnce()).add(hosts1[0]);
verify(mockList, atLeastOnce()).add(hosts2[0]);
// maximum split size is 4 blocks
inFormat = new DummyInputFormat();
inFormat.setMaxSplitSize(4*BLOCKSIZE);
FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test6): " + split);
}
assertEquals(3, splits.size());
actual.clear();
reset(mockList);
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(9, actual.size());
assertTrue(actual.containsAll(expected));
verify(mockList, atLeastOnce()).add(hosts1[0]);
// maximum split size is 7 blocks and min is 3 blocks
inFormat = new DummyInputFormat();
inFormat.setMaxSplitSize(7*BLOCKSIZE);
inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test7): " + split);
}
assertEquals(2, splits.size());
actual.clear();
reset(mockList);
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(9, actual.size());
assertTrue(actual.containsAll(expected));
verify(mockList, atLeastOnce()).add(hosts1[0]);
// Rack 1 has file1, file2, file3 and file4.
// Rack 2 has file2, file3 and file4.
// Rack 3 has file3 and file4.
// Set up a filter so that only (file1 and file2) or (file3 and file4)
// can be combined.
inFormat = new DummyInputFormat();
FileInputFormat.addInputPath(job, inDir);
inFormat.setMinSplitSizeRack(1); // everything is at least rack local
inFormat.createPool(new TestFilter(dir1),
new TestFilter(dir2));
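// createPool restricts combining: only files accepted by the given
// filters fall into this pool, and a split never mixes blocks from
// different pools.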
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
if (splits.size() == 2) {
// first split is on rack1, contains file1 and file2.
if (split.equals(splits.get(0))) {
assertEquals(3, fileSplit.getNumPaths());
expected.clear();
expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
actual.clear();
for (int i = 0; i < 3; i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
assertTrue(actual.containsAll(expected));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(1))) {
// second split contains file3 and file4; however,
// the location is undetermined.
assertEquals(6, fileSplit.getNumPaths());
expected.clear();
expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
actual.clear();
for (int i = 0; i < 6; i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
assertTrue(actual.containsAll(expected));
assertEquals(1, fileSplit.getLocations().length);
}
} else if (splits.size() == 3) {
if (split.equals(splits.get(0))) {
// first split is on rack2, contains file2
assertEquals(2, fileSplit.getNumPaths());
expected.clear();
expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
actual.clear();
for (int i = 0; i < 2; i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
assertTrue(actual.containsAll(expected));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(1))) {
// second split is on rack1, contains file1
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(0, fileSplit.getOffset(0));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(2))) {
// third split contains file3 and file4; however,
// the location is undetermined.
assertEquals(6, fileSplit.getNumPaths());
expected.clear();
expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
actual.clear();
for (int i = 0; i < 6; i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
assertTrue(actual.containsAll(expected));
assertEquals(1, fileSplit.getLocations().length);
}
} else {
fail("Split size should be 2 or 3.");
}
}
// measure performance when there are multiple pools and
// many files in each pool.
int numPools = 100;
int numFiles = 1000;
DummyInputFormat1 inFormat1 = new DummyInputFormat1();
for (int i = 0; i < numFiles; i++) {
FileInputFormat.setInputPaths(job, file1);
}
inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
final Path dirNoMatch1 = new Path(inDir, "/dirxx");
final Path dirNoMatch2 = new Path(inDir, "/diryy");
for (int i = 0; i < numPools; i++) {
inFormat1.createPool(new TestFilter(dirNoMatch1),
new TestFilter(dirNoMatch2));
}
long start = System.currentTimeMillis();
splits = inFormat1.getSplits(job);
long end = System.currentTimeMillis();
System.out.println("Elapsed time for " + numPools + " pools " +
" and " + numFiles + " files is " +
((end - start)/1000) + " seconds.");
// This file has three whole blocks. If the max split size is
// half the block size, then there should be six splits.
inFormat = new DummyInputFormat();
inFormat.setMaxSplitSize(BLOCKSIZE/2);
FileInputFormat.setInputPaths(job, dir3);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test8): " + split);
}
assertEquals(6, splits.size());
} finally {
if (dfs != null) {
dfs.shutdown();
}
}
}
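// Writes numBlocks blocks of BLOCKSIZE bytes to the given path and
// waits until the requested replication factor is reached.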
static void writeFile(Configuration conf, Path name,
short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
FileSystem fileSys = FileSystem.get(conf);
FSDataOutputStream stm = fileSys.create(name, true,
conf.getInt("io.file.buffer.size", 4096),
replication, (long)BLOCKSIZE);
writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
}
// Creates a gzip file and returns its FileStatus
static FileStatus writeGzipFile(Configuration conf, Path name,
short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
FileSystem fileSys = FileSystem.get(conf);
GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
.getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
return fileSys.getFileStatus(name);
}
private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
OutputStream out, short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
for (int i = 0; i < numBlocks; i++) {
out.write(databuf);
}
out.close();
DFSTestUtil.waitReplication(fileSys, name, replication);
}
@Test
public void testNodeDistribution() throws IOException, InterruptedException {
DummyInputFormat inFormat = new DummyInputFormat();
int numBlocks = 60;
long totLength = 0;
long blockSize = 100;
int numNodes = 10;
long minSizeNode = 50;
long minSizeRack = 50;
int maxSplitSize = 200; // 2 blocks per split.
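// 60 blocks * 100 bytes = 6000 bytes in total; with a 200-byte maximum
// split size, createSplits below should produce 6000 / 200 = 30 splits.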
String[] locations = new String[numNodes];
for (int i = 0; i < numNodes; i++) {
locations[i] = "h" + i;
}
String[] racks = new String[0];
Path path = new Path("hdfs://file");
OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
int hostCountBase = 0;
// Generate block list. Replication 3 per block.
for (int i = 0; i < numBlocks; i++) {
int localHostCount = hostCountBase;
String[] blockHosts = new String[3];
for (int j = 0; j < 3; j++) {
int hostNum = localHostCount % numNodes;
blockHosts[j] = "h" + hostNum;
localHostCount++;
}
hostCountBase++;
blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
racks);
totLength += blockSize;
}
List<InputSplit> splits = new ArrayList<InputSplit>();
HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
maxSplitSize, minSizeNode, minSizeRack, splits);
int expectedSplitCount = (int) (totLength / maxSplitSize);
assertEquals(expectedSplitCount, splits.size());
// Ensure 90+% of the splits have node local blocks.
// 100% locality may not always be achieved.
int numLocalSplits = 0;
for (InputSplit inputSplit : splits) {
assertEquals(maxSplitSize, inputSplit.getLength());
if (inputSplit.getLocations().length == 1) {
numLocalSplits++;
}
}
assertTrue(numLocalSplits >= 0.9 * splits.size());
}
@Test
public void testNodeInputSplit() throws IOException, InterruptedException {
// Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
// both nodes. The grouping ensures that both nodes get splits instead of
// just the first node.
DummyInputFormat inFormat = new DummyInputFormat();
int numBlocks = 12;
long totLength = 0;
long blockSize = 100;
long maxSize = 200;
long minSizeNode = 50;
long minSizeRack = 50;
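// 12 blocks * 100 bytes = 1200 bytes in total; with maxSize = 200 this
// yields 1200 / 200 = 6 splits, which the grouping should balance as
// 3 splits per node across h1 and h2.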
String[] locations = { "h1", "h2" };
String[] racks = new String[0];
Path path = new Path("hdfs://file");
OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
for(int i=0; i<numBlocks; ++i) {
blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
totLength += blockSize;
}
List<InputSplit> splits = new ArrayList<InputSplit>();
HashMap<String, Set<String>> rackToNodes =
new HashMap<String, Set<String>>();
HashMap<String, List<OneBlockInfo>> rackToBlocks =
new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
maxSize, minSizeNode, minSizeRack, splits);
int expectedSplitCount = (int)(totLength/maxSize);
assertEquals(expectedSplitCount, splits.size());
HashMultiset<String> nodeSplits = HashMultiset.create();
for(int i=0; i<expectedSplitCount; ++i) {
InputSplit inSplit = splits.get(i);
assertEquals(maxSize, inSplit.getLength());
assertEquals(1, inSplit.getLocations().length);
nodeSplits.add(inSplit.getLocations()[0]);
}
assertEquals(3, nodeSplits.count(locations[0]));
assertEquals(3, nodeSplits.count(locations[1]));
}
/**
* The test suppresses the unchecked warning raised by
* {@link org.mockito.Mockito#reset}. Although calling the method is
* considered bad practice, we do so instead of splitting the test
* (which would require restarting the MiniDFSCluster) to save time.
*/
@Test
@SuppressWarnings("unchecked")
public void testSplitPlacementForCompressedFiles() throws Exception {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
try {
/* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
* files
* 1) file1 and file5, just after starting the datanode on r1, with
* a replication factor of 1, and,
* 2) file2, just after starting the datanode on r2, with
* a replication factor of 2, and,
* 3) file3 and file4, after starting all three datanodes, with a
* replication factor of 3.
* At the end, file1 and file5 will be present only on datanode1, file2
* will be present on datanode1 and datanode2, and file3 and file4 will
* be present on all datanodes.
*/
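// Because gzip is not a splittable codec, CombineFileInputFormat treats
// each .gz file as a single chunk spanning the whole file, so every
// chunk below has offset 0 and the file's full length.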
Configuration conf = new Configuration();
conf.setBoolean("dfs.replication.considerLoad", false);
dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
.build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
if (!fileSys.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
Path file1 = new Path(dir1 + "/file1.gz");
FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
// create another file on the same datanode
Path file5 = new Path(dir5 + "/file5.gz");
FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
List<InputSplit> splits = inFormat.getSplits(job);
System.out.println("Made splits(Test0): " + splits.size());
for (InputSplit split : splits) {
System.out.println("File split(Test0): " + split);
}
assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(f1.getLen(), fileSplit.getLength(0));
assertEquals(file5.getName(), fileSplit.getPath(1).getName());
assertEquals(0, fileSplit.getOffset(1));
assertEquals(f5.getLen(), fileSplit.getLength(1));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
// create file on two datanodes.
Path file2 = new Path(dir2 + "/file2.gz");
FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
// split it using a CombinedFile input format
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
inFormat.setMinSplitSizeRack(f1.getLen());
splits = inFormat.getSplits(job);
System.out.println("Made splits(Test1): " + splits.size());
// make sure that each split has different locations
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
Set<Split> expected = new HashSet<>();
expected.add(new Split(file1.getName(), f1.getLen(), 0));
expected.add(new Split(file2.getName(), f2.getLen(), 0));
List<Split> actual = new ArrayList<>();
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1. Otherwise create two splits.
*/
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
if (splits.size() == 2) {
if (split.equals(splits.get(0))) {
// first split is on rack2, contains file2.
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(f2.getLen(), fileSplit.getLength(0));
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(1))) {
// second split is on rack1, contains file1.
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(f1.getLen(), fileSplit.getLength(0));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 1) {
// first split is on rack1, contains file1 and file2.
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} else {
fail("Split size should be 1 or 2.");
}
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
}
assertEquals(2, actual.size());
assertTrue(actual.containsAll(expected));
// create another file on 3 datanodes and 3 racks.
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
dfs.waitActive();
Path file3 = new Path(dir3 + "/file3.gz");
FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
inFormat.setMinSplitSizeRack(f1.getLen());
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test2): " + split);
}
expected.add(new Split(file3.getName(), f3.getLen(), 0));
actual.clear();
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1.
* If rack2 or rack3 is processed first and rack1 is processed second,
* create one split on rack2 or rack3 and the other split is on rack1.
* Otherwise create 3 splits for each rack.
*/
if (splits.size() == 3) {
// first split is on rack3, contains file3
if (split.equals(splits.get(0))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file3.getName(), fileSplit.getPath(0).getName());
assertEquals(f3.getLen(), fileSplit.getLength(0));
assertEquals(0, fileSplit.getOffset(0));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts3[0], fileSplit.getLocations()[0]);
}
// second split is on rack2, contains file2
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(f2.getLen(), fileSplit.getLength(0));
assertEquals(0, fileSplit.getOffset(0));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
// third split is on rack1, contains file1
if (split.equals(splits.get(2))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(f1.getLen(), fileSplit.getLength(0));
assertEquals(0, fileSplit.getOffset(0));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 2) {
// first split is on rack2 or rack3, contains one or two files.
if (split.equals(splits.get(0))) {
assertEquals(1, fileSplit.getLocations().length);
if (fileSplit.getLocations()[0].equals(hosts2[0])) {
assertEquals(2, fileSplit.getNumPaths());
} else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
assertEquals(1, fileSplit.getNumPaths());
} else {
fail("First split should be on rack2 or rack3.");
}
}
// second split is on rack1, contains the remaining files.
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 1) {
// first split is rack1, contains all three files.
assertEquals(1, fileSplit.getLocations().length);
assertEquals(3, fileSplit.getNumPaths());
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} else {
fail("Split size should be 1, 2, or 3.");
}
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
}
assertEquals(3, actual.size());
assertTrue(actual.containsAll(expected));
// create file4 on all three racks
Path file4 = new Path(dir4 + "/file4.gz");
FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
inFormat.setMinSplitSizeRack(f1.getLen());
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test3): " + split);
}
expected.add(new Split(file4.getName(), f4.getLen(), 0));
actual.clear();
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1.
* If rack2 or rack3 is processed first and rack1 is processed second,
* create one split on rack2 or rack3 and the other split is on rack1.
* Otherwise create 3 splits for each rack.
*/
if (splits.size() == 3) {
// first split is on rack3, contains file3 and file4
if (split.equals(splits.get(0))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(hosts3[0], fileSplit.getLocations()[0]);
}
// second split is on rack2, contains file2
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(f2.getLen(), fileSplit.getLength(0));
assertEquals(0, fileSplit.getOffset(0));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
// third split is on rack1, contains file1
if (split.equals(splits.get(2))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(f1.getLen(), fileSplit.getLength(0));
assertEquals(0, fileSplit.getOffset(0));
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 2) {
// first split is on rack2 or rack3, contains two or three files.
if (split.equals(splits.get(0))) {
assertEquals(1, fileSplit.getLocations().length);
if (fileSplit.getLocations()[0].equals(hosts2[0])) {
assertEquals(3, fileSplit.getNumPaths());
} else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
assertEquals(2, fileSplit.getNumPaths());
} else {
fail("First split should be on rack2 or rack3.");
}
}
// second split is on rack1, contains the remaining files.
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 1) {
// first split is rack1, contains all four files.
assertEquals(1, fileSplit.getLocations().length);
assertEquals(4, fileSplit.getNumPaths());
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} else {
fail("Split size should be 1, 2, or 3.");
}
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
}
assertEquals(4, actual.size());
assertTrue(actual.containsAll(expected));
// maximum split size is file1's length
inFormat = new DummyInputFormat();
inFormat.setMinSplitSizeNode(f1.getLen());
inFormat.setMaxSplitSize(f1.getLen());
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test4): " + split);
}
assertEquals(4, splits.size());
actual.clear();
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(4, actual.size());
assertTrue(actual.containsAll(expected));
verify(mockList, atLeastOnce()).add(hosts1[0]);
verify(mockList, atLeastOnce()).add(hosts2[0]);
verify(mockList, atLeastOnce()).add(hosts3[0]);
// maximum split size is twice file1's length
inFormat = new DummyInputFormat();
inFormat.setMinSplitSizeNode(f1.getLen());
inFormat.setMaxSplitSize(2 * f1.getLen());
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test5): " + split);
}
actual.clear();
reset(mockList);
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(4, actual.size());
assertTrue(actual.containsAll(expected));
if (splits.size() == 3) {
// splits are on all the racks
verify(mockList, times(1)).add(hosts1[0]);
verify(mockList, times(1)).add(hosts2[0]);
verify(mockList, times(1)).add(hosts3[0]);
} else if (splits.size() == 2) {
// one split is on rack1, another split is on rack2 or rack3
verify(mockList, times(1)).add(hosts1[0]);
} else {
fail("Split size should be 2 or 3.");
}
// maximum split size is 4 times file1's length
inFormat = new DummyInputFormat();
inFormat.setMinSplitSizeNode(2 * f1.getLen());
inFormat.setMaxSplitSize(4 * f1.getLen());
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test6): " + split);
}
/**
* If rack1 is processed first by
* {@link CombineFileInputFormat#createSplits},
* create only one split on rack1. Otherwise create two splits.
*/
assertTrue("Split size should be 1 or 2.",
splits.size() == 1 || splits.size() == 2);
actual.clear();
reset(mockList);
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
mockList.add(fileSplit.getLocations()[0]);
}
assertEquals(4, actual.size());
assertTrue(actual.containsAll(expected));
verify(mockList, times(1)).add(hosts1[0]);
// maximum split size and min-split-size per rack is 4 times file1's length
inFormat = new DummyInputFormat();
inFormat.setMaxSplitSize(4 * f1.getLen());
inFormat.setMinSplitSizeRack(4 * f1.getLen());
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test7): " + split);
}
assertEquals(1, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(4, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
// minimum split size per node is 4 times file1's length
inFormat = new DummyInputFormat();
inFormat.setMinSplitSizeNode(4 * f1.getLen());
FileInputFormat.setInputPaths(job,
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test8): " + split);
}
assertEquals(1, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(4, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
// Rack 1 has file1, file2, file3 and file4.
// Rack 2 has file2, file3 and file4.
// Rack 3 has file3 and file4.
// Set up a filter so that only file1 and file2 can be combined.
inFormat = new DummyInputFormat();
FileInputFormat.addInputPath(job, inDir);
inFormat.setMinSplitSizeRack(1); // everything is at least rack local
inFormat.createPool(new TestFilter(dir1),
new TestFilter(dir2));
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test9): " + split);
}
actual.clear();
for (InputSplit split : splits) {
fileSplit = (CombineFileSplit) split;
if (splits.size() == 3) {
// If rack2 is processed first
if (split.equals(splits.get(0))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts2[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(1))) {
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(2))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts3[0], fileSplit.getLocations()[0]);
}
} else if (splits.size() == 2) {
// If rack1 is processed first
if (split.equals(splits.get(0))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
}
if (split.equals(splits.get(1))) {
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(hosts3[0], fileSplit.getLocations()[0]);
}
} else {
fail("Split size should be 2 or 3.");
}
for (int i = 0; i < fileSplit.getNumPaths(); i++) {
String name = fileSplit.getPath(i).getName();
long length = fileSplit.getLength(i);
long offset = fileSplit.getOffset(i);
actual.add(new Split(name, length, offset));
}
}
assertEquals(4, actual.size());
assertTrue(actual.containsAll(expected));
// measure performance when there are multiple pools and
// many files in each pool.
int numPools = 100;
int numFiles = 1000;
DummyInputFormat1 inFormat1 = new DummyInputFormat1();
for (int i = 0; i < numFiles; i++) {
FileInputFormat.setInputPaths(job, file1);
}
inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
final Path dirNoMatch1 = new Path(inDir, "/dirxx");
final Path dirNoMatch2 = new Path(inDir, "/diryy");
for (int i = 0; i < numPools; i++) {
inFormat1.createPool(new TestFilter(dirNoMatch1),
new TestFilter(dirNoMatch2));
}
long start = System.currentTimeMillis();
splits = inFormat1.getSplits(job);
long end = System.currentTimeMillis();
System.out.println("Elapsed time for " + numPools + " pools " +
" and " + numFiles + " files is " +
((end - start)) + " milli seconds.");
} finally {
if (dfs != null) {
dfs.shutdown();
}
}
}
/**
* Test that CFIF can handle missing blocks.
*/
@Test
public void testMissingBlocks() throws Exception {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", MissingBlockFileSystem.class.getName());
conf.setBoolean("dfs.replication.considerLoad", false);
dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
.build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
if (!fileSys.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
Path file1 = new Path(dir1 + "/file1");
writeFile(conf, file1, (short)1, 1);
// create another file on the same datanode
Path file5 = new Path(dir5 + "/file5");
writeFile(conf, file5, (short)1, 1);
((MissingBlockFileSystem)fileSys).setFileWithMissingBlocks(file1.toUri().getPath());
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
List<InputSplit> splits = inFormat.getSplits(job);
System.out.println("Made splits(Test0): " + splits.size());
for (InputSplit split : splits) {
System.out.println("File split(Test0): " + split);
}
assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
assertEquals(file5.getName(), fileSplit.getPath(1).getName());
assertEquals(0, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
} finally {
if (dfs != null) {
dfs.shutdown();
}
}
}
/**
* Test when the input file's length is 0.
*/
@Test
public void testForEmptyFile() throws Exception {
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
Path file = new Path("test" + "/file");
FSDataOutputStream out = fileSys.create(file, true,
conf.getInt("io.file.buffer.size", 4096), (short) 1, (long) BLOCKSIZE);
out.write(new byte[0]);
out.close();
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, "test");
List<InputSplit> splits = inFormat.getSplits(job);
assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(0, fileSplit.getLength(0));
fileSys.delete(file.getParent(), true);
}
/**
* Test that directories do not get included as part of getSplits()
*/
@Test
public void testGetSplitsWithDirectory() throws Exception {
MiniDFSCluster dfs = null;
try {
Configuration conf = new Configuration();
dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
.build();
dfs.waitActive();
FileSystem fileSys = dfs.getFileSystem();
// Set up the following directory structure:
// /dir1/: directory
// /dir1/file1: regular file
// /dir1/dir2/: directory
Path dir1 = new Path("/dir1");
Path file = new Path("/dir1/file1");
Path dir2 = new Path("/dir1/dir2");
if (!fileSys.mkdirs(dir1)) {
throw new IOException("Mkdirs failed to create " + dir1.toString());
}
FSDataOutputStream out = fileSys.create(file);
out.write(new byte[0]);
out.close();
if (!fileSys.mkdirs(dir2)) {
throw new IOException("Mkdirs failed to create " + dir2.toString());
}
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, "/dir1");
List<InputSplit> splits = inFormat.getSplits(job);
// directories should be omitted from getSplits() - we should only see file1 and not dir2
assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(0, fileSplit.getLength(0));
} finally {
if (dfs != null) {
dfs.shutdown();
}
}
}
/**
* Test when input files are from non-default file systems
*/
@Test
public void testForNonDefaultFileSystem() throws Throwable {
Configuration conf = new Configuration();
// use a fake file system scheme as default
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
// default fs path
assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
// add a local file
String localPathRoot = System.getProperty("test.build.data",
"build/test/data");
Path localPath = new Path(localPathRoot, "testFile1");
FileSystem lfs = FileSystem.getLocal(conf);
FSDataOutputStream dos = lfs.create(localPath);
dos.writeChars("Local file for CFIF");
dos.close();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
DummyInputFormat inFormat = new DummyInputFormat();
List<InputSplit> splits = inFormat.getSplits(job);
assertTrue(splits.size() > 0);
for (InputSplit s : splits) {
CombineFileSplit cfs = (CombineFileSplit)s;
for (Path p : cfs.getPaths()) {
assertEquals("file", p.toUri().getScheme());
}
}
}
static class TestFilter implements PathFilter {
private Path p;
// store a path prefix in this TestFilter
public TestFilter(Path p) {
this.p = p;
}
// returns true if the specified path matches the prefix stored
// in this TestFilter.
public boolean accept(Path path) {
return path.toUri().getPath().startsWith(p.toString());
}
public String toString() {
return "PathFilter:" + p;
}
}
/*
* Prints out the input splits for the specified files
*/
private void splitRealFiles(String[] args) throws IOException {
Configuration conf = new Configuration();
Job job = Job.getInstance();
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}
long blockSize = fs.getDefaultBlockSize();
DummyInputFormat inFormat = new DummyInputFormat();
for (int i = 0; i < args.length; i++) {
FileInputFormat.addInputPaths(job, args[i]);
}
inFormat.setMinSplitSizeRack(blockSize);
inFormat.setMaxSplitSize(10 * blockSize);
List<InputSplit> splits = inFormat.getSplits(job);
System.out.println("Total number of splits " + splits.size());
for (int i = 0; i < splits.size(); ++i) {
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(i);
System.out.println("Split[" + i + "] " + fileSplit);
}
}
public static void main(String[] args) throws Exception {
// if there are some parameters specified, then use those paths
if (args.length != 0) {
TestCombineFileInputFormat test = new TestCombineFileInputFormat();
test.splitRealFiles(args);
} else {
TestCombineFileInputFormat test = new TestCombineFileInputFormat();
test.testSplitPlacement();
}
}
}