/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
import org.apache.sqoop.testcategories.sqooptest.IntegrationTest;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.HiveServer2TestUtil;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.util.BlockJUnit4ClassRunnerWithParametersFactory;
import org.apache.sqoop.util.ParquetReader;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static java.util.Arrays.asList;
import static java.util.Arrays.deepEquals;
import static org.apache.sqoop.testutil.BaseSqoopTestCase.timeFromString;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
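/**
* Integration tests for Hive imports in Parquet format through HiveServer2,
* using the Hadoop implementation of the Parquet job configurator.
*/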
@RunWith(Enclosed.class)
@Category(IntegrationTest.class)
public class TestHiveServer2ParquetImport {
private static final String[] TEST_COLUMN_NAMES = {"C1_VARCHAR", "C2#INTEGER", "3C_CHAR"};
private static final String[] TEST_COLUMN_TYPES = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
private static final String[] TEST_COLUMN_ALL_TYPES = {"INTEGER", "BIGINT", "DOUBLE", "DECIMAL(10, 2)",
    "BOOLEAN", "TIMESTAMP", "BINARY", "VARCHAR(100)", "CHAR(100)"};
private static final List<Object> TEST_COLUMN_ALL_TYPES_VALUES = Arrays.<Object>asList(10, 12345678910123L,
    12.34, 456842.45, "TRUE", "2018-06-14 15:00:00.000", "abcdef", "testVarchar", "testChar");
private static final Object[] EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES = {10, 12345678910123L, 12.34, "456842.45",
    true, timeFromString("2018-06-14 15:00:00.000"), decodeHex("abcdef"), "testVarchar", "testChar"};
private static final List<Object> TEST_COLUMN_VALUES = Arrays.<Object>asList("test", 42, "somestring");
private static final List<Object> TEST_COLUMN_VALUES_MAPPED = Arrays.<Object>asList("test", "42", "somestring");
private static final List<Object> TEST_COLUMN_VALUES_LINE2 = Arrays.<Object>asList("test2", 4242, "somestring2");
private static HiveMiniCluster hiveMiniCluster;
private static HiveServer2TestUtil hiveServer2TestUtil;
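/**
* Test cases parameterized over the supported Parquet compression codecs (snappy and gzip).
*/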
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
public static class ParquetCompressionCodecTestCase extends ImportJobTestCase {
@Parameters(name = "compressionCodec = {0}")
public static Iterable<String> compressionCodecParameters() {
return Arrays.asList("snappy", "gzip");
}
@BeforeClass
public static void beforeClass() {
startHiveMiniCluster();
}
@AfterClass
public static void afterClass() {
stopHiveMiniCluster();
}
private final String compressionCodec;
public ParquetCompressionCodecTestCase(String compressionCodec) {
this.compressionCodec = compressionCodec;
}
@Override
@Before
public void setUp() {
super.setUp();
createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
}
@Test
public void testHiveImportAsParquetWithCompressionCodecCanBeLoaded() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("compression-codec", compressionCodec)
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES));
}
@Test
public void testImportedFilesHaveCorrectCodec() throws Exception {
Path tablePath = new Path(hiveMiniCluster.getTempFolderPath() + "/" + getTableName().toLowerCase());
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("compression-codec", compressionCodec)
.build();
runImport(args);
CompressionCodecName codec = new ParquetReader(tablePath).getCodec();
assertEquals(compressionCodec, codec.name().toLowerCase());
}
}
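/**
* Non-parameterized test cases covering column mapping, data type handling, append,
* overwrite and table creation behavior of the Parquet Hive import.
*/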
public static class GeneralParquetTestCase extends ImportJobTestCase {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@BeforeClass
public static void beforeClass() {
startHiveMiniCluster();
}
@AfterClass
public static void afterClass() {
stopHiveMiniCluster();
}
@Override
@Before
public void setUp() {
super.setUp();
createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
}
@Test
public void testNormalHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName()).build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES));
}
@Test
public void testHiveImportAsParquetWithMapColumnJavaAndOriginalColumnNameSucceeds() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-java", "C2#INTEGER=String")
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES_MAPPED));
}
/**
* This test case documents that the Avro identifier (C2_INTEGER) of a special
* column name (C2#INTEGER) cannot be used in map-column-java.
* The reason is that org.apache.sqoop.orm.AvroSchemaGenerator#toAvroType(java.lang.String, int),
* which maps the Avro schema types, uses the original column name and not the Avro identifier,
* while org.apache.sqoop.orm.ClassWriter#toJavaType(java.lang.String, int) can also map the DAO
* class field types based on the Avro identifier. This results in a discrepancy between the
* generated Avro schema types and the DAO class field types.
*/
@Test
public void testHiveImportAsParquetWithMapColumnJavaAndAvroIdentifierFails() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-java", "C2_INTEGER=String")
.build();
expectedException.expect(IOException.class);
runImport(args);
}
/**
* This test case documents that a mapping with the Avro identifier (C2_INTEGER)
* of a special column name (C2#INTEGER) is ignored in map-column-hive.
* The reason is that the column types of the Avro schema and the Hive table must
* be equal; if we were able to override the Hive column type using map-column-hive,
* the inconsistency would cause a Hive error during reading.
*/
@Test
public void testHiveImportAsParquetWithMapColumnHiveAndAvroIdentifierIgnoresMapping() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-hive", "C2_INTEGER=STRING")
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES));
}
/**
* This test case documents that the special column name (C2#INTEGER)
* cannot be used in map-column-hive.
* The reason is that Sqoop uses the Avro identifier (C2_INTEGER) as the Hive column
* name, and there is a check in org.apache.sqoop.hive.TableDefWriter#getCreateTableStmt()
* which verifies that all the columns in map-column-hive are valid column names.
* Since C2_INTEGER is used instead of C2#INTEGER, the check fails on the latter.
*/
@Test
public void testHiveImportAsParquetWithMapColumnHiveAndOriginalColumnNameFails() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-hive", "C2#INTEGER=STRING")
.build();
expectedException.expect(IllegalArgumentException.class);
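// Note: the expected message mirrors Sqoop's error string verbatim, which lacks a space before "found".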
expectedException.expectMessage("No column by the name C2#INTEGERfound while importing data");
runImportThrowingException(args);
}
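/**
* Imports a table containing all of the supported column types and verifies the values
* read back from Hive.
*/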
@Test
public void testAllDataTypesHiveImportAsParquet() throws Exception {
setCurTableName("all_datatypes_table");
createTableWithColTypes(TEST_COLUMN_ALL_TYPES, TEST_COLUMN_ALL_TYPES_VALUES);
String[] args = commonArgs(getConnectString(), getTableName()).build();
runImport(args);
// The result contains a byte[] so we have to use Arrays.deepEquals() to assert.
Object[] firstRow = hiveServer2TestUtil.loadRawRowsFromTable(getTableName()).iterator().next().toArray();
assertTrue(deepEquals(EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES, firstRow));
}
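/**
* A second import into the same Hive table appends the new rows to the existing data.
*/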
@Test
public void testAppendHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName()).build();
runImport(args);
insertIntoTable(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES, TEST_COLUMN_VALUES_LINE2));
}
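/**
* With --hive-overwrite the second import replaces the previously imported rows
* instead of appending to them.
*/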
@Test
public void testCreateOverwriteHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("hive-overwrite")
.build();
runImport(args);
// Recreate the test table to contain different test data.
dropTableIfExists(getTableName());
createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertEquals(asList(TEST_COLUMN_VALUES_LINE2), rows);
}
/**
* The --create-hive-table option is now supported with the Hadoop Parquet writer implementation:
* the first import creates the Hive table, and a second import with the same option fails
* because the table already exists.
*/
@Test
public void testCreateHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("create-hive-table")
.build();
runImport(args);
expectedException.expectMessage("Error executing Hive import.");
runImportThrowingException(args);
}
/**
* This scenario succeeds because the Hadoop Parquet writer implementation does not
* check the Parquet schema of the existing files. The exception will be thrown
* by Hive when it tries to read files with a different schema.
*/
@Test
public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
String hiveTableName = "hiveImportAsParquetWhenTableExistsWithIncompatibleSchema";
String[] incompatibleSchemaTableTypes = {"INTEGER", "INTEGER", "INTEGER"};
List<Object> incompatibleSchemaTableData = Arrays.<Object>asList(100, 200, 300);
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("hive-table", hiveTableName)
.build();
runImport(args);
// We make sure we create a new table in the test RDBMS.
incrementTableNum();
createTableWithColTypes(incompatibleSchemaTableTypes, incompatibleSchemaTableData);
// Recreate the argument array to pick up the new RDBMS table name.
args = commonArgs(getConnectString(), getTableName())
.withOption("hive-table", hiveTableName)
.build();
runImport(args);
}
}
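/**
* Builds the argument list shared by all test cases: a Hive import in Parquet format
* through HiveServer2, using the Hadoop Parquet job configurator implementation.
*/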
private static ArgumentArrayBuilder commonArgs(String connectString, String tableName) {
return new ArgumentArrayBuilder()
.withProperty("parquetjob.configurator.implementation", "hadoop")
.withOption("connect", connectString)
.withOption("table", tableName)
.withOption("hive-import")
.withOption("hs2-url", hiveMiniCluster.getUrl())
.withOption("num-mappers", "1")
.withOption("as-parquetfile")
.withOption("delete-target-dir");
}
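/**
* Starts a HiveServer2 mini cluster without authentication and creates a test utility
* bound to its JDBC URL.
*/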
public static void startHiveMiniCluster() {
hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
hiveMiniCluster.start();
hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
}
public static void stopHiveMiniCluster() {
hiveMiniCluster.stop();
}
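/**
* Decodes a hexadecimal string into a byte array, rethrowing the checked DecoderException
* as an unchecked exception so the helper can be used in static field initializers.
*/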
private static byte[] decodeHex(String hexString) {
try {
return Hex.decodeHex(hexString.toCharArray());
} catch (DecoderException e) {
throw new RuntimeException(e);
}
}
}