SQOOP-2295: Hive import with Parquet should append automatically
(Qian Xu via Abraham Elmahrek)
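With this change, a Parquet Hive import no longer fails when the target Hive
table already exists: unless --hive-overwrite is given, Sqoop appends to the
existing Kite dataset and logs a warning. The boolean flags previously passed
to ParquetJob.configureImportJob are replaced by an explicit WriteMode
(DEFAULT, APPEND, OVERWRITE). A hedged usage sketch follows; the connect
string and table name are placeholders, not part of this patch:

    # First run creates the Hive table backed by Parquet files.
    sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
        --hive-import --as-parquetfile

    # Re-running the same command now appends to the existing table and logs
    # a warning, a case that previously made the job fail.
    sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
        --hive-import --as-parquetfile

    # Pass --hive-overwrite to replace the existing data instead of appending.
    sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
        --hive-import --as-parquetfile --hive-overwrite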
diff --git a/src/docs/man/hive-args.txt b/src/docs/man/hive-args.txt
index 7d9e427..b92a446 100644
--- a/src/docs/man/hive-args.txt
+++ b/src/docs/man/hive-args.txt
@@ -29,7 +29,7 @@
Overwrites existing data in the hive table if it exists.
--create-hive-table::
- If set, then the job will fail if the target hive table exits.
+ If set, then the job will fail if the target hive table exists.
By default this property is false.
--hive-table (table-name)::
diff --git a/src/docs/man/sqoop-create-hive-table.txt b/src/docs/man/sqoop-create-hive-table.txt
index 7aebcc1..afae9d0 100644
--- a/src/docs/man/sqoop-create-hive-table.txt
+++ b/src/docs/man/sqoop-create-hive-table.txt
@@ -35,7 +35,7 @@
Overwrites existing data in the hive table if it exists.
--create-hive-table::
- If set, then the job will fail if the target hive table exits.
+ If set, then the job will fail if the target hive table exists.
By default this property is false.
--hive-table (table-name)::
diff --git a/src/docs/user/create-hive-table.txt b/src/docs/user/create-hive-table.txt
index 3aa34fd..dceb204 100644
--- a/src/docs/user/create-hive-table.txt
+++ b/src/docs/user/create-hive-table.txt
@@ -50,7 +50,7 @@
+\--hive-home <dir>+ Override +$HIVE_HOME+
+\--hive-overwrite+ Overwrite existing data in the Hive table.
+\--create-hive-table+ If set, then the job will fail if the target hive
- table exits. By default this property is false.
+ table exists. By default this property is false.
+\--hive-table <table-name>+ Sets the table name to use when importing \
to Hive.
+\--table+ The database table to read the \
diff --git a/src/docs/user/hive-args.txt b/src/docs/user/hive-args.txt
index 53de92d..e54ee1e 100644
--- a/src/docs/user/hive-args.txt
+++ b/src/docs/user/hive-args.txt
@@ -28,7 +28,7 @@
default delimiters if none are set.)
+\--hive-overwrite+ Overwrite existing data in the Hive table.
+\--create-hive-table+ If set, then the job will fail if the target hive
- table exits. By default this property is false.
+ table exists. By default this property is false.
+\--hive-table <table-name>+ Sets the table name to use when importing\
to Hive.
+\--hive-drop-import-delims+ Drops '\n', '\r', and '\01' from string\
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index d5bfae2..7521464 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -47,6 +47,7 @@
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.kitesdk.data.Datasets;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
/**
@@ -105,8 +106,27 @@
final String schemaNameOverride = tableName;
Schema schema = generateAvroSchema(tableName, schemaNameOverride);
String uri = getKiteUri(conf, tableName);
- ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode(),
- options.doHiveImport() && options.doOverwriteHiveTable());
+ ParquetJob.WriteMode writeMode;
+
+ if (options.doHiveImport()) {
+ if (options.doOverwriteHiveTable()) {
+ writeMode = ParquetJob.WriteMode.OVERWRITE;
+ } else {
+ writeMode = ParquetJob.WriteMode.APPEND;
+ if (Datasets.exists(uri)) {
+ LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
+ "append data into the existing Hive table. Consider using " +
+ "--hive-overwrite, if you do NOT intend to do appending.");
+ }
+ }
+ } else {
+ // There is no import argument for overwriting an HDFS dataset, so
+ // overwrite mode is not supported here yet.
+ // Sqoop's append mode merges two independently imported datasets, so
+ // DEFAULT is chosen as the write mode.
+ writeMode = ParquetJob.WriteMode.DEFAULT;
+ }
+ ParquetJob.configureImportJob(conf, schema, uri, writeMode);
}
job.setMapperClass(getMapperClass());
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index df55dbc..c775ef3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -26,7 +26,6 @@
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
@@ -46,6 +45,9 @@
private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
+ enum WriteMode {
+ DEFAULT, APPEND, OVERWRITE
+ };
public static Schema getAvroSchema(Configuration conf) {
return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
@@ -71,14 +73,14 @@
* {@link org.apache.avro.generic.GenericRecord}.
*/
public static void configureImportJob(Configuration conf, Schema schema,
- String uri, boolean reuseExistingDataset, boolean overwrite) throws IOException {
+ String uri, WriteMode writeMode) throws IOException {
Dataset dataset;
- if (reuseExistingDataset || overwrite) {
- try {
- dataset = Datasets.load(uri);
- } catch (DatasetNotFoundException ex) {
- dataset = createDataset(schema, getCompressionType(conf), uri);
+ if (Datasets.exists(uri)) {
+ if (WriteMode.DEFAULT.equals(writeMode)) {
+ throw new IOException("Destination exists! " + uri);
}
+
+ dataset = Datasets.load(uri);
Schema writtenWith = dataset.getDescriptor().getSchema();
if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
throw new IOException(
@@ -90,10 +92,14 @@
}
conf.set(CONF_AVRO_SCHEMA, schema.toString());
- if (overwrite) {
- DatasetKeyOutputFormat.configure(conf).overwrite(dataset);
+ DatasetKeyOutputFormat.ConfigBuilder builder =
+ DatasetKeyOutputFormat.configure(conf);
+ if (WriteMode.OVERWRITE.equals(writeMode)) {
+ builder.overwrite(dataset);
+ } else if (WriteMode.APPEND.equals(writeMode)) {
+ builder.appendTo(dataset);
} else {
- DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
+ builder.writeTo(dataset);
}
}
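For the non-Hive path nothing user-visible changes: WriteMode.DEFAULT still
refuses to write into an existing dataset ("Destination exists!"), and
--append remains the way to merge new rows into an existing HDFS Parquet
dataset. A hedged command-line sketch; the connect string and target
directory are placeholders:

    # A second plain import into the same target still fails with
    # "Destination exists!".
    sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
        --as-parquetfile --target-dir /data/employees

    # --append imports into a temporary dataset and merges it into the
    # existing one, as exercised by TestParquetImport below.
    sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
        --as-parquetfile --target-dir /data/employees --append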
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
index 07e140a..ae2e617 100644
--- a/src/test/com/cloudera/sqoop/TestParquetImport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -204,7 +204,7 @@
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true, null));
- runImportAgain(getOutputArgv(true, new String[]{"--append"}));
+ runImport(getOutputArgv(true, new String[]{"--append"}));
DatasetReader<GenericRecord> reader = getReader();
try {
@@ -226,7 +226,7 @@
runImport(getOutputArgv(true, null));
try {
- runImportAgain(getOutputArgv(true, null));
+ runImport(getOutputArgv(true, null));
fail("");
} catch (IOException ex) {
// ok
diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
index fa717cb..b626964 100644
--- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
+++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -20,23 +20,25 @@
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.tool.BaseSqoopTool;
import com.cloudera.sqoop.tool.CodeGenTool;
@@ -44,6 +46,9 @@
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.SqoopTool;
import org.apache.commons.cli.ParseException;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
/**
* Test HiveImport capability after an import to HDFS.
@@ -53,11 +58,13 @@
public static final Log LOG = LogFactory.getLog(
TestHiveImport.class.getName());
+ @Before
public void setUp() {
super.setUp();
HiveImport.setTestMode(true);
}
+ @After
public void tearDown() {
super.tearDown();
HiveImport.setTestMode(false);
@@ -272,11 +279,47 @@
setNumCols(3);
String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
String [] vals = { "'test'", "42", "'somestring'" };
- String [] args_array = getArgv(false, null);
- ArrayList<String> args = new ArrayList<String>(Arrays.asList(args_array));
- args.add("--as-parquetfile");
- runImportTest(TABLE_NAME, types, vals, "normalImportAsParquet.q", args.toArray(new String[0]),
- new ImportTool());
+ String [] extraArgs = {"--as-parquetfile"};
+
+ runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
+ new ImportTool());
+ verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+ }
+
+ private void verifyHiveDataset(String tableName, Object[][] valsArray) {
+ String datasetUri = String.format("dataset:hive:default/%s",
+ tableName.toLowerCase());
+ assertTrue(Datasets.exists(datasetUri));
+ Dataset dataset = Datasets.load(datasetUri);
+ assertFalse(dataset.isEmpty());
+
+ DatasetReader<GenericRecord> reader = dataset.newReader();
+ try {
+ List<String> expectations = new ArrayList<String>();
+ if (valsArray != null) {
+ for (Object[] vals : valsArray) {
+ expectations.add(Arrays.toString(vals));
+ }
+ }
+
+ while (reader.hasNext() && expectations.size() > 0) {
+ String actual = Arrays.toString(
+ convertGenericRecordToArray(reader.next()));
+ assertTrue("Expect record: " + actual, expectations.remove(actual));
+ }
+ assertFalse(reader.hasNext());
+ assertEquals(0, expectations.size());
+ } finally {
+ reader.close();
+ }
+ }
+
+ private static Object[] convertGenericRecordToArray(GenericRecord record) {
+ Object[] result = new Object[record.getSchema().getFields().size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = record.get(i);
+ }
+ return result;
}
/** Test that table is created in hive with no data import. */
@@ -312,6 +355,53 @@
new CreateHiveTableTool());
}
+ /**
+ * Test that the table is created in Hive and that it replaces an existing
+ * table, if any.
+ */
+ @Test
+ public void testCreateOverwriteHiveImportAsParquet() throws IOException {
+ final String TABLE_NAME = "CREATE_OVERWRITE_HIVE_IMPORT_AS_PARQUET";
+ setCurTableName(TABLE_NAME);
+ setNumCols(3);
+ String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+ String [] vals = { "'test'", "42", "'somestring'" };
+ String [] extraArgs = {"--as-parquetfile"};
+ ImportTool tool = new ImportTool();
+
+ runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
+ verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+
+ String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
+ String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
+ runImportTest(TABLE_NAME, types, valsToOverwrite, "",
+ getArgv(false, extraArgsForOverwrite), tool);
+ verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
+ }
+
+ /**
+ * Test that records are appended to an existing table.
+ */
+ @Test
+ public void testAppendHiveImportAsParquet() throws IOException {
+ final String TABLE_NAME = "APPEND_HIVE_IMPORT_AS_PARQUET";
+ setCurTableName(TABLE_NAME);
+ setNumCols(3);
+ String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+ String [] vals = { "'test'", "42", "'somestring'" };
+ String [] extraArgs = {"--as-parquetfile"};
+ String [] args = getArgv(false, extraArgs);
+ ImportTool tool = new ImportTool();
+
+ runImportTest(TABLE_NAME, types, vals, "", args, tool);
+ verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+
+ String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
+ runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
+ verifyHiveDataset(TABLE_NAME, new Object[][] {
+ {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
+ }
+
/** Test that dates are coerced properly to strings. */
@Test
public void testDate() throws IOException {
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 7934791..e3098d6 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -244,10 +244,11 @@
} catch (IOException e) {
LOG.warn(e);
}
- }
- File s = new File(getWarehouseDir());
- if (!s.delete()) {
- LOG.warn("Can't delete " + s.getPath());
+ } else {
+ File s = new File(getWarehouseDir());
+ if (!s.delete()) {
+ LOG.warn("Cannot delete " + s.getPath());
+ }
}
}
diff --git a/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
index 293bf10..08408a5 100644
--- a/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
@@ -35,6 +35,7 @@
import com.cloudera.sqoop.tool.SqoopTool;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.junit.Before;
/**
* Class that implements common methods required for tests which import data
@@ -45,6 +46,12 @@
public static final Log LOG = LogFactory.getLog(
ImportJobTestCase.class.getName());
+ @Before
+ public void setUp() {
+ super.setUp();
+ removeTableDir();
+ }
+
protected String getTablePrefix() {
return "IMPORT_TABLE_";
}
@@ -206,16 +213,6 @@
* execution).
*/
protected void runImport(SqoopTool tool, String [] argv) throws IOException {
- boolean cleanup = true;
- runImport(cleanup, tool, argv);
- }
-
- private void runImport(boolean cleanup, SqoopTool tool,
- String [] argv) throws IOException {
- if (cleanup) {
- removeTableDir();
- }
-
// run the tool through the normal entry-point.
int ret;
try {
@@ -242,10 +239,4 @@
runImport(new ImportTool(), argv);
}
- protected void runImportAgain(String[] argv)
- throws IOException {
- boolean cleanup = false;
- runImport(cleanup, new ImportTool(), argv);
- }
-
}
diff --git a/testdata/hive/scripts/normalImportAsParquet.q b/testdata/hive/scripts/normalImportAsParquet.q
deleted file mode 100644
index e434e9b..0000000
--- a/testdata/hive/scripts/normalImportAsParquet.q
+++ /dev/null
@@ -1,17 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-CREATE TABLE IF NOT EXISTS `NORMAL_HIVE_IMPORT_AS_PARQUET` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS PARQUET;
-LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT_AS_PARQUET' INTO TABLE `NORMAL_HIVE_IMPORT_AS_PARQUET`;