blob: 773c3f1ba9b330cb60a5705a70fa833116cf3756 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.IntNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.MapType;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import com.google.common.collect.Lists;
/**
* Utility class to generate an Impala-compatible Avro Schema from other schemas, e.g.,
* an Impala table, a list of Impala columns, a list of Hive field schemas, etc.
*
* Error behavior: These functions throw an UnsupportedOperationException when failing
* to generate an Impala-compatible Avro schema, e.g., because of an unknown type or a
* type not supported by Impala.
*
* Behavior for TIMESTAMP:
* A TIMESTAMP column definition maps to an Avro STRING and is created as a STRING column,
* because Avro has no binary TIMESTAMP representation. As a result, no Avro table may
* have a TIMESTAMP column.
*/
public class AvroSchemaConverter {
// Arbitrarily chosen schema name and record prefix. Note that
// record names must be unique within an Avro schema.
private static final String DEFAULT_SCHEMA_NAME = "baseRecord";
private static final String RECORD_NAME_PREFIX = "record_";
// Constants for Avro logical types, in particular, for DECIMAL and DATE.
private static final String AVRO_LOGICAL_TYPE = "logicalType";
private static final String PRECISION_PROP_NAME = "precision";
private static final String SCALE_PROP_NAME = "scale";
private static final String AVRO_DECIMAL_TYPE = "decimal";
private static final String AVRO_DATE_TYPE = "date";
// Used to generate unique record names as required by Avro.
private int recordCounter_ = 0;
public static Schema convertColumns(
List<Column> columns, String schemaName) {
AvroSchemaConverter converter = new AvroSchemaConverter();
return converter.convertColumnsImpl(columns, schemaName);
}
public static Schema convertColumnDefs(
List<ColumnDef> colDefs, String schemaName) {
AvroSchemaConverter converter = new AvroSchemaConverter();
return converter.convertColumnDefsImpl(colDefs, schemaName);
}
public static Schema convertFieldSchemas(
List<FieldSchema> fieldSchemas, String schemaName) {
AvroSchemaConverter converter = new AvroSchemaConverter();
return converter.convertFieldSchemasImpl(fieldSchemas, schemaName);
}
private Schema convertColumnsImpl(List<Column> columns, String schemaName) {
List<Schema.Field> avroFields = Lists.newArrayList();
for (Column column: columns) {
final Schema.Field avroField = new Schema.Field(column.getName(),
createAvroSchema(column.getType()), column.getComment(), null);
avroFields.add(avroField);
}
return createAvroRecord(avroFields, schemaName);
}
private Schema convertColumnDefsImpl(List<ColumnDef> colDefs, String schemaName) {
List<Schema.Field> avroFields = Lists.newArrayList();
for (ColumnDef colDef: colDefs) {
final Schema.Field avroField = new Schema.Field(colDef.getColName(),
createAvroSchema(colDef.getType()), colDef.getComment(), null);
avroFields.add(avroField);
}
return createAvroRecord(avroFields, schemaName);
}
private Schema convertFieldSchemasImpl(
List<FieldSchema> fieldSchemas, String schemaName) {
List<Schema.Field> avroFields = Lists.newArrayList();
JsonNode nullDefault = JsonNodeFactory.instance.nullNode();
for (FieldSchema fs: fieldSchemas) {
Type impalaType = Type.parseColumnType(fs.getType());
if (impalaType == null) {
throw new UnsupportedOperationException(
fs.getType() + " is not a suppported Impala type");
}
final Schema.Field avroField = new Schema.Field(fs.getName(),
createAvroSchema(impalaType), fs.getComment(), nullDefault);
avroFields.add(avroField);
}
return createAvroRecord(avroFields, schemaName);
}
private Schema createAvroRecord(List<Schema.Field> avroFields, String schemaName) {
// Name is a required property for an Avro Record.
if (schemaName == null || schemaName.isEmpty()) schemaName = DEFAULT_SCHEMA_NAME;
Schema schema = Schema.createRecord(schemaName, null, null, false);
schema.setFields(avroFields);
return schema;
}
private Schema createAvroSchema(Type impalaType) {
Schema schema = null;
if (impalaType.isScalarType()) {
schema = createScalarSchema((ScalarType) impalaType);
} else if (impalaType.isArrayType()) {
schema = createArraySchema((ArrayType) impalaType);
} else if (impalaType.isMapType()) {
schema = createMapSchema((MapType) impalaType);
} else if (impalaType.isStructType()) {
schema = createRecordSchema((StructType) impalaType);
} else {
throw new UnsupportedOperationException(
impalaType.toSql() + " cannot be converted to an Avro type");
}
// Make the Avro schema nullable.
Schema nullSchema = Schema.create(Schema.Type.NULL);
return Schema.createUnion(Arrays.asList(nullSchema, schema));
}
private Schema createScalarSchema(ScalarType impalaScalarType) {
switch (impalaScalarType.getPrimitiveType()) {
case STRING: return Schema.create(Schema.Type.STRING);
case CHAR: return Schema.create(Schema.Type.STRING);
case VARCHAR: return Schema.create(Schema.Type.STRING);
case TINYINT: return Schema.create(Schema.Type.INT);
case SMALLINT: return Schema.create(Schema.Type.INT);
case INT: return Schema.create(Schema.Type.INT);
case BIGINT: return Schema.create(Schema.Type.LONG);
case BOOLEAN: return Schema.create(Schema.Type.BOOLEAN);
case FLOAT: return Schema.create(Schema.Type.FLOAT);
case DOUBLE: return Schema.create(Schema.Type.DOUBLE);
case TIMESTAMP: return Schema.create(Schema.Type.STRING);
case DATE: return createDateSchema();
case DECIMAL: return createDecimalSchema(impalaScalarType);
default:
throw new UnsupportedOperationException(
impalaScalarType.toSql() + " cannot be converted to an Avro type");
}
}
private Schema createDateSchema() {
Schema dateSchema = Schema.create(Schema.Type.INT);
dateSchema.addProp(AVRO_LOGICAL_TYPE, AVRO_DATE_TYPE);
return dateSchema;
}
private Schema createDecimalSchema(ScalarType impalaDecimalType) {
Schema decimalSchema = Schema.create(Schema.Type.BYTES);
decimalSchema.addProp(AVRO_LOGICAL_TYPE, AVRO_DECIMAL_TYPE);
// precision and scale must be integer values
decimalSchema.addProp(PRECISION_PROP_NAME,
new IntNode(impalaDecimalType.decimalPrecision()));
decimalSchema.addProp(SCALE_PROP_NAME,
new IntNode(impalaDecimalType.decimalScale()));
return decimalSchema;
}
private Schema createArraySchema(ArrayType impalaArrayType) {
Schema elementSchema = createAvroSchema(impalaArrayType.getItemType());
return Schema.createArray(elementSchema);
}
private Schema createMapSchema(MapType impalaMapType) {
// Map keys are always STRING according to the Avro spec.
Schema valueSchema = createAvroSchema(impalaMapType.getValueType());
return Schema.createMap(valueSchema);
}
private Schema createRecordSchema(StructType impalaStructType) {
List<Schema.Field> schemaFields = Lists.newArrayList();
for (StructField structField : impalaStructType.getFields()) {
Schema.Field avroField = new Schema.Field(structField.getName(),
createAvroSchema(structField.getType()), structField.getComment(), null);
schemaFields.add(avroField);
}
// All Avro records in a table must have the name property.
Schema structSchema = Schema.createRecord(
RECORD_NAME_PREFIX + recordCounter_, null, null, false);
++recordCounter_;
structSchema.setFields(schemaFields);
return structSchema;
}
}