SQOOP-3396: Add parquet numeric support for Parquet in Hive import

(Fero Szabo via Szabolcs Vasas)

This closes #60
diff --git a/src/java/org/apache/sqoop/hive/HiveTypes.java b/src/java/org/apache/sqoop/hive/HiveTypes.java
index 554a036..49a33a6 100644
--- a/src/java/org/apache/sqoop/hive/HiveTypes.java
+++ b/src/java/org/apache/sqoop/hive/HiveTypes.java
@@ -23,6 +23,11 @@
 import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.config.ConfigurationConstants;
+
+import static org.apache.avro.LogicalTypes.Decimal;
 
 /**
  * Defines conversion between SQL types and Hive types.
@@ -37,6 +42,7 @@
   private static final String HIVE_TYPE_STRING = "STRING";
   private static final String HIVE_TYPE_BOOLEAN = "BOOLEAN";
   private static final String HIVE_TYPE_BINARY = "BINARY";
+  private static final String HIVE_TYPE_DECIMAL = "DECIMAL";
 
   public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
 
@@ -83,27 +89,58 @@
       }
   }
 
-  public static String toHiveType(Schema.Type avroType) {
-      switch (avroType) {
-        case BOOLEAN:
-          return HIVE_TYPE_BOOLEAN;
-        case INT:
-          return HIVE_TYPE_INT;
-        case LONG:
-          return HIVE_TYPE_BIGINT;
-        case FLOAT:
-          return HIVE_TYPE_FLOAT;
-        case DOUBLE:
-          return HIVE_TYPE_DOUBLE;
-        case STRING:
-        case ENUM:
-          return HIVE_TYPE_STRING;
-        case BYTES:
-        case FIXED:
-          return HIVE_TYPE_BINARY;
-        default:
-          return null;
+  public static String toHiveType(Schema schema, SqoopOptions options) {
+    if (schema.getType() == Schema.Type.UNION) {
+      for (Schema subSchema : schema.getTypes()) {
+        if (subSchema.getType() != Schema.Type.NULL) {
+          return toHiveType(subSchema, options);
+        }
       }
+    }
+
+    Schema.Type avroType = schema.getType();
+    switch (avroType) {
+      case BOOLEAN:
+        return HIVE_TYPE_BOOLEAN;
+      case INT:
+        return HIVE_TYPE_INT;
+      case LONG:
+        return HIVE_TYPE_BIGINT;
+      case FLOAT:
+        return HIVE_TYPE_FLOAT;
+      case DOUBLE:
+        return HIVE_TYPE_DOUBLE;
+      case STRING:
+      case ENUM:
+        return HIVE_TYPE_STRING;
+      case BYTES:
+        return mapToDecimalOrBinary(schema, options);
+      case FIXED:
+        return HIVE_TYPE_BINARY;
+      default:
+        throw new RuntimeException(String.format("There is no Hive type mapping defined for the Avro type of: %s ", avroType.getName()));
+    }
+  }
+
+  private static String mapToDecimalOrBinary(Schema schema, SqoopOptions options) {
+    boolean logicalTypesEnabled = options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false);
+    if (logicalTypesEnabled && schema.getLogicalType() instanceof Decimal) {
+      Decimal decimal = (Decimal) schema.getLogicalType();
+
+      // trimming precision and scale to Hive's maximum values.
+      int precision = Math.min(HiveDecimal.MAX_PRECISION, decimal.getPrecision());
+      if (precision < decimal.getPrecision()) {
+        LOG.warn("Warning! Precision in the Hive table definition will be smaller than the actual precision of the column on storage! Hive may not be able to read data from this column.");
+      }
+      int scale = Math.min(HiveDecimal.MAX_SCALE, decimal.getScale());
+      if (scale < decimal.getScale()) {
+        LOG.warn("Warning! Scale in the Hive table definition will be smaller than the actual scale of the column on storage! Hive may not be able to read data from this column.");
+      }
+      return String.format("%s (%d, %d)", HIVE_TYPE_DECIMAL, precision, scale);
+    }
+    else {
+      return HIVE_TYPE_BINARY;
+    }
   }
 
   /**
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index b21dfe5..f296897 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -129,7 +129,7 @@
     }
 
     String [] colNames = getColumnNames();
-    Map<String, Schema.Type> columnNameToAvroType = getColumnNameToAvroTypeMapping();
+    Map<String, Schema> columnNameToAvroFieldSchema = getColumnNameToAvroTypeMapping();
     StringBuilder sb = new StringBuilder();
     if (options.doFailIfHiveTableExists()) {
       if (isHiveExternalTableSet) {
@@ -185,7 +185,7 @@
         Integer colType = columnTypes.get(col);
         hiveColType = getHiveColumnTypeForTextTable(userMapping, col, colType);
       } else if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
-        hiveColType = HiveTypes.toHiveType(columnNameToAvroType.get(col));
+        hiveColType = HiveTypes.toHiveType(columnNameToAvroFieldSchema.get(col), options);
       } else {
         throw new RuntimeException("File format is not supported for Hive tables.");
       }
@@ -236,33 +236,19 @@
     return sb.toString();
   }
 
-  private Map<String, Schema.Type> getColumnNameToAvroTypeMapping() {
+  private Map<String, Schema> getColumnNameToAvroTypeMapping() {
     if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
       return Collections.emptyMap();
     }
-    Map<String, Schema.Type> result = new HashMap<>();
+    Map<String, Schema> result = new HashMap<>();
     Schema avroSchema = getAvroSchema();
     for (Schema.Field field : avroSchema.getFields()) {
-      result.put(field.name(), getNonNullAvroType(field.schema()));
+      result.put(field.name(), field.schema());
     }
 
     return result;
   }
 
-  private Schema.Type getNonNullAvroType(Schema schema) {
-    if (schema.getType() != Schema.Type.UNION) {
-      return schema.getType();
-    }
-
-    for (Schema subSchema : schema.getTypes()) {
-      if (subSchema.getType() != Schema.Type.NULL) {
-        return subSchema.getType();
-      }
-    }
-
-    return null;
-  }
-
   private String getHiveColumnTypeForTextTable(Properties userMapping, String columnName, Integer columnType) throws IOException {
     String hiveColType = userMapping.getProperty(columnName);
     if (hiveColType == null) {
diff --git a/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java b/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java
index 1b65267..3792499 100644
--- a/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java
+++ b/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java
@@ -18,7 +18,10 @@
 
 package org.apache.sqoop.hive;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.testcategories.sqooptest.UnitTest;
 import org.apache.sqoop.util.BlockJUnit4ClassRunnerWithParametersFactory;
 import org.junit.Test;
@@ -27,10 +30,17 @@
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.sqoop.hive.HiveTypes.toHiveType;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.apache.avro.Schema.create;
+import static org.apache.avro.Schema.createEnum;
+import static org.apache.avro.Schema.createFixed;
+import static org.apache.avro.Schema.createUnion;
+import static org.apache.avro.Schema.Type;
 
 @Category(UnitTest.class)
 @RunWith(Parameterized.class)
@@ -38,29 +48,51 @@
 public class TestHiveTypesForAvroTypeMapping {
 
   private final String hiveType;
-  private final Schema.Type avroType;
+  private final Schema schema;
+  private final SqoopOptions options;
 
-  @Parameters(name = "hiveType = {0}, avroType = {1}")
+  @Parameters(name = "hiveType = {0}, schema = {1}")
   public static Iterable<? extends Object> parameters() {
     return Arrays.asList(
-        new Object[] {"BOOLEAN", Schema.Type.BOOLEAN},
-        new Object[] {"INT", Schema.Type.INT},
-        new Object[] {"BIGINT", Schema.Type.LONG},
-        new Object[] {"FLOAT", Schema.Type.FLOAT},
-        new Object[] {"DOUBLE", Schema.Type.DOUBLE},
-        new Object[] {"STRING", Schema.Type.ENUM},
-        new Object[] {"STRING", Schema.Type.STRING},
-        new Object[] {"BINARY", Schema.Type.BYTES},
-        new Object[] {"BINARY", Schema.Type.FIXED});
+        new Object[]{"BOOLEAN", create(Type.BOOLEAN), new SqoopOptions()},
+        new Object[]{"INT", create(Type.INT), new SqoopOptions()},
+        new Object[]{"BIGINT", create(Type.LONG), new SqoopOptions()},
+        new Object[]{"FLOAT", create(Type.FLOAT), new SqoopOptions()},
+        new Object[]{"DOUBLE", create(Type.DOUBLE), new SqoopOptions()},
+        new Object[]{"STRING", createEnum("ENUM", "doc", "namespace", new ArrayList<>()), new SqoopOptions()},
+        new Object[]{"STRING", create(Type.STRING), new SqoopOptions()},
+        new Object[]{"BINARY", create(Type.BYTES), new SqoopOptions()},
+        new Object[]{"BINARY", createFixed("Fixed", "doc", "space", 1), new SqoopOptions()},
+        new Object[]{"BINARY", createDecimal(20, 10), new SqoopOptions()},
+        new Object[]{"BINARY", create(Type.BYTES), createSqoopOptionsWithLogicalTypesEnabled()},
+        new Object[]{"DECIMAL (20, 10)", createDecimal(20, 10), createSqoopOptionsWithLogicalTypesEnabled()}
+        );
   }
 
-  public TestHiveTypesForAvroTypeMapping(String hiveType, Schema.Type avroType) {
+  private static SqoopOptions createSqoopOptionsWithLogicalTypesEnabled() {
+    SqoopOptions sqoopOptions = new SqoopOptions();
+    sqoopOptions.getConf().setBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, true);
+    return sqoopOptions;
+  }
+
+  private static Schema createDecimal(int precision, int scale) {
+    List<Schema> childSchemas = new ArrayList<>();
+    childSchemas.add(create(Type.NULL));
+    childSchemas.add(
+        LogicalTypes.decimal(precision, scale)
+            .addToSchema(create(Type.BYTES))
+    );
+    return createUnion(childSchemas);
+  }
+
+  public TestHiveTypesForAvroTypeMapping(String hiveType, Schema schema, SqoopOptions options) {
     this.hiveType = hiveType;
-    this.avroType = avroType;
+    this.schema = schema;
+    this.options = options;
   }
 
   @Test
-  public void testAvroTypeToHiveTypeMapping() throws Exception {
-    assertEquals(hiveType, toHiveType(avroType));
+  public void testAvroTypeToHiveTypeMapping() {
+    assertEquals(hiveType, toHiveType(schema, options));
   }
 }
diff --git a/src/test/org/apache/sqoop/hive/numerictypes/NumericTypesHiveImportTest.java b/src/test/org/apache/sqoop/hive/numerictypes/NumericTypesHiveImportTest.java
new file mode 100644
index 0000000..9d78ce5
--- /dev/null
+++ b/src/test/org/apache/sqoop/hive/numerictypes/NumericTypesHiveImportTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.numerictypes;
+
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
+import org.apache.sqoop.importjob.configuration.HiveTestConfiguration;
+import org.apache.sqoop.importjob.configuration.MysqlImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfigurationForNumber;
+import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationForNumeric;
+import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
+import org.apache.sqoop.importjob.configuration.SqlServerImportJobTestConfiguration;
+import org.apache.sqoop.testcategories.thirdpartytest.MysqlTest;
+import org.apache.sqoop.testcategories.thirdpartytest.OracleTest;
+import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest;
+import org.apache.sqoop.testcategories.thirdpartytest.SqlServerTest;
+import org.apache.sqoop.testutil.HiveServer2TestUtil;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
+import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.MysqlDatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.SqlServerDatabaseAdapter;
+import org.apache.sqoop.util.BlockJUnit4ClassRunnerWithParametersFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITH_PADDING_ONLY;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
+@RunWith(Enclosed.class)
+public class NumericTypesHiveImportTest {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private static HiveMiniCluster hiveMiniCluster;
+
+  private static HiveServer2TestUtil hiveServer2TestUtil;
+
+  @BeforeClass
+  public static void beforeClass() {
+    startHiveMiniCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    stopHiveMiniCluster();
+  }
+
+  public static void startHiveMiniCluster() {
+    hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
+    hiveMiniCluster.start();
+    hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
+  }
+
+  public static void stopHiveMiniCluster() {
+    hiveMiniCluster.stop();
+  }
+
+  @Category(MysqlTest.class)
+  public static class MysqlNumericTypesHiveImportTest extends NumericTypesHiveImportTestBase {
+
+    public MysqlNumericTypesHiveImportTest() {
+      super(new MysqlImportJobTestConfiguration(), NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS, NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY,
+          hiveMiniCluster, hiveServer2TestUtil);
+    }
+
+    @Override
+    public DatabaseAdapter createAdapter() {
+      return new MysqlDatabaseAdapter();
+    }
+  }
+
+  @Category(OracleTest.class)
+  @RunWith(Parameterized.class)
+  @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
+  public static class OracleNumericTypesHiveImportTest extends NumericTypesHiveImportTestBase {
+
+    @Override
+    public DatabaseAdapter createAdapter() {
+      return new OracleDatabaseAdapter();
+    }
+
+    @Parameterized.Parameters(name = "Config: {0}| failWithoutExtraArgs: {1}| failWithPadding: {2}")
+    public static Iterable<? extends Object> testConfigurations() {
+      return Arrays.asList(
+          new Object[]{new OracleImportJobTestConfigurationForNumber(), FAIL_WITHOUT_EXTRA_ARGS, FAIL_WITH_PADDING_ONLY},
+          new Object[]{new OracleImportJobTestConfiguration(), FAIL_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY}
+      );
+    }
+
+    public OracleNumericTypesHiveImportTest(HiveTestConfiguration configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+      super(configuration, failWithoutExtraArgs, failWithPaddingOnly, hiveMiniCluster, hiveServer2TestUtil);
+    }
+  }
+
+  @Category(PostgresqlTest.class)
+  @RunWith(Parameterized.class)
+  @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
+  public static class PostgresNumericTypesHiveImportTest extends NumericTypesHiveImportTestBase {
+
+    @Override
+    public DatabaseAdapter createAdapter() {
+      return new PostgresDatabaseAdapter();
+    }
+
+    @Parameterized.Parameters(name = "Config: {0}| failWithoutExtraArgs: {1}| failWithPadding: {2}")
+    public static Iterable<? extends Object> testConfigurations() {
+      return Arrays.asList(
+          new Object[]{new PostgresqlImportJobTestConfigurationForNumeric(), FAIL_WITHOUT_EXTRA_ARGS, FAIL_WITH_PADDING_ONLY},
+          new Object[]{new PostgresqlImportJobTestConfigurationPaddingShouldSucceed(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY}
+      );
+    }
+
+    public PostgresNumericTypesHiveImportTest(HiveTestConfiguration configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+      super(configuration, failWithoutExtraArgs, failWithPaddingOnly, hiveMiniCluster, hiveServer2TestUtil);
+    }
+  }
+
+  @Category(SqlServerTest.class)
+  public static class SqlServerNumericTypesHiveImportTest extends NumericTypesHiveImportTestBase {
+
+    public SqlServerNumericTypesHiveImportTest() {
+      super(new SqlServerImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY,
+          hiveMiniCluster, hiveServer2TestUtil);
+    }
+
+    @Override
+    public DatabaseAdapter createAdapter() {
+      return new SqlServerDatabaseAdapter();
+    }
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/hive/numerictypes/NumericTypesHiveImportTestBase.java b/src/test/org/apache/sqoop/hive/numerictypes/NumericTypesHiveImportTestBase.java
new file mode 100644
index 0000000..2abf833
--- /dev/null
+++ b/src/test/org/apache/sqoop/hive/numerictypes/NumericTypesHiveImportTestBase.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.numerictypes;
+
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.importjob.configuration.HiveTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesImportTestBase;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.HiveServer2TestUtil;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
+
+import static java.util.Arrays.deepEquals;
+import static org.junit.Assert.assertTrue;
+
+public abstract class NumericTypesHiveImportTestBase<T extends HiveTestConfiguration> extends NumericTypesImportTestBase<T> {
+
+  public NumericTypesHiveImportTestBase(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly,
+                                        HiveMiniCluster hiveMiniCluster, HiveServer2TestUtil hiveServer2TestUtil) {
+    super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
+    this.hiveServer2TestUtil = hiveServer2TestUtil;
+    this.hiveMiniCluster = hiveMiniCluster;
+  }
+
+  private final HiveMiniCluster hiveMiniCluster;
+
+  private final HiveServer2TestUtil hiveServer2TestUtil;
+
+  @Override
+  public ArgumentArrayBuilder getArgsBuilder() {
+    ArgumentArrayBuilder builder = new ArgumentArrayBuilder()
+        .withCommonHadoopFlags()
+        .withProperty("parquetjob.configurator.implementation", "hadoop")
+        .withOption("connect", getAdapter().getConnectionString())
+        .withOption("table", getTableName())
+        .withOption("hive-import")
+        .withOption("hs2-url", hiveMiniCluster.getUrl())
+        .withOption("num-mappers", "1")
+        .withOption("as-parquetfile")
+        .withOption("delete-target-dir");
+    NumericTypesTestUtils.addEnableParquetDecimal(builder);
+    return builder;
+  }
+
+  @Override
+  public void verify() {
+    // The result contains a byte[] so we have to use Arrays.deepEquals() to assert.
+    Object[] firstRow = hiveServer2TestUtil.loadRawRowsFromTable(getTableName()).iterator().next().toArray();
+    Object[] expectedResultsForHive = getConfiguration().getExpectedResultsForHive();
+    assertTrue(deepEquals(expectedResultsForHive, firstRow));
+  }
+}
diff --git a/src/test/org/apache/sqoop/importjob/configuration/HiveTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/HiveTestConfiguration.java
new file mode 100644
index 0000000..1a5ce9e
--- /dev/null
+++ b/src/test/org/apache/sqoop/importjob/configuration/HiveTestConfiguration.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob.configuration;
+
+public interface HiveTestConfiguration extends ImportJobTestConfiguration{
+
+  Object[] getExpectedResultsForHive();
+}
diff --git a/src/test/org/apache/sqoop/importjob/configuration/MysqlImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/MysqlImportJobTestConfiguration.java
index 4936504..1af1363 100644
--- a/src/test/org/apache/sqoop/importjob/configuration/MysqlImportJobTestConfiguration.java
+++ b/src/test/org/apache/sqoop/importjob/configuration/MysqlImportJobTestConfiguration.java
@@ -18,10 +18,17 @@
 
 package org.apache.sqoop.importjob.configuration;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MysqlImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
+/**
+ * A note on the expected values here:
+ *
+ * With padding turned on, all of the numbers are expected to be padded with 0s, so that the total number of digits
+ * after the decimal point will be equal to their scale.
+ */
+public class MysqlImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration, HiveTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -40,14 +47,14 @@
   public List<String[]> getSampleData() {
     List<String[]> inputData = new ArrayList<>();
     inputData.add(new String[]{"1", "100.030", "1000000.05", "1000000.05", "1000000.05", "1000000.05",
-        "100.040", "1000000.05", "1000000.05", "1000000.05", "1000000.05"});
+        "100.040", "1000000.05", "1000000.05", "1000000.05", "11111111112222222222333333333344444444445555555555.05"});
     return inputData;
   }
 
   @Override
   public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
-        "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
+        "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 11111111112222222222333333333344444444445555555555.05000}";
     String[] expectedResult = new String[1];
     expectedResult[0] = expectedRecord;
     return expectedResult;
@@ -55,7 +62,7 @@
 
   @Override
   public String[] getExpectedResultsForParquet() {
-    String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
+    String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,11111111112222222222333333333344444444445555555555.05000";
     String[] expectedResult = new String[1];
     expectedResult[0] = expectedRecord;
     return expectedResult;
@@ -65,4 +72,26 @@
   public String toString() {
     return getClass().getSimpleName();
   }
+
+  /**
+   * Since Mysql permits a precision that is higher than 65, there is a special test case here for the last column:
+   * - parquet and avro import will be successful, so data will be present on storage
+   * - but Hive won't be able to read it, and returns null.
+   */
+  @Override
+  public Object[] getExpectedResultsForHive() {
+    return new Object[]{
+        new Integer(1),
+        new BigDecimal("100"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("100"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        null
+    };
+  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java
index 303a523..31ba7ba 100644
--- a/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java
+++ b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java
@@ -18,14 +18,15 @@
 
 package org.apache.sqoop.importjob.configuration;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
- * therefore when importing into avro, one has to use the padding feature.
+ * therefore when importing into avro and parquet, one has to use the padding feature.
  */
-public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
+public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration, HiveTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -67,4 +68,20 @@
   public String toString() {
     return getClass().getSimpleName();
   }
+
+  @Override
+  public Object[] getExpectedResultsForHive() {
+    return new Object[]{
+        new BigDecimal("1"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("100"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000")
+    };
+  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java
index 96dd077..3843bc4 100644
--- a/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java
+++ b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.importjob.configuration;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -28,8 +29,7 @@
  * Therefore, NUMBER requires special treatment.
  * The user has to specify precision and scale when importing into avro.
  */
-public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
-
+public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration, HiveTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -68,4 +68,14 @@
   public String toString() {
     return getClass().getSimpleName();
   }
+
+  @Override
+  public Object[] getExpectedResultsForHive() {
+    return new Object[]{
+        new BigDecimal("1"),
+        new BigDecimal("100.010"),
+        new BigDecimal("100"),
+        new BigDecimal("100.03000")
+    };
+  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java
index 8ba0bdc..04e9c36 100644
--- a/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java
+++ b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.importjob.configuration;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -26,7 +27,7 @@
  * for precision and scale for NUMERIC. Also, important, that the accompanying columns
  *  - NUMERIC(20) and NUMERIC(20, 5) don't get modified.
  */
-public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
+public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration, HiveTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -67,4 +68,14 @@
   public String toString() {
     return getClass().getSimpleName();
   }
+
+  @Override
+  public Object[] getExpectedResultsForHive() {
+    return new Object[]{
+        new Integer(1),
+        new BigDecimal("100.010"),
+        new BigDecimal("100"),
+        new BigDecimal("100.01000")
+    };
+  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java
index 45eaf04..9347273 100644
--- a/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java
+++ b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java
@@ -18,15 +18,22 @@
 
 package org.apache.sqoop.importjob.configuration;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
-public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
+/**
+ * Numbers with a scale and precision greater that 38 are expected to work in Parquet and Avro import properly.
+ *
+ * With padding turned on, all of the numbers are expected to be padded with 0s, so that the total number of digits
+ * after the decimal point will be equal to their scale.
+ */
+public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration, HiveTestConfiguration {
 
   @Override
   public String[] getTypes() {
-    String[] columnTypes = {"INT", "NUMERIC(20)", "NUMERIC(20,5)", "NUMERIC(20,0)", "NUMERIC(1000,5)",
-        "DECIMAL(20)", "DECIMAL(20)", "DECIMAL(20,5)", "DECIMAL(20,0)", "DECIMAL(1000,5)"};
+    String[] columnTypes = {"INT", "NUMERIC(20)", "NUMERIC(20,5)", "NUMERIC(20,0)", "NUMERIC(1000,50)",
+        "DECIMAL(20)", "DECIMAL(20)", "DECIMAL(20,5)", "DECIMAL(20,0)", "DECIMAL(1000,50)"};
     return columnTypes;
   }
 
@@ -40,14 +47,14 @@
   public List<String[]> getSampleData() {
     List<String[]> inputData = new ArrayList<>();
     inputData.add(new String[]{"1", "1000000.05", "1000000.05", "1000000.05", "1000000.05",
-        "100.02", "1000000.05", "1000000.05", "1000000.05", "1000000.05"});
+        "100.02", "1000000.05", "1000000.05", "1000000.05", "11111111112222222222333333333344444444445555555555.111111111122222222223333333333444444444455555"});
     return inputData;
   }
 
   @Override
   public String[] getExpectedResultsForAvro() {
-    String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
-        "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
+    String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000000000000000000000000000000000000000000000000, " +
+        "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 11111111112222222222333333333344444444445555555555.11111111112222222222333333333344444444445555500000}";
     String[] expectedResult = new String[1];
     expectedResult[0] = expectedRecord;
     return expectedResult;
@@ -55,7 +62,8 @@
 
   @Override
   public String[] getExpectedResultsForParquet() {
-    String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
+    String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000000000000000000000000000000000000000000000000," +
+        "100,1000000,1000000.05000,1000000,11111111112222222222333333333344444444445555555555.11111111112222222222333333333344444444445555500000";
     String[] expectedResult = new String[1];
     expectedResult[0] = expectedRecord;
     return expectedResult;
@@ -65,4 +73,27 @@
   public String toString() {
     return getClass().getSimpleName();
   }
+
+  /**
+   * Special cases for numbers with a precision or scale higher than 38, i.e. the maximum precision and scale in Hive:
+   * - parquet import will be successful, so data will be present on storage
+   * - but Hive won't be able to read it, and returns null instead of objects.
+   *
+   * Because: Hive has an upper limit of 38 for both precision and scale and won't be able to read the numbers (returns null) above the limit.
+   */
+  @Override
+  public Object[] getExpectedResultsForHive() {
+    return new Object[]{
+        new Integer(1),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        null,
+        new BigDecimal("100"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        null
+    };
+  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/SqlServerImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/SqlServerImportJobTestConfiguration.java
index aacbabf..b590539 100644
--- a/src/test/org/apache/sqoop/importjob/configuration/SqlServerImportJobTestConfiguration.java
+++ b/src/test/org/apache/sqoop/importjob/configuration/SqlServerImportJobTestConfiguration.java
@@ -18,10 +18,11 @@
 
 package org.apache.sqoop.importjob.configuration;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
-public class SqlServerImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
+public class SqlServerImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration, HiveTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -65,4 +66,21 @@
   public String toString() {
     return getClass().getSimpleName();
   }
+
+  @Override
+  public Object[] getExpectedResultsForHive() {
+    return new Object[]{
+        new Integer(1),
+        new BigDecimal("100"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("100"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000"),
+        new BigDecimal("1000000"),
+        new BigDecimal("1000000.05000")
+    };
+  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesAvroImportTestBase.java b/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesAvroImportTestBase.java
new file mode 100644
index 0000000..2f76e40
--- /dev/null
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesAvroImportTestBase.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob.numerictypes;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.AvroTestUtils;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
+
+public abstract class NumericTypesAvroImportTestBase<T extends AvroTestConfiguration> extends NumericTypesImportTestBase<T>  {
+
+  public static final Log LOG = LogFactory.getLog(NumericTypesAvroImportTestBase.class.getName());
+
+  public NumericTypesAvroImportTestBase(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+    super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
+  }
+
+  @Override
+  public ArgumentArrayBuilder getArgsBuilder() {
+    ArgumentArrayBuilder builder =  new ArgumentArrayBuilder();
+    includeCommonOptions(builder);
+    builder.withOption("as-avrodatafile");
+    NumericTypesTestUtils.addEnableAvroDecimal(builder);
+    return builder;
+  }
+
+  @Override
+  public void verify() {
+    AvroTestUtils.registerDecimalConversionUsageForVerification();
+    AvroTestUtils.verify(configuration.getExpectedResultsForAvro(), getConf(), getTablePath());
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesImportTestBase.java b/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesImportTestBase.java
index 320adb3..f889dbd 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesImportTestBase.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesImportTestBase.java
@@ -20,37 +20,18 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.sqoop.SqoopOptions;
-import org.apache.sqoop.importjob.DatabaseAdapterFactory;
-import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
-import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
-import org.apache.sqoop.testcategories.thirdpartytest.ThirdPartyTest;
+import org.apache.sqoop.importjob.configuration.ImportJobTestConfiguration;
 import org.apache.sqoop.testutil.ArgumentArrayBuilder;
-import org.apache.sqoop.testutil.AvroTestUtils;
-import org.apache.sqoop.testutil.ImportJobTestCase;
-import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
-import org.apache.sqoop.util.ParquetReader;
-import org.junit.After;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
+import org.apache.sqoop.testutil.ThirdPartyTestBase;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
 
-import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile;
-import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
-import static org.junit.Assert.assertEquals;
-
-@Category(ThirdPartyTest.class)
 /**
  * This test covers the behavior of the Avro import for fixed point decimal types, i.e. NUMBER, NUMERIC
  * and DECIMAL.
@@ -65,97 +46,34 @@
  * 2. Decimal padding during avro or parquet import
  * In case of Oracle and Postgres, Sqoop has to pad the values with 0s to avoid errors.
  */
-public abstract class NumericTypesImportTestBase<T extends AvroTestConfiguration & ParquetTestConfiguration> extends ImportJobTestCase implements DatabaseAdapterFactory {
+public abstract class NumericTypesImportTestBase<T extends ImportJobTestConfiguration> extends ThirdPartyTestBase<T>  {
 
   public static final Log LOG = LogFactory.getLog(NumericTypesImportTestBase.class.getName());
 
-  private Configuration conf = new Configuration();
-
-  private final T configuration;
-  private final DatabaseAdapter adapter;
   private final boolean failWithoutExtraArgs;
   private final boolean failWithPadding;
 
-  // Constants for the basic test case, that doesn't use extra arguments
-  // that are required to avoid errors, i.e. padding and default precision and scale.
-  protected final static boolean SUCCEED_WITHOUT_EXTRA_ARGS = false;
-  protected final static boolean FAIL_WITHOUT_EXTRA_ARGS = true;
-
-  // Constants for the test case that has padding specified but not default precision and scale.
-  protected final static boolean SUCCEED_WITH_PADDING_ONLY = false;
-  protected final static boolean FAIL_WITH_PADDING_ONLY = true;
-
-  private Path tableDirPath;
-
   public NumericTypesImportTestBase(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
-    this.adapter = createAdapter();
-    this.configuration = configuration;
+    super(configuration);
     this.failWithoutExtraArgs = failWithoutExtraArgs;
     this.failWithPadding = failWithPaddingOnly;
   }
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Override
-  protected Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  protected boolean useHsqldbTestServer() {
-    return false;
-  }
-
-  @Override
-  protected String getConnectString() {
-    return adapter.getConnectionString();
-  }
-
-  @Override
-  protected SqoopOptions getSqoopOptions(Configuration conf) {
-    SqoopOptions opts = new SqoopOptions(conf);
-    adapter.injectConnectionParameters(opts);
-    return opts;
-  }
-
-  @Override
-  protected void dropTableIfExists(String table) throws SQLException {
-    adapter.dropTableIfExists(table, getManager());
-  }
-
   @Before
   public void setUp() {
     super.setUp();
-    String[] names = configuration.getNames();
-    String[] types = configuration.getTypes();
-    createTableWithColTypesAndNames(names, types, new String[0]);
-    List<String[]> inputData = configuration.getSampleData();
-    for (String[] input  : inputData) {
-      insertIntoTable(names, types, input);
-    }
     tableDirPath = new Path(getWarehouseDir() + "/" + getTableName());
   }
 
-  @After
-  public void tearDown() {
-    try {
-      dropTableIfExists(getTableName());
-    } catch (SQLException e) {
-      LOG.warn("Error trying to drop table on tearDown: " + e);
-    }
-    super.tearDown();
-  }
+  protected Path tableDirPath;
 
-  private ArgumentArrayBuilder getArgsBuilder(SqoopOptions.FileLayout fileLayout) {
-    ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
-    if (AvroDataFile.equals(fileLayout)) {
-      builder.withOption("as-avrodatafile");
-    }
-    else if (ParquetFile.equals(fileLayout)) {
-      builder.withOption("as-parquetfile");
-    }
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
 
+  abstract public ArgumentArrayBuilder getArgsBuilder();
+  abstract public void verify();
+
+  public ArgumentArrayBuilder includeCommonOptions(ArgumentArrayBuilder builder) {
     return builder.withCommonHadoopFlags(true)
         .withOption("warehouse-dir", getWarehouseDir())
         .withOption("num-mappers", "1")
@@ -163,142 +81,40 @@
         .withOption("connect", getConnectString());
   }
 
-  /**
-   * Adds properties to the given arg builder for decimal precision and scale.
-   * @param builder
-   */
-  private void addPrecisionAndScale(ArgumentArrayBuilder builder) {
-    builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
-    builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
-  }
-
-  /**
-   * Enables padding for decimals in avro and parquet import.
-   * @param builder
-   */
-  private void addPadding(ArgumentArrayBuilder builder) {
-    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
-  }
-
-  private void addEnableAvroDecimal(ArgumentArrayBuilder builder) {
-    builder.withProperty("sqoop.avro.logical_types.decimal.enable", "true");
-  }
-
-  private void addEnableParquetDecimal(ArgumentArrayBuilder builder) {
-    builder.withProperty("sqoop.parquet.logical_types.decimal.enable", "true");
-  }
-
-  private void configureJunitToExpectFailure(boolean failWithPadding) {
-    if (failWithPadding) {
-      thrown.expect(IOException.class);
-      thrown.expectMessage("Failure during job; return status 1");
-    }
-  }
 
   @Test
-  public void testAvroImportWithoutPadding() throws IOException {
-    configureJunitToExpectFailure(failWithoutExtraArgs);
-    ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
-    addEnableAvroDecimal(builder);
+  public void testImportWithoutPadding() throws IOException {
+    if(failWithoutExtraArgs){
+      NumericTypesTestUtils.configureJunitToExpectFailure(thrown);
+    }
+    ArgumentArrayBuilder builder = getArgsBuilder();
     String[] args = builder.build();
     runImport(args);
     if (!failWithoutExtraArgs) {
-      verify(AvroDataFile);
+      verify();
     }
   }
 
   @Test
-  public void testAvroImportWithPadding() throws IOException {
-    configureJunitToExpectFailure(failWithPadding);
-    ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
-    addEnableAvroDecimal(builder);
-    addPadding(builder);
+  public void testImportWithPadding() throws IOException {
+    if(failWithPadding){
+      NumericTypesTestUtils.configureJunitToExpectFailure(thrown);
+    }
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    NumericTypesTestUtils.addPadding(builder);
     runImport(builder.build());
     if (!failWithPadding) {
-      verify(AvroDataFile);
+      verify();
     }
   }
 
   @Test
-  public void testAvroImportWithDefaultPrecisionAndScale() throws  IOException {
-    ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
-    addEnableAvroDecimal(builder);
-    addPadding(builder);
-    addPrecisionAndScale(builder);
+  public void testImportWithDefaultPrecisionAndScale() throws IOException {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    NumericTypesTestUtils.addPadding(builder);
+    NumericTypesTestUtils.addPrecisionAndScale(builder);
     runImport(builder.build());
-    verify(AvroDataFile);
+    verify();
   }
 
-  @Test
-  public void testParquetImportWithoutPadding() throws IOException {
-    configureJunitToExpectFailure(failWithoutExtraArgs);
-    ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
-    addEnableParquetDecimal(builder);
-    String[] args = builder.build();
-    runImport(args);
-    if (!failWithoutExtraArgs) {
-      verify(ParquetFile);
-    }
-  }
-
-  @Test
-  public void testParquetImportWithPadding() throws IOException {
-    configureJunitToExpectFailure(failWithPadding);
-    ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
-    addEnableParquetDecimal(builder);
-    addPadding(builder);
-    runImport(builder.build());
-    if (!failWithPadding) {
-      verify(ParquetFile);
-    }
-  }
-
-  @Test
-  public void testParquetImportWithDefaultPrecisionAndScale() throws IOException {
-    ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
-    addEnableParquetDecimal(builder);
-    addPadding(builder);
-    addPrecisionAndScale(builder);
-    runImport(builder.build());
-    verify(ParquetFile);
-  }
-
-  private void verify(SqoopOptions.FileLayout fileLayout) {
-    if (AvroDataFile.equals(fileLayout)) {
-      AvroTestUtils.registerDecimalConversionUsageForVerification();
-      AvroTestUtils.verify(configuration.getExpectedResultsForAvro(), getConf(), getTablePath());
-    } else if (ParquetFile.equals(fileLayout)) {
-      verifyParquetFile();
-    }
-  }
-
-  private void verifyParquetFile() {
-    verifyParquetSchema();
-    verifyParquetContent();
-  }
-
-  private void verifyParquetContent() {
-    ParquetReader reader = new ParquetReader(tableDirPath);
-    assertEquals(Arrays.asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted());
-  }
-
-  private void verifyParquetSchema() {
-    ParquetReader reader = new ParquetReader(tableDirPath);
-    MessageType parquetSchema = reader.readParquetSchema();
-
-    String[] types = configuration.getTypes();
-    for (int i = 0; i < types.length; i ++) {
-      String type = types[i];
-      if (isNumericSqlType(type)) {
-        OriginalType parquetFieldType = parquetSchema.getFields().get(i).getOriginalType();
-        assertEquals(OriginalType.DECIMAL, parquetFieldType);
-      }
-    }
-  }
-
-  private boolean isNumericSqlType(String type) {
-    return type.toUpperCase().startsWith("DECIMAL")
-        || type.toUpperCase().startsWith("NUMBER")
-        || type.toUpperCase().startsWith("NUMERIC");
-  }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesParquetImportTestBase.java b/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesParquetImportTestBase.java
new file mode 100644
index 0000000..770d6d5
--- /dev/null
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/NumericTypesParquetImportTestBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob.numerictypes;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
+import org.apache.sqoop.util.ParquetReader;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class NumericTypesParquetImportTestBase<T extends ParquetTestConfiguration> extends NumericTypesImportTestBase<T>  {
+
+  public static final Log LOG = LogFactory.getLog(NumericTypesParquetImportTestBase.class.getName());
+
+  public NumericTypesParquetImportTestBase(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+    super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
+  }
+
+  @Override
+  public ArgumentArrayBuilder getArgsBuilder() {
+    ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
+    includeCommonOptions(builder);
+    builder.withOption("as-parquetfile");
+    NumericTypesTestUtils.addEnableParquetDecimal(builder);
+    return builder;
+  }
+
+  @Override
+  public void verify() {
+    verifyParquetSchema();
+    verifyParquetContent();
+  }
+
+  private void verifyParquetContent() {
+    ParquetReader reader = new ParquetReader(tableDirPath);
+    assertEquals(Arrays.asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted());
+  }
+
+  private void verifyParquetSchema() {
+    ParquetReader reader = new ParquetReader(tableDirPath);
+    MessageType parquetSchema = reader.readParquetSchema();
+
+    String[] types = configuration.getTypes();
+    for (int i = 0; i < types.length; i ++) {
+      String type = types[i];
+      if (isNumericSqlType(type)) {
+        OriginalType parquetFieldType = parquetSchema.getFields().get(i).getOriginalType();
+        assertEquals(OriginalType.DECIMAL, parquetFieldType);
+      }
+    }
+  }
+
+  private boolean isNumericSqlType(String type) {
+    return type.toUpperCase().startsWith("DECIMAL")
+        || type.toUpperCase().startsWith("NUMBER")
+        || type.toUpperCase().startsWith("NUMERIC");
+  }
+}
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/MysqlNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/avro/MysqlNumericTypesAvroImportTest.java
similarity index 72%
rename from src/test/org/apache/sqoop/importjob/numerictypes/MysqlNumericTypesImportTest.java
rename to src/test/org/apache/sqoop/importjob/numerictypes/avro/MysqlNumericTypesAvroImportTest.java
index e39be38..46aa307 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/MysqlNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/avro/MysqlNumericTypesAvroImportTest.java
@@ -16,23 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.avro;
 
 import org.apache.sqoop.importjob.configuration.MysqlImportJobTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesAvroImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.MysqlTest;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.MysqlDatabaseAdapter;
 import org.junit.experimental.categories.Category;
 
 @Category(MysqlTest.class)
-public class MysqlNumericTypesImportTest extends NumericTypesImportTestBase {
+public class MysqlNumericTypesAvroImportTest extends NumericTypesAvroImportTestBase {
 
   @Override
   public DatabaseAdapter createAdapter() {
     return new MysqlDatabaseAdapter();
   }
 
-  public MysqlNumericTypesImportTest() {
-    super(new MysqlImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY);
+  public MysqlNumericTypesAvroImportTest() {
+    super(new MysqlImportJobTestConfiguration(), NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS, NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/OracleNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/avro/OracleNumericTypesAvroImportTest.java
similarity index 77%
rename from src/test/org/apache/sqoop/importjob/numerictypes/OracleNumericTypesImportTest.java
rename to src/test/org/apache/sqoop/importjob/numerictypes/avro/OracleNumericTypesAvroImportTest.java
index 292884b..5ec1296 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/OracleNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/avro/OracleNumericTypesAvroImportTest.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.avro;
 
 import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
 import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfiguration;
 import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfigurationForNumber;
 import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesAvroImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.OracleTest;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
@@ -32,10 +33,14 @@
 
 import java.util.Arrays;
 
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITH_PADDING_ONLY;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
 @Category(OracleTest.class)
 @RunWith(Parameterized.class)
 @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
-public class OracleNumericTypesImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesImportTestBase<T> {
+public class OracleNumericTypesAvroImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesAvroImportTestBase<T> {
 
   @Override
   public DatabaseAdapter createAdapter() {
@@ -50,7 +55,7 @@
     );
   }
 
-  public OracleNumericTypesImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+  public OracleNumericTypesAvroImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
     super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/PostgresNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/avro/PostgresNumericTypesAvroImportTest.java
similarity index 75%
copy from src/test/org/apache/sqoop/importjob/numerictypes/PostgresNumericTypesImportTest.java
copy to src/test/org/apache/sqoop/importjob/numerictypes/avro/PostgresNumericTypesAvroImportTest.java
index 003b27d..59088d1 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/PostgresNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/avro/PostgresNumericTypesAvroImportTest.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.avro;
 
 import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
 import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
 import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationForNumeric;
 import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesAvroImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
@@ -32,10 +33,15 @@
 
 import java.util.Arrays;
 
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITH_PADDING_ONLY;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
 @Category(PostgresqlTest.class)
 @RunWith(Parameterized.class)
 @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
-public class PostgresNumericTypesImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesImportTestBase<T> {
+public class PostgresNumericTypesAvroImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesAvroImportTestBase<T> {
 
   @Override
   public DatabaseAdapter createAdapter() {
@@ -50,7 +56,7 @@
     );
   }
 
-  public PostgresNumericTypesImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+  public PostgresNumericTypesAvroImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
     super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/SqlServerNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/avro/SqlServerNumericTypesAvroImportTest.java
similarity index 75%
rename from src/test/org/apache/sqoop/importjob/numerictypes/SqlServerNumericTypesImportTest.java
rename to src/test/org/apache/sqoop/importjob/numerictypes/avro/SqlServerNumericTypesAvroImportTest.java
index 17b94c5..9791de5 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/SqlServerNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/avro/SqlServerNumericTypesAvroImportTest.java
@@ -16,23 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.avro;
 
 import org.apache.sqoop.importjob.configuration.SqlServerImportJobTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesAvroImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.SqlServerTest;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.SqlServerDatabaseAdapter;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
 @Category(SqlServerTest.class)
-public class SqlServerNumericTypesImportTest extends NumericTypesImportTestBase {
+public class SqlServerNumericTypesAvroImportTest extends NumericTypesAvroImportTestBase {
 
   @Override
   public DatabaseAdapter createAdapter() {
     return new SqlServerDatabaseAdapter();
   }
 
-  public SqlServerNumericTypesImportTest() {
+  public SqlServerNumericTypesAvroImportTest() {
     super(new SqlServerImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/MysqlNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/MysqlNumericTypesParquetImportTest.java
similarity index 71%
copy from src/test/org/apache/sqoop/importjob/numerictypes/MysqlNumericTypesImportTest.java
copy to src/test/org/apache/sqoop/importjob/numerictypes/parquet/MysqlNumericTypesParquetImportTest.java
index e39be38..a1ff95b 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/MysqlNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/MysqlNumericTypesParquetImportTest.java
@@ -16,23 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.parquet;
 
 import org.apache.sqoop.importjob.configuration.MysqlImportJobTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesParquetImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.MysqlTest;
+import org.apache.sqoop.testutil.NumericTypesTestUtils;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.MysqlDatabaseAdapter;
 import org.junit.experimental.categories.Category;
 
 @Category(MysqlTest.class)
-public class MysqlNumericTypesImportTest extends NumericTypesImportTestBase {
+public class MysqlNumericTypesParquetImportTest extends NumericTypesParquetImportTestBase {
 
   @Override
   public DatabaseAdapter createAdapter() {
     return new MysqlDatabaseAdapter();
   }
 
-  public MysqlNumericTypesImportTest() {
-    super(new MysqlImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY);
+  public MysqlNumericTypesParquetImportTest() {
+    super(new MysqlImportJobTestConfiguration(), NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS,
+        NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/OracleNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/OracleNumericTypesParquetImportTest.java
similarity index 76%
copy from src/test/org/apache/sqoop/importjob/numerictypes/OracleNumericTypesImportTest.java
copy to src/test/org/apache/sqoop/importjob/numerictypes/parquet/OracleNumericTypesParquetImportTest.java
index 292884b..65ed0eb 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/OracleNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/OracleNumericTypesParquetImportTest.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.parquet;
 
 import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
 import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfiguration;
 import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfigurationForNumber;
 import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesAvroImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.OracleTest;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
@@ -32,10 +33,14 @@
 
 import java.util.Arrays;
 
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITH_PADDING_ONLY;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
 @Category(OracleTest.class)
 @RunWith(Parameterized.class)
 @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
-public class OracleNumericTypesImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesImportTestBase<T> {
+public class OracleNumericTypesParquetImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesAvroImportTestBase<T> {
 
   @Override
   public DatabaseAdapter createAdapter() {
@@ -50,7 +55,7 @@
     );
   }
 
-  public OracleNumericTypesImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+  public OracleNumericTypesParquetImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
     super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/PostgresNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/PostgresNumericTypesParquetImportTest.java
similarity index 75%
rename from src/test/org/apache/sqoop/importjob/numerictypes/PostgresNumericTypesImportTest.java
rename to src/test/org/apache/sqoop/importjob/numerictypes/parquet/PostgresNumericTypesParquetImportTest.java
index 003b27d..6e84c83 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/PostgresNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/PostgresNumericTypesParquetImportTest.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.parquet;
 
-import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
 import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
 import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationForNumeric;
 import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesParquetImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
@@ -32,10 +32,15 @@
 
 import java.util.Arrays;
 
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.FAIL_WITH_PADDING_ONLY;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
 @Category(PostgresqlTest.class)
 @RunWith(Parameterized.class)
 @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
-public class PostgresNumericTypesImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends NumericTypesImportTestBase<T> {
+public class PostgresNumericTypesParquetImportTest<T extends ParquetTestConfiguration> extends NumericTypesParquetImportTestBase<T> {
 
   @Override
   public DatabaseAdapter createAdapter() {
@@ -50,7 +55,7 @@
     );
   }
 
-  public PostgresNumericTypesImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+  public PostgresNumericTypesParquetImportTest(T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
     super(configuration, failWithoutExtraArgs, failWithPaddingOnly);
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/numerictypes/SqlServerNumericTypesImportTest.java b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/SqlServerNumericTypesParquetImportTest.java
similarity index 74%
copy from src/test/org/apache/sqoop/importjob/numerictypes/SqlServerNumericTypesImportTest.java
copy to src/test/org/apache/sqoop/importjob/numerictypes/parquet/SqlServerNumericTypesParquetImportTest.java
index 17b94c5..7f5eb35 100644
--- a/src/test/org/apache/sqoop/importjob/numerictypes/SqlServerNumericTypesImportTest.java
+++ b/src/test/org/apache/sqoop/importjob/numerictypes/parquet/SqlServerNumericTypesParquetImportTest.java
@@ -16,23 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.numerictypes;
+package org.apache.sqoop.importjob.numerictypes.parquet;
 
 import org.apache.sqoop.importjob.configuration.SqlServerImportJobTestConfiguration;
+import org.apache.sqoop.importjob.numerictypes.NumericTypesParquetImportTestBase;
 import org.apache.sqoop.testcategories.thirdpartytest.SqlServerTest;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.SqlServerDatabaseAdapter;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITHOUT_EXTRA_ARGS;
+import static org.apache.sqoop.testutil.NumericTypesTestUtils.SUCCEED_WITH_PADDING_ONLY;
+
 @Category(SqlServerTest.class)
-public class SqlServerNumericTypesImportTest extends NumericTypesImportTestBase {
+public class SqlServerNumericTypesParquetImportTest extends NumericTypesParquetImportTestBase {
 
   @Override
   public DatabaseAdapter createAdapter() {
     return new SqlServerDatabaseAdapter();
   }
 
-  public SqlServerNumericTypesImportTest() {
+  public SqlServerNumericTypesParquetImportTest() {
     super(new SqlServerImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY);
   }
 }
diff --git a/src/test/org/apache/sqoop/testutil/NumericTypesTestUtils.java b/src/test/org/apache/sqoop/testutil/NumericTypesTestUtils.java
new file mode 100644
index 0000000..60371b9
--- /dev/null
+++ b/src/test/org/apache/sqoop/testutil/NumericTypesTestUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+public class NumericTypesTestUtils {
+  // Constants for the basic test case, that doesn't use extra arguments
+  // that are required to avoid errors, i.e. padding and default precision and scale.
+  public final static boolean SUCCEED_WITHOUT_EXTRA_ARGS = false;
+  public final static boolean FAIL_WITHOUT_EXTRA_ARGS = true;
+  // Constants for the test case that has padding specified but not default precision and scale.
+  public final static boolean SUCCEED_WITH_PADDING_ONLY = false;
+  public final static boolean FAIL_WITH_PADDING_ONLY = true;
+
+  /**
+   * Adds properties to the given arg builder for decimal precision and scale.
+   * @param builder
+   */
+  public static void addPrecisionAndScale(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
+    builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
+  }
+
+  /**
+   * Enables padding for decimals in avro and parquet import.
+   * @param builder
+   */
+  public static void addPadding(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
+  }
+
+  public static void addEnableAvroDecimal(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.avro.logical_types.decimal.enable", "true");
+  }
+
+  public static void addEnableParquetDecimal(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.parquet.logical_types.decimal.enable", "true");
+  }
+
+  public static void configureJunitToExpectFailure(ExpectedException thrown) {
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Failure during job; return status 1");
+  }
+}
diff --git a/src/test/org/apache/sqoop/testutil/ThirdPartyTestBase.java b/src/test/org/apache/sqoop/testutil/ThirdPartyTestBase.java
new file mode 100644
index 0000000..a2087e3
--- /dev/null
+++ b/src/test/org/apache/sqoop/testutil/ThirdPartyTestBase.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.importjob.DatabaseAdapterFactory;
+import org.apache.sqoop.importjob.configuration.ImportJobTestConfiguration;
+import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
+import org.junit.After;
+import org.junit.Before;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public abstract class ThirdPartyTestBase<T extends ImportJobTestConfiguration> extends ImportJobTestCase implements DatabaseAdapterFactory {
+
+  private final DatabaseAdapter adapter;
+  protected final T configuration;
+
+  private Configuration conf = new Configuration();
+
+  public DatabaseAdapter getAdapter() {
+    return adapter;
+  }
+
+  public T getConfiguration() {
+    return configuration;
+  }
+
+  protected ThirdPartyTestBase(T configuration) {
+    this.adapter = createAdapter();
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return adapter.getConnectionString();
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    adapter.injectConnectionParameters(opts);
+    return opts;
+  }
+
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    adapter.dropTableIfExists(table, getManager());
+  }
+
+  @Override
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    String[] names = configuration.getNames();
+    String[] types = configuration.getTypes();
+    createTableWithColTypesAndNames(names, types, new String[0]);
+    List<String[]> inputData = configuration.getSampleData();
+    for (String[] input  : inputData) {
+      insertIntoTable(names, types, input);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      dropTableIfExists(getTableName());
+    } catch (SQLException e) {
+      LOG.warn("Error trying to drop table on tearDown: " + e);
+    }
+    super.tearDown();
+  }
+
+}