HCATALOG-506 desired number of input splits for large files

git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1390178 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index af9ab78..f1d7857 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,8 @@
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-506 desired number of input splits for large files (gmalewicz via traviscrawford)
+
   HCAT-461 Refactor server-extensions as a subproject (traviscrawford)
 
   HCAT-500 HCatStorer should honor user-specified path for external tables (pengfeng via traviscrawford)
diff --git a/src/java/org/apache/hcatalog/common/HCatConstants.java b/src/java/org/apache/hcatalog/common/HCatConstants.java
index d8467c5..1a9e6fc 100644
--- a/src/java/org/apache/hcatalog/common/HCatConstants.java
+++ b/src/java/org/apache/hcatalog/common/HCatConstants.java
@@ -79,6 +79,16 @@
     public static final String HCAT_METASTORE_PRINCIPAL
         = HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname;
 
+    /**
+     * The desired number of input splits produced for each partition. When the
+     * input files are large and few, we want to split them into many splits,
+     * so as to increase the parallelizm of loading the splits. Try also two
+     * other parameters, mapred.min.split.size and mapred.max.split.size, to
+     * control the number of input splits.
+     */
+    public static final String HCAT_DESIRED_PARTITION_NUM_SPLITS =
+        "hcat.desired.partition.num.splits";
+
     // IMPORTANT IMPORTANT IMPORTANT!!!!!
     //The keys used to store info into the job Configuration.
     //If any new keys are added, the HCatStorer needs to be updated. The HCatStorer
diff --git a/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java b/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
index 6f77a23..3532696 100644
--- a/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -142,11 +142,17 @@
             org.apache.hadoop.mapred.InputFormat inputFormat =
                 getMapRedInputFormat(jobConf, inputFormatClass);
 
-            //Call getSplit on the InputFormat, create an
-            //HCatSplit for each underlying split
-            //NumSplits is 0 for our purposes
+            //Call getSplit on the InputFormat, create an HCatSplit for each
+            //underlying split. When the desired number of input splits is missing,
+            //use a default number (denoted by zero).
+            //TODO(malewicz): Currently each partition is split independently into
+            //a desired number. However, we want the union of all partitions to be
+            //split into a desired number while maintaining balanced sizes of input
+            //splits.
+            int desiredNumSplits =
+                conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
             org.apache.hadoop.mapred.InputSplit[] baseSplits =
-                inputFormat.getSplits(jobConf, 0);
+                inputFormat.getSplits(jobConf, desiredNumSplits);
 
             for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
                 splits.add(new HCatSplit(