MAPREDUCE-2571. CombineFileInputFormat.getSplits throws a java.lang.ArrayStoreException. Contributed by Bochun Bai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/branch-0.22@1133176 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 05c8a9c..485bd0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -581,6 +581,9 @@
     MAPREDUCE-2487. ChainReducer uses MAPPER_BY_VALUE instead of
     REDUCER_BY_VALUE. (Devaraj K via todd)
 
+    MAPREDUCE-2571. CombineFileInputFormat.getSplits throws a
+    java.lang.ArrayStoreException. (Bochun Bai via todd)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES
diff --git a/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
index 3115a1f..96a30b0 100644
--- a/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
@@ -68,7 +68,17 @@
 
   public InputSplit[] getSplits(JobConf job, int numSplits) 
     throws IOException {
-    return super.getSplits(new Job(job)).toArray(new InputSplit[0]);
+    List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
+      super.getSplits(new Job(job));
+    InputSplit[] ret = new InputSplit[newStyleSplits.size()];
+    for(int pos = 0; pos < newStyleSplits.size(); ++pos) {
+      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit = 
+        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);
+      ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),
+        newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),
+        newStyleSplit.getLocations());
+    }
+    return ret;
   }
   
   /**
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapred/TestCombineFileInputFormat.java
new file mode 100644
index 0000000..e07577e
--- /dev/null
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestCombineFileInputFormat.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
+
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestCombineFileInputFormat {
+  private static final Log LOG =
+    LogFactory.getLog(TestCombineFileInputFormat.class.getName());
+  
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null; 
+  static {
+    try {
+      defaultConf.set("fs.default.name", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestCombineFileInputFormat").makeQualified(localFs);
+
+  private static void writeFile(FileSystem fs, Path name, 
+                                String contents) throws IOException {
+    OutputStream stm;
+    stm = fs.create(name);
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+  
+  /**
+   * Test getSplits
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSplits() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "test.txt"), 
+              "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
+    FileInputFormat.setInputPaths(job, workDir);
+    CombineFileInputFormat format = new CombineFileInputFormat() {
+      @Override
+      public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+        return new CombineFileRecordReader(job, (CombineFileSplit)split, reporter, CombineFileRecordReader.class);
+      }
+    };
+    final int SIZE_SPLITS = 1;
+    LOG.info("Trying to getSplits with splits = " + SIZE_SPLITS);
+    InputSplit[] splits = format.getSplits(job, SIZE_SPLITS);
+    LOG.info("Got getSplits = " + splits.length);
+    assertEquals("splits == " + SIZE_SPLITS, SIZE_SPLITS, splits.length);
+  }
+}