SQOOP-2295: Hive import with Parquet should append automatically

(Qian Xu via Abraham Elmahrek)
diff --git a/src/docs/man/hive-args.txt b/src/docs/man/hive-args.txt
index 7d9e427..b92a446 100644
--- a/src/docs/man/hive-args.txt
+++ b/src/docs/man/hive-args.txt
@@ -29,7 +29,7 @@
   Overwrites existing data in the hive table if it exists.
 
 --create-hive-table::
-  If set, then the job will fail if the target hive table exits.
+  If set, then the job will fail if the target hive table exists.
   By default this property is false.
 
 --hive-table (table-name)::
diff --git a/src/docs/man/sqoop-create-hive-table.txt b/src/docs/man/sqoop-create-hive-table.txt
index 7aebcc1..afae9d0 100644
--- a/src/docs/man/sqoop-create-hive-table.txt
+++ b/src/docs/man/sqoop-create-hive-table.txt
@@ -35,7 +35,7 @@
   Overwrites existing data in the hive table if it exists.
 
 --create-hive-table::
-  If set, then the job will fail if the target hive table exits.
+  If set, then the job will fail if the target hive table exists.
   By default this property is false.
 
 --hive-table (table-name)::
diff --git a/src/docs/user/create-hive-table.txt b/src/docs/user/create-hive-table.txt
index 3aa34fd..dceb204 100644
--- a/src/docs/user/create-hive-table.txt
+++ b/src/docs/user/create-hive-table.txt
@@ -50,7 +50,7 @@
 +\--hive-home <dir>+          Override +$HIVE_HOME+
 +\--hive-overwrite+           Overwrite existing data in the Hive table.
 +\--create-hive-table+        If set, then the job will fail if the target hive
-                              table exits. By default this property is false.
+                              table exists. By default this property is false.
 +\--hive-table <table-name>+  Sets the table name to use when importing \
                               to Hive.
 +\--table+                    The database table to read the \
diff --git a/src/docs/user/hive-args.txt b/src/docs/user/hive-args.txt
index 53de92d..e54ee1e 100644
--- a/src/docs/user/hive-args.txt
+++ b/src/docs/user/hive-args.txt
@@ -28,7 +28,7 @@
                               default delimiters if none are set.)
 +\--hive-overwrite+           Overwrite existing data in the Hive table.
 +\--create-hive-table+        If set, then the job will fail if the target hive
-                              table exits. By default this property is false.
+                              table exists. By default this property is false.
 +\--hive-table <table-name>+  Sets the table name to use when importing\
                               to Hive.
 +\--hive-drop-import-delims+  Drops '\n', '\r', and '\01' from string\
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index d5bfae2..7521464 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -47,6 +47,7 @@
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.kitesdk.data.Datasets;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 /**
@@ -105,8 +106,27 @@
       final String schemaNameOverride = tableName;
       Schema schema = generateAvroSchema(tableName, schemaNameOverride);
       String uri = getKiteUri(conf, tableName);
-      ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode(),
-          options.doHiveImport() && options.doOverwriteHiveTable());
+      ParquetJob.WriteMode writeMode;
+
+      if (options.doHiveImport()) {
+        if (options.doOverwriteHiveTable()) {
+          writeMode = ParquetJob.WriteMode.OVERWRITE;
+        } else {
+          writeMode = ParquetJob.WriteMode.APPEND;
+          if (Datasets.exists(uri)) {
+            LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
+                "append data into the existing Hive table. Consider using " +
+                "--hive-overwrite, if you do NOT intend to do appending.");
+          }
+        }
+      } else {
+        // Note that there is no such an import argument for overwriting HDFS
+        // dataset, so overwrite mode is not supported yet.
+        // Sqoop's append mode means to merge two independent datasets. We
+        // choose DEFAULT as write mode.
+        writeMode = ParquetJob.WriteMode.DEFAULT;
+      }
+      ParquetJob.configureImportJob(conf, schema, uri, writeMode);
     }
 
     job.setMapperClass(getMapperClass());
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index df55dbc..c775ef3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -26,7 +26,6 @@
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetNotFoundException;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.Formats;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
@@ -46,6 +45,9 @@
 
   private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
   static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
+  enum WriteMode {
+    DEFAULT, APPEND, OVERWRITE
+  };
 
   public static Schema getAvroSchema(Configuration conf) {
     return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
@@ -71,14 +73,14 @@
    * {@link org.apache.avro.generic.GenericRecord}.
    */
   public static void configureImportJob(Configuration conf, Schema schema,
-      String uri, boolean reuseExistingDataset, boolean overwrite) throws IOException {
+      String uri, WriteMode writeMode) throws IOException {
     Dataset dataset;
-    if (reuseExistingDataset || overwrite) {
-      try {
-        dataset = Datasets.load(uri);
-      } catch (DatasetNotFoundException ex) {
-        dataset = createDataset(schema, getCompressionType(conf), uri);
+    if (Datasets.exists(uri)) {
+      if (WriteMode.DEFAULT.equals(writeMode)) {
+        throw new IOException("Destination exists! " + uri);
       }
+
+      dataset = Datasets.load(uri);
       Schema writtenWith = dataset.getDescriptor().getSchema();
       if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
         throw new IOException(
@@ -90,10 +92,14 @@
     }
     conf.set(CONF_AVRO_SCHEMA, schema.toString());
 
-    if (overwrite) {
-      DatasetKeyOutputFormat.configure(conf).overwrite(dataset);
+    DatasetKeyOutputFormat.ConfigBuilder builder =
+        DatasetKeyOutputFormat.configure(conf);
+    if (WriteMode.OVERWRITE.equals(writeMode)) {
+      builder.overwrite(dataset);
+    } else if (WriteMode.APPEND.equals(writeMode)) {
+      builder.appendTo(dataset);
     } else {
-      DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
+      builder.writeTo(dataset);
     }
   }
 
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
index 07e140a..ae2e617 100644
--- a/src/test/com/cloudera/sqoop/TestParquetImport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -204,7 +204,7 @@
     createTableWithColTypes(types, vals);
 
     runImport(getOutputArgv(true, null));
-    runImportAgain(getOutputArgv(true, new String[]{"--append"}));
+    runImport(getOutputArgv(true, new String[]{"--append"}));
 
     DatasetReader<GenericRecord> reader = getReader();
     try {
@@ -226,7 +226,7 @@
 
     runImport(getOutputArgv(true, null));
     try {
-      runImportAgain(getOutputArgv(true, null));
+      runImport(getOutputArgv(true, null));
       fail("");
     } catch (IOException ex) {
       // ok
diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
index fa717cb..b626964 100644
--- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
+++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -20,23 +20,25 @@
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
 import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.HsqldbTestServer;
 import com.cloudera.sqoop.testutil.ImportJobTestCase;
 import com.cloudera.sqoop.tool.BaseSqoopTool;
 import com.cloudera.sqoop.tool.CodeGenTool;
@@ -44,6 +46,9 @@
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.tool.SqoopTool;
 import org.apache.commons.cli.ParseException;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
 
 /**
  * Test HiveImport capability after an import to HDFS.
@@ -53,11 +58,13 @@
   public static final Log LOG = LogFactory.getLog(
       TestHiveImport.class.getName());
 
+  @Before
   public void setUp() {
     super.setUp();
     HiveImport.setTestMode(true);
   }
 
+  @After
   public void tearDown() {
     super.tearDown();
     HiveImport.setTestMode(false);
@@ -272,11 +279,47 @@
     setNumCols(3);
     String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
     String [] vals = { "'test'", "42", "'somestring'" };
-    String [] args_array = getArgv(false, null);
-    ArrayList<String> args = new ArrayList<String>(Arrays.asList(args_array));
-    args.add("--as-parquetfile");
-    runImportTest(TABLE_NAME, types, vals, "normalImportAsParquet.q", args.toArray(new String[0]),
-            new ImportTool());
+    String [] extraArgs = {"--as-parquetfile"};
+
+    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
+        new ImportTool());
+    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+  }
+
+  private void verifyHiveDataset(String tableName, Object[][] valsArray) {
+    String datasetUri = String.format("dataset:hive:default/%s",
+        tableName.toLowerCase());
+    assertTrue(Datasets.exists(datasetUri));
+    Dataset dataset = Datasets.load(datasetUri);
+    assertFalse(dataset.isEmpty());
+
+    DatasetReader<GenericRecord> reader = dataset.newReader();
+    try {
+      List<String> expectations = new ArrayList<String>();
+      if (valsArray != null) {
+        for (Object[] vals : valsArray) {
+          expectations.add(Arrays.toString(vals));
+        }
+      }
+
+      while (reader.hasNext() && expectations.size() > 0) {
+        String actual = Arrays.toString(
+            convertGenericRecordToArray(reader.next()));
+        assertTrue("Expect record: " + actual, expectations.remove(actual));
+      }
+      assertFalse(reader.hasNext());
+      assertEquals(0, expectations.size());
+    } finally {
+      reader.close();
+    }
+  }
+
+  private static Object[] convertGenericRecordToArray(GenericRecord record) {
+    Object[] result = new Object[record.getSchema().getFields().size()];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = record.get(i);
+    }
+    return result;
   }
 
   /** Test that table is created in hive with no data import. */
@@ -312,6 +355,53 @@
         new CreateHiveTableTool());
   }
 
+  /**
+   * Test that table is created in hive and replaces the existing table if
+   * any.
+   */
+  @Test
+  public void testCreateOverwriteHiveImportAsParquet() throws IOException {
+    final String TABLE_NAME = "CREATE_OVERWRITE_HIVE_IMPORT_AS_PARQUET";
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String [] vals = { "'test'", "42", "'somestring'" };
+    String [] extraArgs = {"--as-parquetfile"};
+    ImportTool tool = new ImportTool();
+
+    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+
+    String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
+    String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
+    runImportTest(TABLE_NAME, types, valsToOverwrite, "",
+        getArgv(false, extraArgsForOverwrite), tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
+  }
+
+  /**
+   * Test that records are appended to an existing table.
+   */
+  @Test
+  public void testAppendHiveImportAsParquet() throws IOException {
+    final String TABLE_NAME = "APPEND_HIVE_IMPORT_AS_PARQUET";
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String [] vals = { "'test'", "42", "'somestring'" };
+    String [] extraArgs = {"--as-parquetfile"};
+    String [] args = getArgv(false, extraArgs);
+    ImportTool tool = new ImportTool();
+
+    runImportTest(TABLE_NAME, types, vals, "", args, tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+
+    String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
+    runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][] {
+        {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
+  }
+
   /** Test that dates are coerced properly to strings. */
   @Test
   public void testDate() throws IOException {
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 7934791..e3098d6 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -244,10 +244,11 @@
       } catch (IOException e) {
         LOG.warn(e);
       }
-    }
-    File s = new File(getWarehouseDir());
-    if (!s.delete()) {
-      LOG.warn("Can't delete " + s.getPath());
+    } else {
+      File s = new File(getWarehouseDir());
+      if (!s.delete()) {
+        LOG.warn("Cannot delete " + s.getPath());
+      }
     }
   }
 
diff --git a/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
index 293bf10..08408a5 100644
--- a/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
@@ -35,6 +35,7 @@
 import com.cloudera.sqoop.tool.SqoopTool;
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.junit.Before;
 
 /**
  * Class that implements common methods required for tests which import data
@@ -45,6 +46,12 @@
   public static final Log LOG = LogFactory.getLog(
       ImportJobTestCase.class.getName());
 
+  @Before
+  public void setUp() {
+    super.setUp();
+    removeTableDir();
+  }
+
   protected String getTablePrefix() {
     return "IMPORT_TABLE_";
   }
@@ -206,16 +213,6 @@
    * execution).
    */
   protected void runImport(SqoopTool tool, String [] argv) throws IOException {
-    boolean cleanup = true;
-    runImport(cleanup, tool, argv);
-  }
-
-  private void runImport(boolean cleanup, SqoopTool tool,
-      String [] argv) throws IOException {
-    if (cleanup) {
-      removeTableDir();
-    }
-
     // run the tool through the normal entry-point.
     int ret;
     try {
@@ -242,10 +239,4 @@
     runImport(new ImportTool(), argv);
   }
 
-  protected void runImportAgain(String[] argv)
-      throws IOException {
-    boolean cleanup = false;
-    runImport(cleanup, new ImportTool(), argv);
-  }
-
 }
diff --git a/testdata/hive/scripts/normalImportAsParquet.q b/testdata/hive/scripts/normalImportAsParquet.q
deleted file mode 100644
index e434e9b..0000000
--- a/testdata/hive/scripts/normalImportAsParquet.q
+++ /dev/null
@@ -1,17 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-CREATE TABLE IF NOT EXISTS `NORMAL_HIVE_IMPORT_AS_PARQUET` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS PARQUET;
-LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT_AS_PARQUET' INTO TABLE `NORMAL_HIVE_IMPORT_AS_PARQUET`;