blob: dfe6af8df75d36e2c8f22c51e1ca5e968509ab04 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.parquet;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import java.util.ArrayList;
import java.util.List;
/**
* Converts between Parquet and Tajo schemas. See package documentation for
* details on the mapping.
*/
public class TajoSchemaConverter {
private static final String TABLE_SCHEMA = "table_schema";
/**
* Creates a new TajoSchemaConverter.
*/
public TajoSchemaConverter() {
}
/**
* Converts a Parquet schema to a Tajo schema.
*
* @param parquetSchema The Parquet schema to convert.
* @return The resulting Tajo schema.
*/
public Schema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getFields());
}
private Schema convertFields(List<Type> parquetFields) {
List<Column> columns = new ArrayList<>();
for (Type fieldType : parquetFields) {
if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
throw new RuntimeException("REPEATED not supported outside LIST or" +
" MAP. Type: " + fieldType);
}
columns.add(convertField(fieldType));
}
Column[] columnsArray = new Column[columns.size()];
columnsArray = columns.toArray(columnsArray);
return new Schema(columnsArray);
}
private Column convertField(final Type fieldType) {
if (fieldType.isPrimitive()) {
return convertPrimitiveField(fieldType);
} else {
return convertComplexField(fieldType);
}
}
private Column convertPrimitiveField(final Type fieldType) {
final String fieldName = fieldType.getName();
final PrimitiveTypeName parquetPrimitiveTypeName =
fieldType.asPrimitiveType().getPrimitiveTypeName();
final OriginalType originalType = fieldType.getOriginalType();
return parquetPrimitiveTypeName.convert(
new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() {
@Override
public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
return new Column(fieldName, TajoDataTypes.Type.BOOLEAN);
}
@Override
public Column convertINT32(PrimitiveTypeName primitiveTypeName) {
return new Column(fieldName, TajoDataTypes.Type.INT4);
}
@Override
public Column convertINT64(PrimitiveTypeName primitiveTypeName) {
return new Column(fieldName, TajoDataTypes.Type.INT8);
}
@Override
public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) {
return new Column(fieldName, TajoDataTypes.Type.FLOAT4);
}
@Override
public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
return new Column(fieldName, TajoDataTypes.Type.FLOAT8);
}
@Override
public Column convertFIXED_LEN_BYTE_ARRAY(
PrimitiveTypeName primitiveTypeName) {
return new Column(fieldName, TajoDataTypes.Type.BLOB);
}
@Override
public Column convertBINARY(PrimitiveTypeName primitiveTypeName) {
if (originalType == OriginalType.UTF8) {
return new Column(fieldName, TajoDataTypes.Type.TEXT);
} else {
return new Column(fieldName, TajoDataTypes.Type.BLOB);
}
}
@Override
public Column convertINT96(PrimitiveTypeName primitiveTypeName) {
throw new RuntimeException("Converting from INT96 not supported.");
}
});
}
private Column convertComplexField(final Type fieldType) {
throw new RuntimeException("Complex types not supported.");
}
/**
* Converts a Tajo schema to a Parquet schema.
*
* @param tajoSchema The Tajo schema to convert.
* @return The resulting Parquet schema.
*/
public MessageType convert(Schema tajoSchema) {
List<Type> types = new ArrayList<>();
for (int i = 0; i < tajoSchema.size(); ++i) {
Column column = tajoSchema.getColumn(i);
if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
continue;
}
types.add(convertColumn(column));
}
return new MessageType(TABLE_SCHEMA, types);
}
private Type convertColumn(Column column) {
TajoDataTypes.Type type = column.getDataType().getType();
switch (type) {
case BOOLEAN:
return primitive(column.getSimpleName(),
PrimitiveTypeName.BOOLEAN);
case BIT:
case INT2:
case INT4:
return primitive(column.getSimpleName(),
PrimitiveTypeName.INT32);
case INT8:
return primitive(column.getSimpleName(),
PrimitiveTypeName.INT64);
case FLOAT4:
return primitive(column.getSimpleName(),
PrimitiveTypeName.FLOAT);
case FLOAT8:
return primitive(column.getSimpleName(),
PrimitiveTypeName.DOUBLE);
case CHAR:
case TEXT:
return primitive(column.getSimpleName(),
PrimitiveTypeName.BINARY,
OriginalType.UTF8);
case PROTOBUF:
return primitive(column.getSimpleName(),
PrimitiveTypeName.BINARY);
case BLOB:
return primitive(column.getSimpleName(),
PrimitiveTypeName.BINARY);
case INET4:
case INET6:
return primitive(column.getSimpleName(),
PrimitiveTypeName.BINARY);
default:
throw new RuntimeException("Cannot convert Tajo type: " + type);
}
}
private PrimitiveType primitive(String name,
PrimitiveTypeName primitive) {
return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null);
}
private PrimitiveType primitive(String name,
PrimitiveTypeName primitive,
OriginalType originalType) {
return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name,
originalType);
}
}