blob: 10d90d4b7cebd01dcaf54ca2ded4544fc6f1e2ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.avro;
import java.io.IOException;
import java.util.List;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.ExtendableRelDataType;
import org.apache.drill.exec.planner.types.ExtendableRelDataTypeHolder;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
 * {@link DrillTable} backed by an Avro data file.
 *
 * <p>The Avro schema is read from the first file of the selection when the table
 * is constructed and is then translated into a Calcite row type on demand.
 */
public class AvroDrillTable extends DrillTable {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroDrillTable.class);

  /** Avro schema taken from the header of the first selected file. */
  private final Schema schema;
  private final SchemaConfig schemaConfig;
  /** Lazily created in {@link #getRowType}; reused so previously added implicit columns are preserved. */
  private ExtendableRelDataTypeHolder holder;

  /**
   * Creates the table and eagerly reads the Avro schema of the first selected file.
   *
   * @param storageEngineName name of the storage engine, passed to {@link DrillTable}
   * @param plugin file system plugin; supplies the configuration used to open the file
   * @param schemaConfig schema configuration; supplies the user name and is used to
   *                     resolve partition / implicit column names
   * @param selection format selection; its first file is the one whose schema is read
   * @throws UserException (data read error) if the file cannot be opened or read
   */
  public AvroDrillTable(String storageEngineName,
                        FileSystemPlugin plugin,
                        SchemaConfig schemaConfig,
                        FormatSelection selection) {
    super(storageEngineName, plugin, schemaConfig.getUserName(), selection);
    this.schemaConfig = schemaConfig;
    List<Path> asFiles = selection.getAsFiles();
    this.schema = readSchema(asFiles.get(0), plugin);
  }

  /**
   * Opens the given Avro file just long enough to obtain its schema.
   *
   * <p>Both the input and the reader are closed before returning, so no file handle
   * is kept open for the lifetime of the table. The input is declared as its own
   * resource so that it is also closed when the reader's constructor fails.
   */
  private static Schema readSchema(Path path, FileSystemPlugin plugin) {
    try (FsInput input = new FsInput(path, plugin.getFsConf());
         DataFileReader<GenericContainer> fileReader =
             new DataFileReader<>(input, new GenericDatumReader<GenericContainer>())) {
      return fileReader.getSchema();
    } catch (IOException e) {
      throw UserException.dataReadError(e).build(logger);
    }
  }

  /**
   * Builds the row type from the partition columns followed by the top-level fields
   * of the Avro schema.
   *
   * @param typeFactory factory used to create all Calcite types
   * @return a new {@link ExtendableRelDataType} wrapping the shared holder
   */
  @Override
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    // ExtendableRelDataTypeHolder is reused to preserve previously added implicit columns
    if (holder == null) {
      List<RelDataType> typeList = Lists.newArrayList();
      List<String> fieldNameList = Lists.newArrayList();

      // adds partition columns to RowType since they are always present in star queries
      List<String> partitions =
          ColumnExplorer.getPartitionColumnNames(((FormatSelection) getSelection()).getSelection(), schemaConfig);
      for (String partitionName : partitions) {
        fieldNameList.add(partitionName);
        typeList.add(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true));
      }

      // adds non-partition table columns to RowType
      for (Field field : schema.getFields()) {
        fieldNameList.add(field.name());
        typeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
      }

      holder = new ExtendableRelDataTypeHolder(
          typeFactory.createStructType(typeList, fieldNameList).getFieldList(),
          ColumnExplorer.getImplicitColumnsNames(schemaConfig));
    }

    return new ExtendableRelDataType(holder, typeFactory);
  }

  /**
   * Maps an Avro schema to the corresponding Calcite type, recursing into array
   * elements, map values and union branches.
   *
   * @param typeFactory factory used to create the Calcite types
   * @param fieldSchema Avro schema of the field (or nested element) to translate
   * @return the Calcite type for {@code fieldSchema}
   * @throws UserException (unsupported) for FIXED schemas without the "decimal" logical type
   */
  private RelDataType getNullableRelDataTypeFromAvroType(
      RelDataTypeFactory typeFactory, Schema fieldSchema) {
    LogicalType logicalType = fieldSchema.getLogicalType();
    // An empty name never matches a case label, so the inner switches fall to their defaults.
    String logicalTypeName = logicalType != null ? logicalType.getName() : "";
    RelDataType relDataType = null;
    switch (fieldSchema.getType()) {
      case ARRAY:
        RelDataType eleType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getElementType());
        relDataType = typeFactory.createArrayType(eleType, -1);
        break;
      case BOOLEAN:
        relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
        break;
      case BYTES:
        switch (logicalTypeName) {
          case "decimal":
            relDataType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
            break;
          default:
            relDataType = typeFactory.createSqlType(SqlTypeName.BINARY);
        }
        break;
      case DOUBLE:
        relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
        break;
      case FIXED:
        switch (logicalTypeName) {
          case "decimal":
            relDataType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
            break;
          default:
            logger.error("{} type not supported", fieldSchema.getType());
            throw UserException.unsupportedError().message("FIXED type not supported yet").build(logger);
        }
        break;
      case FLOAT:
        relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
        break;
      case INT:
        switch (logicalTypeName) {
          case "date":
            relDataType = typeFactory.createSqlType(SqlTypeName.DATE);
            break;
          case "time-millis":
            relDataType = typeFactory.createSqlType(SqlTypeName.TIME);
            break;
          default:
            relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
        }
        break;
      case LONG:
        switch (logicalTypeName) {
          case "date":
            relDataType = typeFactory.createSqlType(SqlTypeName.DATE);
            break;
          case "time-micros":
            relDataType = typeFactory.createSqlType(SqlTypeName.TIME);
            break;
          case "timestamp-millis":
          case "timestamp-micros":
            relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
            break;
          default:
            relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
        }
        break;
      case MAP:
        RelDataType valueType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getValueType());
        RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
        relDataType = typeFactory.createMapType(keyType, valueType);
        break;
      case NULL:
        relDataType = typeFactory.createSqlType(SqlTypeName.NULL);
        break;
      case RECORD:
        // TODO This has to be mapped to a struct type (one member per record field,
        // each translated recursively), but because of a calcite issue it is mapped
        // to a VARCHAR -> ANY map type for now.
        keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
        valueType = typeFactory.createSqlType(SqlTypeName.ANY);
        relDataType = typeFactory.createMapType(keyType, valueType);
        break;
      case ENUM:
      case STRING:
        relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
        break;
      case UNION:
        relDataType = getRelDataTypeFromAvroUnion(typeFactory, fieldSchema);
        break;
    }
    return relDataType;
  }

  /**
   * Maps an Avro union to the nullable form of its first non-null branch.
   *
   * <p>The branch is located by type rather than by a fixed index, so
   * {@code ["null", T]}, {@code [T, "null"]} and single-branch unions all resolve
   * to nullable {@code T}. A union with no non-null branch resolves to the SQL
   * NULL type.
   */
  private RelDataType getRelDataTypeFromAvroUnion(RelDataTypeFactory typeFactory, Schema unionSchema) {
    for (Schema memberSchema : unionSchema.getTypes()) {
      if (memberSchema.getType() != Schema.Type.NULL) {
        RelDataType memberType = getNullableRelDataTypeFromAvroType(typeFactory, memberSchema);
        return typeFactory.createTypeWithNullability(memberType, true);
      }
    }
    return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.NULL), true);
  }
}