HCATALOG-527 InputJobInfo should not be public
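
Before this change, callers constructed an InputJobInfo and passed it to
HCatInputFormat.setInput. With this change, setInput takes the database and
table names directly and returns the HCatInputFormat instance, so a filter
and properties can be chained fluently. A migration sketch (the table and
filter values are illustrative):

    // Before: InputJobInfo was part of the public API.
    HCatInputFormat.setInput(job, InputJobInfo.create("default", "mytbl", "part='x'"));

    // After: InputJobInfo is internal; pass names directly and chain setters.
    HCatInputFormat.setInput(job, "default", "mytbl").setFilter("part='x'");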

git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1421616 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f1baeb..84c8423 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,8 @@
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-527 InputJobInfo should not be public (traviscrawford)
+
  HCAT-560 HCatClient should support addition of new columns to a Table. (mithunr via avandana)
 
   HCAT-558 Update test.sh to test from an extracted src-release (traviscrawford)
diff --git a/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java b/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
index cf9bba2..742626c 100644
--- a/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
+++ b/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
@@ -37,7 +37,6 @@
 import org.apache.hcatalog.data.transfer.ReaderContext;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.shims.HCatHadoopShims;
 
 /**
@@ -60,13 +59,10 @@
 
     @Override
     public ReaderContext prepareRead() throws HCatException {
-
         try {
             Job job = new Job(conf);
-            InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
-                re.getTableName(), re.getFilterString());
-            HCatInputFormat.setInput(job, jobInfo);
-            HCatInputFormat hcif = new HCatInputFormat();
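+            // setInput now returns the HCatInputFormat, so the filter can be chained directly.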
+            HCatInputFormat hcif = HCatInputFormat.setInput(
+                job, re.getDbName(), re.getTableName()).setFilter(re.getFilterString());
             ReaderContext cntxt = new ReaderContext();
             cntxt.setInputSplits(hcif.getSplits(
                 HCatHadoopShims.Instance.get().createJobContext(job.getConfiguration(), null)));
diff --git a/core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java b/core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
index 7a7446f..006d818 100644
--- a/core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
+++ b/core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
@@ -19,38 +19,117 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
+import java.util.Properties;
 
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
-/** The InputFormat to use to read data from HCatalog. */
+/**
+ * The InputFormat to use to read data from HCatalog.
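+ *
+ * A minimal usage sketch (the table and filter names are illustrative):
+ * <pre>
+ * Job job = new Job(conf);
+ * job.setInputFormatClass(HCatInputFormat.class);
+ * HCatInputFormat.setInput(job, "default", "mytbl").setFilter("part='x'");
+ * </pre>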
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public class HCatInputFormat extends HCatBaseInputFormat {
 
+    private Configuration conf;
+    private InputJobInfo inputJobInfo;
+
     /**
-     * @see org.apache.hcatalog.mapreduce.HCatInputFormat#setInput(org.apache.hadoop.conf.Configuration, InputJobInfo)
+     * @deprecated as of release 0.5, and will be removed in a future release
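+     *             Use {@link #setInput(org.apache.hadoop.mapreduce.Job, String, String)} instead.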
      */
-    public static void setInput(Job job,
-                                InputJobInfo inputJobInfo) throws IOException {
+    @Deprecated
+    public static void setInput(Job job, InputJobInfo inputJobInfo) throws IOException {
         setInput(job.getConfiguration(), inputJobInfo);
     }
 
     /**
-     * Set the input information to use for the job. This queries the metadata server
-     * with the specified partition predicates, gets the matching partitions, and
-     * puts the information in the conf object. The inputInfo object is updated
-     * with information needed in the client context.
-     * @param conf the job Configuration object
-     * @param inputJobInfo the input information about the table to read
-     * @throws IOException the exception in communicating with the metadata server
+     * @deprecated as of release 0.5, and will be removed in a future release
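+     *             Use {@link #setInput(org.apache.hadoop.conf.Configuration, String, String)} instead.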
      */
-    public static void setInput(Configuration conf,
-                                InputJobInfo inputJobInfo) throws IOException {
+    @Deprecated
+    public static void setInput(Configuration conf, InputJobInfo inputJobInfo) throws IOException {
+        setInput(conf, inputJobInfo.getDatabaseName(), inputJobInfo.getTableName())
+            .setFilter(inputJobInfo.getFilter())
+            .setProperties(inputJobInfo.getProperties());
+    }
+
+    /**
+     * See {@link #setInput(org.apache.hadoop.conf.Configuration, String, String)}
+     */
+    public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException {
+        return setInput(job.getConfiguration(), dbName, tableName);
+    }
+
+    /**
+     * Set the input to use for the job. This queries the metastore with the given input
+     * specification and serializes matching partitions into the job conf for use by MR tasks.
+     * @param conf the job configuration
+     * @param dbName the database name; if null, the 'default' database is used
+     * @param tableName the table name
+     * @return an HCatInputFormat instance, on which a filter and properties may be chained
+     * @throws IOException on all errors
+     */
+    public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName)
+        throws IOException {
+
+        Preconditions.checkNotNull(conf, "required argument 'conf' is null");
+        Preconditions.checkNotNull(tableName, "required argument 'tableName' is null");
+
+        HCatInputFormat hCatInputFormat = new HCatInputFormat();
+        hCatInputFormat.conf = conf;
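+        // dbName may be null, in which case 'default' is used; filter and properties
+        // start null and are supplied via the fluent setFilter/setProperties calls.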
+        hCatInputFormat.inputJobInfo = InputJobInfo.create(dbName, tableName, null, null);
+
+        try {
+            InitializeInput.setInput(conf, hCatInputFormat.inputJobInfo);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+
+        return hCatInputFormat;
+    }
+
+    /**
+     * Set a filter on the input table.
+     * @param filter the filter specification, which may be null
+     * @return this
+     * @throws IOException on all errors
+     */
+    public HCatInputFormat setFilter(String filter) throws IOException {
+        // null filters are supported to simplify client code
+        if (filter != null) {
+            inputJobInfo = InputJobInfo.create(
+                inputJobInfo.getDatabaseName(),
+                inputJobInfo.getTableName(),
+                filter,
+                inputJobInfo.getProperties());
+            try {
+                InitializeInput.setInput(conf, inputJobInfo);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Set properties for the input format.
+     * @param properties properties for the input specification; must not be null
+     * @return this
+     * @throws IOException on all errors
+     */
+    public HCatInputFormat setProperties(Properties properties) throws IOException {
+        Preconditions.checkNotNull(properties, "required argument 'properties' is null");
+        inputJobInfo = InputJobInfo.create(
+            inputJobInfo.getDatabaseName(),
+            inputJobInfo.getTableName(),
+            inputJobInfo.getFilter(),
+            properties);
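+        // Re-run initialization so the updated InputJobInfo is serialized back into the conf.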
         try {
             InitializeInput.setInput(conf, inputJobInfo);
         } catch (Exception e) {
             throw new IOException(e);
         }
+        return this;
     }
-
-
 }
diff --git a/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java b/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
index e91ddc7..627c4b3 100644
--- a/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
+++ b/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
@@ -45,7 +45,7 @@
  * serialized and written into the JobContext configuration. The inputInfo is also updated with
  * info required in the client process context.
  */
-public class InitializeInput {
+class InitializeInput {
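+    // Package-private as of HCATALOG-527; external callers should use HCatInputFormat.setInput instead.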
 
     private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
 
@@ -79,8 +79,8 @@
         InputJobInfo inputJobInfo = InputJobInfo.create(
             theirInputJobInfo.getDatabaseName(),
             theirInputJobInfo.getTableName(),
-            theirInputJobInfo.getFilter());
-        inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
+            theirInputJobInfo.getFilter(),
+            theirInputJobInfo.getProperties());
         conf.set(
             HCatConstants.HCAT_KEY_JOB_INFO,
             HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
diff --git a/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java b/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
index 8779c5e..2cfbc30 100644
--- a/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
+++ b/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hcatalog.mapreduce;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 
 import java.io.IOException;
@@ -30,17 +32,16 @@
 import java.util.zip.InflaterInputStream;
 
 /**
- * Container for metadata read from the metadata server. Users should specify input to
- * their HCatalog MR jobs as follows:
- * <p><code>
- * HCatInputFormat.setInput(job, InputJobInfo.create(databaseName, tableName, filter));
- * </code></p>
- * Note: while InputJobInfo is public,
- * <a href="https://issues.apache.org/jira/browse/HCATALOG-527">HCATALOG-527</a> discusses
- * removing this class from the public API, by simplifying {@link HCatInputFormat#setInput}
- * to simply take the input specification arguments directly. Use InputJobInfo outside the
- * above context (including serialization) at your own peril!
+ * Container for metadata read from the metadata server.
+ * Prior to release 0.5, InputJobInfo was a key part of the public API, exposed directly
+ * to end-users as an argument to
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, InputJobInfo)}.
+ * Going forward, we plan to treat InputJobInfo as an implementation detail that is no
+ * longer exposed to end-users. If you need to use InputJobInfo outside HCatalog itself,
+ * please contact the developer mailing list before depending on this class.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class InputJobInfo implements Serializable {
 
     /** The serialization version */
@@ -69,22 +70,22 @@
      * @param tableName the table name
      * @param filter the partition filter
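+     * @param properties the job properties; may be null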
      */
-
     public static InputJobInfo create(String databaseName,
                                       String tableName,
-                                      String filter) {
-        return new InputJobInfo(databaseName, tableName, filter);
+                                      String filter,
+                                      Properties properties) {
+        return new InputJobInfo(databaseName, tableName, filter, properties);
     }
 
-
     private InputJobInfo(String databaseName,
                          String tableName,
-                         String filter) {
+                         String filter,
+                         Properties properties) {
         this.databaseName = (databaseName == null) ?
             MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
         this.tableName = tableName;
         this.filter = filter;
-        this.properties = new Properties();
+        this.properties = properties == null ? new Properties() : properties;
     }
 
     /**
diff --git a/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java b/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java
index 18d368f..3108aed 100644
--- a/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java
+++ b/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java
@@ -110,9 +110,7 @@
 
     private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
         throws HCatException {
-
-        ReadEntity.Builder builder = new ReadEntity.Builder();
-        ReadEntity entity = builder.withTable("mytbl").build();
+        ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
         HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
         ReaderContext cntxt = reader.prepareRead();
         return cntxt;
diff --git a/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java b/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
index 74c4888..5380bf0 100644
--- a/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
+++ b/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -321,8 +321,7 @@
         job.setInputFormatClass(HCatInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);
 
-        InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, dbName, tableName).setFilter(filter);
 
         job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(Text.class);
@@ -353,8 +352,7 @@
         job.setInputFormatClass(HCatInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);
 
-        InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, dbName, tableName);
 
         return HCatInputFormat.getTableSchema(job);
     }
diff --git a/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java b/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
index a6b381e..eb6b748 100644
--- a/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
+++ b/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
@@ -119,7 +119,7 @@
         job.setInputFormatClass(HCatInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);
 
-        HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+        HCatInputFormat.setInput(job, "default", "test_bad_records");
 
         job.setMapOutputKeyClass(HCatRecord.class);
         job.setMapOutputValueClass(HCatRecord.class);
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
index 5a4d400..733abb2 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
@@ -115,8 +115,7 @@
             }
         } else {
             Job clone = new Job(job.getConfiguration());
-            HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
-                tableName, getPartitionFilterString()));
+            HCatInputFormat.setInput(job, dbName, tableName).setFilter(getPartitionFilterString());
 
             // We will store all the new/changed properties in the job in the
             // udf context, so the HCatInputFormat.setInput method need not
diff --git a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
index 3875185..c105a62 100644
--- a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
+++ b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
@@ -63,7 +63,6 @@
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import org.junit.Test;
@@ -592,9 +591,7 @@
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapReadAbortedTransaction.class);
         job.setInputFormatClass(HCatInputFormat.class);
-        InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
-            tableName, null);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, databaseName, tableName);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(BytesWritable.class);
diff --git a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
index 81a3099..3ef41b7 100644
--- a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
+++ b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
@@ -61,7 +61,6 @@
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.junit.Test;
 
@@ -363,9 +362,7 @@
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapReadAbortedTransaction.class);
         job.setInputFormatClass(HCatInputFormat.class);
-        InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
-            tableName, null);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, databaseName, tableName);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(BytesWritable.class);
diff --git a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
index 78724a1..4da64d9 100644
--- a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
+++ b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
@@ -216,9 +216,7 @@
         MapReadHTable.resetCounters();
 
         job.setInputFormatClass(HCatInputFormat.class);
-        InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName,
-            null);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(BytesWritable.class);
@@ -281,10 +279,8 @@
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapReadProjHTable.class);
         job.setInputFormatClass(HCatInputFormat.class);
-        InputJobInfo inputJobInfo = InputJobInfo.create(
-            MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
         HCatInputFormat.setOutputSchema(job, getProjectionSchema());
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(BytesWritable.class);
@@ -340,12 +336,10 @@
         job.setMapperClass(MapReadProjectionHTable.class);
         job.setInputFormat(HBaseInputFormat.class);
 
-        InputJobInfo inputJobInfo = InputJobInfo.create(
-            MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
         //Configure projection schema
         job.set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(getProjectionSchema()));
         Job newJob = new Job(job);
-        HCatInputFormat.setInput(newJob, inputJobInfo);
+        HCatInputFormat.setInput(newJob, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
         String inputJobString = newJob.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
         InputJobInfo info = (InputJobInfo) HCatUtil.deserialize(inputJobString);
         job.set(HCatConstants.HCAT_KEY_JOB_INFO, inputJobString);
@@ -411,9 +405,7 @@
         job.setMapperClass(MapReadHTable.class);
         MapReadHTable.resetCounters();
         job.setInputFormatClass(HCatInputFormat.class);
-        InputJobInfo inputJobInfo = InputJobInfo.create(
-            MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(BytesWritable.class);
@@ -473,9 +465,7 @@
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapReadHTableRunningAbort.class);
         job.setInputFormatClass(HCatInputFormat.class);
-        InputJobInfo inputJobInfo = InputJobInfo.create(
-            MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
-        HCatInputFormat.setInput(job, inputJobInfo);
+        HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(BytesWritable.class);
diff --git a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
index e07bf46..9592645 100644
--- a/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
+++ b/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
@@ -23,6 +23,7 @@
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,7 @@
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
-import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.junit.Test;
 
@@ -87,15 +88,15 @@
         cmdResponse = hcatDriver.run(tableQuery);
         assertEquals(0, cmdResponse.getResponseCode());
 
-        InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null);
         Configuration conf = new Configuration(hcatConf);
         conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
             HCatUtil.serialize(getHiveConf().getAllProperties()));
         Job job = new Job(conf);
-        inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
-        InitializeInput.setInput(job, inputInfo);
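+        // Snapshot properties now flow through the public HCatInputFormat API instead of InitializeInput.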
+        Properties properties = new Properties();
+        properties.setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
+        HCatInputFormat.setInput(job, databaseName, tableName).setProperties(properties);
         String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
-        inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
 
         Map<String, Long> revMap = new HashMap<String, Long>();
         revMap.put("cf1", 3L);
@@ -121,9 +122,7 @@
         revMap.clear();
         revMap.put("cf1", 3L);
         hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap, -1);
-        inputInfo = InputJobInfo.create(databaseName, tableName, null);
-        inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
-        InitializeInput.setInput(job, inputInfo);
+        HCatInputFormat.setInput(job, databaseName, tableName).setProperties(properties);
         modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
         inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
         hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
@@ -138,5 +137,4 @@
         cmdResponse = hcatDriver.run(dropDatabase);
         assertEquals(0, cmdResponse.getResponseCode());
     }
-
 }