MAPREDUCE-2571. CombineFileInputFormat.getSplits throws a java.lang.ArrayStoreException. Contributed by Bochun Bai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/branch-0.22@1133176 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 05c8a9c..485bd0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -581,6 +581,9 @@
MAPREDUCE-2487. ChainReducer uses MAPPER_BY_VALUE instead of
REDUCER_BY_VALUE. (Devaraj K via todd)
+ MAPREDUCE-2571. CombineFileInputFormat.getSplits throws a
+ java.lang.ArrayStoreException. (Bochun Bai via todd)
+
Release 0.21.1 - Unreleased
NEW FEATURES
diff --git a/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
index 3115a1f..96a30b0 100644
--- a/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
@@ -68,7 +68,19 @@
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
-    return super.getSplits(new Job(job)).toArray(new InputSplit[0]);
+    // super.getSplits() yields new-style splits, which cannot be stored in an
+    // old-style InputSplit[] (the ArrayStoreException); re-wrap each one.
+    List<org.apache.hadoop.mapreduce.InputSplit> parentSplits =
+      super.getSplits(new Job(job));
+    InputSplit[] result = new InputSplit[parentSplits.size()];
+    int idx = 0;
+    for (org.apache.hadoop.mapreduce.InputSplit raw : parentSplits) {
+      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit split =
+        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) raw;
+      result[idx++] = new CombineFileSplit(job, split.getPaths(),
+        split.getStartOffsets(), split.getLengths(), split.getLocations());
+    }
+    return result;
   }
/**
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapred/TestCombineFileInputFormat.java
new file mode 100644
index 0000000..e07577e
--- /dev/null
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestCombineFileInputFormat.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
+
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Regression test for MAPREDUCE-2571: the old-style
+ * {@link CombineFileInputFormat#getSplits(JobConf, int)} used to fail with
+ * an {@link ArrayStoreException} instead of returning its splits.
+ */
+public class TestCombineFileInputFormat {
+  private static final Log LOG =
+    LogFactory.getLog(TestCombineFileInputFormat.class.getName());
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+  static {
+    try {
+      defaultConf.set("fs.default.name", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestCombineFileInputFormat").makeQualified(localFs);
+
+  /**
+   * Creates {@code name} on {@code fs} holding the UTF-8 bytes of
+   * {@code contents}; the stream is closed even if the write fails.
+   */
+  private static void writeFile(FileSystem fs, Path name,
+                                String contents) throws IOException {
+    OutputStream stm = fs.create(name);
+    try {
+      stm.write(contents.getBytes("UTF-8"));
+    } finally {
+      stm.close();
+    }
+  }
+
+  /**
+   * Test getSplits: one small input file must come back as exactly one
+   * split of the old-style {@link CombineFileSplit} type.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSplits() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "test.txt"),
+              "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
+    FileInputFormat.setInputPaths(job, workDir);
+    // Anonymous subclass only to get an instance; getRecordReader is not
+    // exercised by this test.
+    CombineFileInputFormat format = new CombineFileInputFormat() {
+      @Override
+      public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+        return new CombineFileRecordReader(job, (CombineFileSplit)split, reporter, CombineFileRecordReader.class);
+      }
+    };
+    final int SIZE_SPLITS = 1;
+    LOG.info("Trying to getSplits with splits = " + SIZE_SPLITS);
+    // Before MAPREDUCE-2571 this call threw ArrayStoreException.
+    InputSplit[] splits = format.getSplits(job, SIZE_SPLITS);
+    LOG.info("Got getSplits = " + splits.length);
+    assertEquals("splits == " + SIZE_SPLITS, SIZE_SPLITS, splits.length);
+    for (InputSplit split : splits) {
+      assertTrue("unexpected split type " + split.getClass().getName(),
+                 split instanceof CombineFileSplit);
+    }
+  }
+}