SQOOP-1493: Add ability to import/export true decimal in Avro instead of serializing it to String

(Abraham Elmahrek via Jarek Jarcec Cecho)
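
With sqoop.avro.logical_types.decimal.enable=true, a column such as DECIMAL(3,2) is no longer stringified; its Avro field becomes a union of null and bytes carrying the decimal logical type. A minimal sketch of the schema shape this patch generates, against the Avro 1.8 API (illustrative only, not part of the patch):

    import java.util.Arrays;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalSchemaSketch {
      public static void main(String[] args) {
        // bytes schema annotated with decimal(3, 2), as AvroSchemaGenerator builds below
        Schema decimalBytes = LogicalTypes.decimal(3, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        // nullable union, matching Sqoop's ["null", type] convention
        Schema union = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), decimalBytes));
        System.out.println(union.toString(true));
      }
    }
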
diff --git a/LICENSE.txt b/LICENSE.txt
index c36c7ad..48b2c3b 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -362,3 +362,7 @@
 For lib/snappy-java-<version>.jar:
 
   The Apache License, Version 2.0
+
+Some parts of the code were copied from the Apache Hive Project:
+
+  The Apache License, Version 2.0
\ No newline at end of file
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 4b2f9ce..d8f1dfc 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -18,7 +18,7 @@
 # This properties file lists the versions of the various artifacts we use.
 # It drives ivy and the generation of a maven POM
 
-avro.version=1.7.5
+avro.version=1.8.0-SNAPSHOT
 
 kite-data.version=1.0.0
 
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index dffbf6e..90cc9d0 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.avro;
 
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
@@ -50,12 +51,29 @@
  * The service class provides methods for creating and converting Avro objects.
  */
 public final class AvroUtil {
+  public static boolean isDecimal(Schema.Field field) {
+    return isDecimal(field.schema());
+  }
+
+  public static boolean isDecimal(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      for (Schema type : schema.getTypes()) {
+        if (isDecimal(type)) {
+          return true;
+        }
+      }
+
+      return false;
+    } else {
+      return "decimal".equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP));
+    }
+  }
 
   /**
    * Convert a Sqoop's Java representation to Avro representation.
    */
-  public static Object toAvro(Object o, boolean bigDecimalFormatString) {
-    if (o instanceof BigDecimal) {
+  public static Object toAvro(Object o, Schema.Field field, boolean bigDecimalFormatString) {
+    if (o instanceof BigDecimal && !isDecimal(field)) {
       if (bigDecimalFormatString) {
         // Returns a string representation of this without an exponent field.
         return ((BigDecimal) o).toPlainString();
@@ -111,8 +129,9 @@
       Schema schema, boolean bigDecimalFormatString) {
     GenericRecord record = new GenericData.Record(schema);
     for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      Object avroObject = toAvro(entry.getValue(), bigDecimalFormatString);
       String avroColumn = toAvroColumn(entry.getKey());
+      Schema.Field field = schema.getField(avroColumn);
+      Object avroObject = toAvro(entry.getValue(), field, bigDecimalFormatString);
       record.put(avroColumn, avroObject);
     }
     return record;
@@ -187,7 +206,12 @@
           throw new IllegalArgumentException("Only support union with null");
         }
       case FIXED:
-        return new BytesWritable(((GenericFixed) avroObject).bytes());
+        if (isDecimal(schema)) {
+          // The registered DecimalConversion should already have produced a BigDecimal.
+          return avroObject;
+        } else {
+          return new BytesWritable(((GenericFixed) avroObject).bytes());
+        }
       case RECORD:
       case ARRAY:
       case MAP:
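
On the export side, once a DecimalConversion is registered (see the mapper changes below), a decimal-annotated FIXED datum already arrives as a BigDecimal, which is why the FIXED branch above can return avroObject unchanged. A sketch of the underlying conversion, assuming the Avro 1.8 Conversions API (illustrative):

    import java.math.BigDecimal;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericFixed;

    public class FixedDecimalSketch {
      public static void main(String[] args) {
        LogicalTypes.Decimal logical = LogicalTypes.decimal(3, 2);
        Schema decimalFixed =
            logical.addToSchema(Schema.createFixed("dec1", null, null, 2));
        Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
        // encode: the value's scale must match the declared scale
        GenericFixed datum =
            conv.toFixed(new BigDecimal("2.00"), decimalFixed, logical);
        // decode: recovers the exact BigDecimal
        System.out.println(conv.fromFixed(datum, decimalFixed, logical)); // 2.00
      }
    }
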
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index e19c17b..bd6e99b 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -100,6 +100,11 @@
    */
   public static final String PROP_SPLIT_LIMIT = "split.limit";
 
+  /**
+   * Enable Avro logical types (currently decimal support only).
+   */
+  public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
+
   private ConfigurationConstants() {
     // Disable Explicit Object Creation
   }
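
Since this is a plain Hadoop configuration property, it can be supplied on the command line (-Dsqoop.avro.logical_types.decimal.enable=true) or set programmatically; it defaults to false. A minimal sketch (illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.config.ConfigurationConstants;

    public class EnableDecimalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(
            ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, true);
        // AvroSchemaGenerator consults this flag (default false) before
        // emitting the decimal logical type.
        System.out.println(conf.getBoolean(
            ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false));
      }
    }
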
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d9569c5..f98feb3 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -32,6 +32,8 @@
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema.Type;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -223,6 +225,24 @@
   }
 
   /**
+   * Resolve a database-specific type to an Avro logical type.
+   * @param sqlType     sql type
+   * @param precision   precision of the decimal value
+   * @param scale       scale of the decimal value
+   * @return            avro type
+   */
+  public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
+    switch (sqlType) {
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        return LogicalTypes.decimal(precision, scale);
+      default:
+        throw new IllegalArgumentException("Cannot convert SQL type "
+            + sqlType + " to avro logical type");
+    }
+  }
+
+  /**
    * Return java type for SQL type.
    * @param tableName   table name
    * @param columnName  column name
@@ -259,6 +277,20 @@
   }
 
   /**
+   * Return avro logical type for SQL type.
+   * @param tableName   table name
+   * @param columnName  column name
+   * @param sqlType     sql type
+   * @param precision   precision
+   * @param scale       scale
+   * @return            avro type
+   */
+  public LogicalType toAvroLogicalType(String tableName, String columnName, int sqlType, Integer precision, Integer scale) {
+    // ignore table name and column name by default.
+    return toAvroLogicalType(sqlType, precision, scale);
+  }
+
+  /**
    * Return an unordered mapping from colname to sqltype for
    * all columns in a table.
    *
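
LogicalTypes.decimal(precision, scale) is the Avro 1.8 factory this mapping relies on; the logical type round-trips a BigDecimal through the underlying bytes encoding. A sketch (illustrative, not part of the patch):

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalRoundTripSketch {
      public static void main(String[] args) {
        LogicalTypes.Decimal logical = LogicalTypes.decimal(10, 2);
        Schema schema = logical.addToSchema(Schema.create(Schema.Type.BYTES));
        Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
        // the value's scale must match the declared scale
        ByteBuffer encoded = conv.toBytes(new BigDecimal("12.34"), schema, logical);
        BigDecimal decoded = conv.fromBytes(encoded, schema, logical);
        System.out.println(decoded); // 12.34
      }
    }
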
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
index 20f056a..76c3458 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
@@ -18,8 +18,10 @@
 
 package org.apache.sqoop.mapreduce;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.io.NullWritable;
 
 import java.io.IOException;
@@ -31,6 +33,14 @@
     extends GenericRecordExportMapper<AvroWrapper<GenericRecord>, NullWritable> {
 
   @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+
+    // Add decimal support
+    ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+  }
+
+  @Override
   protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
       Context context) throws IOException, InterruptedException {
     context.write(toSqoopRecord(key.datum()), NullWritable.get());
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 0ea5ca4..450f947 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -68,8 +68,7 @@
       throw new IOException(sqlE);
     }
 
-    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
-        schema, bigDecimalFormatString);
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema, bigDecimalFormatString);
     wrapper.datum(outKey);
     context.write(wrapper, NullWritable.get());
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
index aed1e72..d95feb0 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
@@ -23,10 +23,12 @@
 import java.net.URLDecoder;
 import java.util.Map;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -34,9 +36,9 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import static org.apache.avro.file.CodecFactory.DEFAULT_DEFLATE_LEVEL;
 import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
 import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
 import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
 import static org.apache.avro.mapred.AvroOutputFormat.EXT;
 import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
@@ -53,6 +55,7 @@
   static <T> void configureDataFileWriter(DataFileWriter<T> writer,
     TaskAttemptContext context) throws UnsupportedEncodingException {
     if (FileOutputFormat.getCompressOutput(context)) {
+      // Default level must be greater than 0.
       int level = context.getConfiguration()
         .getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       String codecName = context.getConfiguration()
@@ -90,6 +93,9 @@
       isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
         : AvroJob.getOutputSchema(context.getConfiguration());
 
+    // Add decimal support
+    ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+
     final DataFileWriter<T> WRITER =
       new DataFileWriter<T>(new ReflectDatumWriter<T>());
 
diff --git a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
index ab263c1..b60ee42 100644
--- a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
@@ -21,7 +21,9 @@
 import com.cloudera.sqoop.lib.SqoopRecord;
 import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 import com.cloudera.sqoop.orm.ClassWriter;
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DefaultStringifier;
@@ -76,6 +78,9 @@
 
     columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
         MapWritable.class);
+
+    // Add decimal support
+    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
   }
 
   protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
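
Registering the conversion on GenericData.get() is what makes decimal-annotated fields surface as java.math.BigDecimal when export mappers read records. The same effect, standalone, when reading an Avro data file (file name illustrative; assumes Avro 1.8):

    import java.io.File;
    import org.apache.avro.Conversions;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class ReadDecimalSketch {
      public static void main(String[] args) throws Exception {
        GenericData model = new GenericData();
        model.addLogicalTypeConversion(new Conversions.DecimalConversion());
        GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<GenericRecord>(null, null, model);
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
            new File("part-m-00000.avro"), datumReader);
        for (GenericRecord record : reader) {
          // decimal columns come back as java.math.BigDecimal
          System.out.println(record);
        }
        reader.close();
      }
    }
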
diff --git a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
index 0a693d0..3c31c43 100644
--- a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
+++ b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
@@ -19,11 +19,13 @@
 package org.apache.sqoop.orm;
 
 import java.io.IOException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -34,6 +36,7 @@
 import com.cloudera.sqoop.manager.ConnManager;
 import org.apache.sqoop.avro.AvroUtil;
 
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.codehaus.jackson.node.NullNode;
 
 /**
@@ -44,6 +47,20 @@
   public static final Log LOG =
       LogFactory.getLog(AvroSchemaGenerator.class.getName());
 
+  /**
+   * Map precision to the number of bytes needed for binary conversion.
+   * @see <a href="https://github.com/apache/hive/blob/release-1.1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java#L90">Apache Hive</a>.
+   */
+  public static final int MAX_PRECISION = 38;
+  public static final int[] PRECISION_TO_BYTE_COUNT = new int[MAX_PRECISION];
+  static {
+    for (int prec = 1; prec <= MAX_PRECISION; prec++) {
+      // Estimated number of bytes needed.
+      PRECISION_TO_BYTE_COUNT[prec - 1] = (int)
+          Math.ceil((Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
+    }
+  }
+
   private final SqoopOptions options;
   private final ConnManager connManager;
   private final String tableName;
@@ -65,14 +82,18 @@
   public Schema generate(String schemaNameOverride) throws IOException {
     ClassWriter classWriter = new ClassWriter(options, connManager,
         tableName, null);
+    Map<String, List<Integer>> columnInfo = classWriter.getColumnInfo();
     Map<String, Integer> columnTypes = classWriter.getColumnTypes();
     String[] columnNames = classWriter.getColumnNames(columnTypes);
 
     List<Field> fields = new ArrayList<Field>();
     for (String columnName : columnNames) {
       String cleanedCol = AvroUtil.toAvroIdentifier(ClassWriter.toJavaIdentifier(columnName));
-      int sqlType = columnTypes.get(columnName);
-      Schema avroSchema = toAvroSchema(sqlType, columnName);
+      List<Integer> columnInfoList = columnInfo.get(columnName);
+      int sqlType = columnInfoList.get(0);
+      Integer precision = columnInfoList.get(1);
+      Integer scale = columnInfoList.get(2);
+      Schema avroSchema = toAvroSchema(sqlType, columnName, precision, scale);
       Field field = new Field(cleanedCol, avroSchema, null,  NullNode.getInstance());
       field.addProp("columnName", columnName);
       field.addProp("sqlType", Integer.toString(sqlType));
@@ -98,17 +119,27 @@
    *
    * @param sqlType Original SQL type (might be overridden by user)
    * @param columnName Column name from the query
+   * @param precision Fixed point precision
+   * @param scale Fixed point scale
    * @return Schema
    */
-  public Schema toAvroSchema(int sqlType, String columnName) {
+  public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) {
     List<Schema> childSchemas = new ArrayList<Schema>();
     childSchemas.add(Schema.create(Schema.Type.NULL));
-    childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
+    if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false)
+        && isLogicalType(sqlType)) {
+      childSchemas.add(
+          toAvroLogicalType(columnName, sqlType, precision, scale)
+              .addToSchema(Schema.create(Type.BYTES))
+      );
+    } else {
+      childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
+    }
     return Schema.createUnion(childSchemas);
   }
 
   public Schema toAvroSchema(int sqlType) {
-    return toAvroSchema(sqlType, null);
+    return toAvroSchema(sqlType, null, null, null);
   }
 
   private Type toAvroType(String columnName, int sqlType) {
@@ -134,4 +165,18 @@
 
     return connManager.toAvroType(tableName, columnName, sqlType);
   }
+
+  private LogicalType toAvroLogicalType(String columnName, int sqlType, Integer precision, Integer scale) {
+    return connManager.toAvroLogicalType(tableName, columnName, sqlType, precision, scale);
+  }
+
+  private static boolean isLogicalType(int sqlType) {
+    switch (sqlType) {
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+        return true;
+      default:
+        return false;
+    }
+  }
 }
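
Worked example of the PRECISION_TO_BYTE_COUNT estimate: for precision 3, ceil((log2(10^3 - 1) + 1) / 8) = ceil(10.97 / 8) = 2 bytes, which matches the fixed(2) decimal(3,2) schema used in the tests below. As a sketch:

    public class PrecisionBytesSketch {
      public static void main(String[] args) {
        int prec = 3;
        // same formula as the static initializer above
        int bytes = (int) Math.ceil(
            (Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
        System.out.println(bytes); // 2
      }
    }
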
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index bf40d2c..5202408 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -26,6 +26,7 @@
 import java.io.Writer;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -1848,6 +1849,14 @@
     }
   }
 
+  protected Map<String, List<Integer>> getColumnInfo() throws IOException {
+    if (options.getCall() == null) {
+      return connManager.getColumnInfo(tableName, options.getSqlQuery());
+    } else {
+      return connManager.getColumnInfoForProcedure(options.getCall());
+    }
+  }
+
   /**
    * Generate the ORM code for a table object containing the named columns.
    * @param columnTypes - mapping from column names to sql types
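
getColumnInfo() mirrors getColumnTypes() but keys each column to [sqlType, precision, scale], which AvroSchemaGenerator.generate() above unpacks positionally. A hedged sketch of that shape (column name and values illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ColumnInfoSketch {
      public static void main(String[] args) {
        // shape returned by getColumnInfo(): name -> [sqlType, precision, scale]
        Map<String, List<Integer>> columnInfo =
            new HashMap<String, List<Integer>>();
        columnInfo.put("PRICE", Arrays.asList(Types.DECIMAL, 10, 2));

        List<Integer> info = columnInfo.get("PRICE");
        int sqlType = info.get(0);
        Integer precision = info.get(1);
        Integer scale = info.get(2);
        System.out.println(sqlType + " " + precision + " " + scale);
      }
    }
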
diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java
index 5303048..137a6e1 100644
--- a/src/test/com/cloudera/sqoop/TestAvroExport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroExport.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -35,6 +36,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileWriter;
@@ -301,6 +304,8 @@
   }
 
   public void testSupportedAvroTypes() throws IOException, SQLException {
+    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+
     String[] argv = {};
     final int TOTAL_RECORDS = 1 * 10;
 
@@ -308,6 +313,8 @@
     Schema fixed = Schema.createFixed("myfixed", null, null, 2);
     Schema enumeration = Schema.createEnum("myenum", null, null,
         Lists.newArrayList("a", "b"));
+    Schema decimalSchema = LogicalTypes.decimal(3, 2)
+        .addToSchema(Schema.createFixed("dec1", null, null, 2));
 
     ColumnGenerator[] gens = new ColumnGenerator[] {
       colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
@@ -323,6 +330,10 @@
           b, "BINARY(2)"),
       colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
           "a", "VARCHAR(8)"),
+      colGenerator(new BigDecimal("2.00"), decimalSchema,
+          new BigDecimal("2.00"), "DECIMAL(3,2)"),
+      colGenerator("22.00", Schema.create(Schema.Type.STRING),
+          new BigDecimal("22.00"), "DECIMAL(4,2)"),
     };
     createAvroFile(0, TOTAL_RECORDS, gens);
     createTable(gens);
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index af4b481..00d7a95 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -118,12 +118,12 @@
    *                  to those that {@link #getOutputArgv(boolean, String[])}
    *                  returns
    */
-  private void avroImportTestHelper(String[] extraArgs, String codec)
-    throws IOException {
+  protected void avroImportTestHelper(String[] extraArgs, String codec)
+      throws IOException {
     String[] types =
       {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
-        "VARBINARY(2)", };
-    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
+        "VARBINARY(2)", "DECIMAL(3,2)"};
+    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", "'1.00'"};
     createTableWithColTypes(types, vals);
 
     runImport(getOutputArgv(true, extraArgs));
@@ -142,6 +142,7 @@
     checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
     checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
     checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
+    checkField(fields.get(7), "DATA_COL7", Schema.Type.STRING);
 
     GenericRecord record1 = reader.next();
     assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
@@ -155,6 +156,7 @@
     ByteBuffer b = ((ByteBuffer) object);
     assertEquals((byte) 1, b.get(0));
     assertEquals((byte) 2, b.get(1));
+    assertEquals("DATA_COL7", "1.00", record1.get("DATA_COL7").toString());
 
     if (codec != null) {
       assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
@@ -248,7 +250,7 @@
     assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O"));
   }
 
-  private void checkField(Field field, String name, Type type) {
+  protected void checkField(Field field, String name, Type type) {
     assertEquals(name, field.name());
     assertEquals(Schema.Type.UNION, field.schema().getType());
     assertEquals(Schema.Type.NULL, field.schema().getTypes().get(0).getType());
@@ -270,7 +272,7 @@
 
   }
 
-  private DataFileReader<GenericRecord> read(Path filename) throws IOException {
+  protected DataFileReader<GenericRecord> read(Path filename) throws IOException {
     Configuration conf = new Configuration();
     if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
       conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
@@ -281,7 +283,7 @@
     return new DataFileReader<GenericRecord>(fsInput, datumReader);
   }
 
-  private void checkSchemaFile(final Schema schema) throws IOException {
+  protected void checkSchemaFile(final Schema schema) throws IOException {
     final File schemaFile = new File(schema.getName() + ".avsc");
     assertTrue(schemaFile.exists());
     assertEquals(schema, new Schema.Parser().parse(schemaFile));
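
One subtlety in the DATA_COL7 assertion above: Avro hands string fields back as org.apache.avro.util.Utf8, not java.lang.String, so the test compares via toString(). A sketch:

    import org.apache.avro.util.Utf8;

    public class Utf8CompareSketch {
      public static void main(String[] args) {
        Object value = new Utf8("1.00");
        System.out.println(value.equals("1.00"));            // false
        System.out.println("1.00".equals(value.toString())); // true
      }
    }
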