SQOOP-1493: Add ability to import/export true decimal in Avro instead of serializing it to String
(Abraham Elmahrek via Jarek Jarcec Cecho)
diff --git a/LICENSE.txt b/LICENSE.txt
index c36c7ad..48b2c3b 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -362,3 +362,7 @@
For lib/snappy-java-<version>.jar:
The Apache License, Version 2.0
+
+Some parts of the code were copied from the Apache Hive Project:
+
+ The Apache License, Version 2.0
\ No newline at end of file
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 4b2f9ce..d8f1dfc 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -18,7 +18,7 @@
# This properties file lists the versions of the various artifacts we use.
# It drives ivy and the generation of a maven POM
-avro.version=1.7.5
+avro.version=1.8.0-SNAPSHOT
kite-data.version=1.0.0
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index dffbf6e..90cc9d0 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.avro;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
@@ -50,12 +51,29 @@
* The service class provides methods for creating and converting Avro objects.
*/
public final class AvroUtil {
+ public static boolean isDecimal(Schema.Field field) {
+ return isDecimal(field.schema());
+ }
+
+ public static boolean isDecimal(Schema schema) {
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ for (Schema type : schema.getTypes()) {
+ if (isDecimal(type)) {
+ return true;
+ }
+ }
+
+ return false;
+ } else {
+ return "decimal".equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP));
+ }
+ }
/**
* Convert a Sqoop's Java representation to Avro representation.
*/
- public static Object toAvro(Object o, boolean bigDecimalFormatString) {
- if (o instanceof BigDecimal) {
+ public static Object toAvro(Object o, Schema.Field field, boolean bigDecimalFormatString) {
+ if (o instanceof BigDecimal && !isDecimal(field)) {
if (bigDecimalFormatString) {
// Returns a string representation of this without an exponent field.
return ((BigDecimal) o).toPlainString();
@@ -111,8 +129,9 @@
Schema schema, boolean bigDecimalFormatString) {
GenericRecord record = new GenericData.Record(schema);
for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
- Object avroObject = toAvro(entry.getValue(), bigDecimalFormatString);
String avroColumn = toAvroColumn(entry.getKey());
+ Schema.Field field = schema.getField(avroColumn);
+ Object avroObject = toAvro(entry.getValue(), field, bigDecimalFormatString);
record.put(avroColumn, avroObject);
}
return record;
@@ -187,7 +206,12 @@
throw new IllegalArgumentException("Only support union with null");
}
case FIXED:
- return new BytesWritable(((GenericFixed) avroObject).bytes());
+ if (isDecimal(schema)) {
+ // Should automatically be a BigDecimal object.
+ return avroObject;
+ } else {
+ return new BytesWritable(((GenericFixed) avroObject).bytes());
+ }
case RECORD:
case ARRAY:
case MAP:
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index e19c17b..bd6e99b 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -100,6 +100,11 @@
*/
public static final String PROP_SPLIT_LIMIT = "split.limit";
+ /**
+ * Enable Avro logical types (decimal support only).
+ */
+ public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
+
private ConfigurationConstants() {
// Disable Explicit Object Creation
}
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d9569c5..f98feb3 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -32,6 +32,8 @@
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema.Type;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -223,6 +225,22 @@
}
/**
+ * Resolve a database-specific type to Avro logical data type.
+ * @param sqlType sql type
+ * @return avro type
+ */
+ public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
+ switch (sqlType) {
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return LogicalTypes.decimal(precision, scale);
+ default:
+ throw new IllegalArgumentException("Cannot convert SQL type "
+ + sqlType + " to avro logical type");
+ }
+ }
+
+ /**
* Return java type for SQL type.
* @param tableName table name
* @param columnName column name
@@ -259,6 +277,20 @@
}
/**
+ * Return Avro logical type for SQL type.
+ * @param tableName table name
+ * @param columnName column name
+ * @param sqlType sql type
+ * @param precision precision
+ * @param scale scale
+ * @return avro type
+ */
+ public LogicalType toAvroLogicalType(String tableName, String columnName, int sqlType, Integer precision, Integer scale) {
+ // ignore table name and column name by default.
+ return toAvroLogicalType(sqlType, precision, scale);
+ }
+
+ /**
* Return an unordered mapping from colname to sqltype for
* all columns in a table.
*
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
index 20f056a..76c3458 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
@@ -18,8 +18,10 @@
package org.apache.sqoop.mapreduce;
+import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
@@ -31,6 +33,14 @@
extends GenericRecordExportMapper<AvroWrapper<GenericRecord>, NullWritable> {
@Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ // Add decimal support
+ ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+ }
+
+ @Override
protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
Context context) throws IOException, InterruptedException {
context.write(toSqoopRecord(key.datum()), NullWritable.get());
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 0ea5ca4..450f947 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -68,8 +68,7 @@
throw new IOException(sqlE);
}
- GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
- schema, bigDecimalFormatString);
+ GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema, bigDecimalFormatString);
wrapper.datum(outKey);
context.write(wrapper, NullWritable.get());
}
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
index aed1e72..d95feb0 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
@@ -23,10 +23,12 @@
import java.net.URLDecoder;
import java.util.Map;
+import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
@@ -34,9 +36,9 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import static org.apache.avro.file.CodecFactory.DEFAULT_DEFLATE_LEVEL;
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
import static org.apache.avro.mapred.AvroOutputFormat.EXT;
import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
@@ -53,6 +55,7 @@
static <T> void configureDataFileWriter(DataFileWriter<T> writer,
TaskAttemptContext context) throws UnsupportedEncodingException {
if (FileOutputFormat.getCompressOutput(context)) {
+ // Default level must be greater than 0.
int level = context.getConfiguration()
.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
String codecName = context.getConfiguration()
@@ -90,6 +93,9 @@
isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
: AvroJob.getOutputSchema(context.getConfiguration());
+ // Add decimal support
+ ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+
final DataFileWriter<T> WRITER =
new DataFileWriter<T>(new ReflectDatumWriter<T>());
diff --git a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
index ab263c1..b60ee42 100644
--- a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
@@ -21,7 +21,9 @@
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
import com.cloudera.sqoop.orm.ClassWriter;
+import org.apache.avro.Conversions;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
@@ -76,6 +78,9 @@
columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
MapWritable.class);
+
+ // Add decimal support
+ GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
}
protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
diff --git a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
index 0a693d0..3c31c43 100644
--- a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
+++ b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
@@ -19,11 +19,13 @@
package org.apache.sqoop.orm;
import java.io.IOException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@@ -34,6 +36,7 @@
import com.cloudera.sqoop.manager.ConnManager;
import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.config.ConfigurationConstants;
import org.codehaus.jackson.node.NullNode;
/**
@@ -44,6 +47,20 @@
public static final Log LOG =
LogFactory.getLog(AvroSchemaGenerator.class.getName());
+ /**
+ * Map precision to the number of bytes needed for binary conversion.
+ * @see <a href="https://github.com/apache/hive/blob/release-1.1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java#L90">Apache Hive</a>.
+ */
+ public static final int MAX_PRECISION = 38;
+ public static final int PRECISION_TO_BYTE_COUNT[] = new int[MAX_PRECISION];
+ static {
+ for (int prec = 1; prec <= MAX_PRECISION; prec++) {
+ // Estimated number of bytes needed.
+ PRECISION_TO_BYTE_COUNT[prec - 1] = (int)
+ Math.ceil((Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
+ }
+ }
+
private final SqoopOptions options;
private final ConnManager connManager;
private final String tableName;
@@ -65,14 +82,18 @@
public Schema generate(String schemaNameOverride) throws IOException {
ClassWriter classWriter = new ClassWriter(options, connManager,
tableName, null);
+ Map<String, List<Integer>> columnInfo = classWriter.getColumnInfo();
Map<String, Integer> columnTypes = classWriter.getColumnTypes();
String[] columnNames = classWriter.getColumnNames(columnTypes);
List<Field> fields = new ArrayList<Field>();
for (String columnName : columnNames) {
String cleanedCol = AvroUtil.toAvroIdentifier(ClassWriter.toJavaIdentifier(columnName));
- int sqlType = columnTypes.get(columnName);
- Schema avroSchema = toAvroSchema(sqlType, columnName);
+ List<Integer> columnInfoList = columnInfo.get(columnName);
+ int sqlType = columnInfoList.get(0);
+ Integer precision = columnInfoList.get(1);
+ Integer scale = columnInfoList.get(2);
+ Schema avroSchema = toAvroSchema(sqlType, columnName, precision, scale);
Field field = new Field(cleanedCol, avroSchema, null, NullNode.getInstance());
field.addProp("columnName", columnName);
field.addProp("sqlType", Integer.toString(sqlType));
@@ -98,17 +119,27 @@
*
* @param sqlType Original SQL type (might be overridden by user)
* @param columnName Column name from the query
+ * @param precision Fixed point precision
+ * @param scale Fixed point scale
* @return Schema
*/
- public Schema toAvroSchema(int sqlType, String columnName) {
+ public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) {
List<Schema> childSchemas = new ArrayList<Schema>();
childSchemas.add(Schema.create(Schema.Type.NULL));
- childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
+ if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false)
+ && isLogicalType(sqlType)) {
+ childSchemas.add(
+ toAvroLogicalType(columnName, sqlType, precision, scale)
+ .addToSchema(Schema.create(Type.BYTES))
+ );
+ } else {
+ childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
+ }
return Schema.createUnion(childSchemas);
}
public Schema toAvroSchema(int sqlType) {
- return toAvroSchema(sqlType, null);
+ return toAvroSchema(sqlType, null, null, null);
}
private Type toAvroType(String columnName, int sqlType) {
@@ -134,4 +165,18 @@
return connManager.toAvroType(tableName, columnName, sqlType);
}
+
+ private LogicalType toAvroLogicalType(String columnName, int sqlType, Integer precision, Integer scale) {
+ return connManager.toAvroLogicalType(tableName, columnName, sqlType, precision, scale);
+ }
+
+ private static boolean isLogicalType(int sqlType) {
+ switch(sqlType) {
+ case Types.DECIMAL:
+ case Types.NUMERIC:
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index bf40d2c..5202408 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -26,6 +26,7 @@
import java.io.Writer;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -1848,6 +1849,14 @@
}
}
+ protected Map<String, List<Integer>> getColumnInfo() throws IOException {
+ if (options.getCall() == null) {
+ return connManager.getColumnInfo(tableName, options.getSqlQuery());
+ } else {
+ return connManager.getColumnInfoForProcedure(options.getCall());
+ }
+ }
+
/**
* Generate the ORM code for a table object containing the named columns.
* @param columnTypes - mapping from column names to sql types
diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java
index 5303048..137a6e1 100644
--- a/src/test/com/cloudera/sqoop/TestAvroExport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroExport.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -35,6 +36,8 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileWriter;
@@ -301,6 +304,8 @@
}
public void testSupportedAvroTypes() throws IOException, SQLException {
+ GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+
String[] argv = {};
final int TOTAL_RECORDS = 1 * 10;
@@ -308,6 +313,8 @@
Schema fixed = Schema.createFixed("myfixed", null, null, 2);
Schema enumeration = Schema.createEnum("myenum", null, null,
Lists.newArrayList("a", "b"));
+ Schema decimalSchema = LogicalTypes.decimal(3,2)
+ .addToSchema(Schema.createFixed("dec1", null, null, 2));
ColumnGenerator[] gens = new ColumnGenerator[] {
colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
@@ -323,6 +330,10 @@
b, "BINARY(2)"),
colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
"a", "VARCHAR(8)"),
+ colGenerator(new BigDecimal("2.00"), decimalSchema,
+ new BigDecimal("2.00"), "DECIMAL(3,2)"),
+ colGenerator("22.00", Schema.create(Schema.Type.STRING),
+ new BigDecimal("22.00"), "DECIMAL(4,2)"),
};
createAvroFile(0, TOTAL_RECORDS, gens);
createTable(gens);
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index af4b481..00d7a95 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -118,12 +118,12 @@
* to those that {@link #getOutputArgv(boolean, String[])}
* returns
*/
- private void avroImportTestHelper(String[] extraArgs, String codec)
- throws IOException {
+ protected void avroImportTestHelper(String[] extraArgs, String codec)
+ throws IOException {
String[] types =
{"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
- "VARBINARY(2)", };
- String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
+ "VARBINARY(2)", "DECIMAL(3,2)"};
+ String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", "'1.00'"};
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true, extraArgs));
@@ -142,6 +142,7 @@
checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
+ checkField(fields.get(7), "DATA_COL7", Schema.Type.STRING);
GenericRecord record1 = reader.next();
assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
@@ -155,6 +156,7 @@
ByteBuffer b = ((ByteBuffer) object);
assertEquals((byte) 1, b.get(0));
assertEquals((byte) 2, b.get(1));
+ assertEquals("DATA_COL7", "1.00", record1.get("DATA_COL7").toString());
if (codec != null) {
assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
@@ -248,7 +250,7 @@
assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O"));
}
- private void checkField(Field field, String name, Type type) {
+ protected void checkField(Field field, String name, Type type) {
assertEquals(name, field.name());
assertEquals(Schema.Type.UNION, field.schema().getType());
assertEquals(Schema.Type.NULL, field.schema().getTypes().get(0).getType());
@@ -270,7 +272,7 @@
}
- private DataFileReader<GenericRecord> read(Path filename) throws IOException {
+ protected DataFileReader<GenericRecord> read(Path filename) throws IOException {
Configuration conf = new Configuration();
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
@@ -281,7 +283,7 @@
return new DataFileReader<GenericRecord>(fsInput, datumReader);
}
- private void checkSchemaFile(final Schema schema) throws IOException {
+ protected void checkSchemaFile(final Schema schema) throws IOException {
final File schemaFile = new File(schema.getName() + ".avsc");
assertTrue(schemaFile.exists());
assertEquals(schema, new Schema.Parser().parse(schemaFile));