blob: d3bbed10e56b756967283644a11df4a39508bb83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.parquet;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.*;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Converter to convert a Parquet record into a Tajo Tuple.
*/
public class TajoRecordConverter extends GroupConverter {
private final GroupType parquetSchema;
private final Schema tajoReadSchema;
private final int[] projectionMap;
private final int tupleSize;
private final Converter[] converters;
private Tuple currentTuple;
/**
 * Builds a record converter for one projection of a table.
 *
 * <p>One field converter is created per Parquet field. Projected columns of
 * type NULL_TYPE have no Parquet field, so they are skipped here; their tuple
 * slots simply keep the NullDatum that {@link #start()} puts in every slot.
 * Converted values are written to the tuple at the column's position in
 * {@code projectionMap}, not at its index in the table schema.
 *
 * @param parquetSchema The Parquet schema of the projection.
 * @param tajoReadSchema The Tajo schema of the table.
 * @param projectionMap An array mapping the projection column to the column
 *                      index in the table.
 */
public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema,
    int[] projectionMap) {
  this.parquetSchema = parquetSchema;
  this.tajoReadSchema = tajoReadSchema;
  this.projectionMap = projectionMap;
  this.tupleSize = tajoReadSchema.size();
  this.converters = new Converter[parquetSchema.getFieldCount()];

  // projectionMap.length can exceed parquetSchema.getFieldCount() when the
  // projection contains NULL_TYPE columns, so the Parquet field position is
  // tracked separately from the projection position.
  int parquetField = 0;
  for (int slot = 0; slot < projectionMap.length; slot++) {
    final Column column = tajoReadSchema.getColumn(projectionMap[slot]);
    if (column.getDataType().getType() != TajoDataTypes.Type.NULL_TYPE) {
      final Type fieldType = parquetSchema.getType(parquetField);
      final int tupleSlot = slot;
      final ParentValueContainer sink = new ParentValueContainer() {
        @Override
        void add(Object value) {
          TajoRecordConverter.this.set(tupleSlot, value);
        }
      };
      converters[parquetField] = newConverter(column, fieldType, sink);
      parquetField++;
    }
  }
}
/**
 * Stores a converted value in the tuple currently being assembled.
 *
 * @param index Slot of the current tuple to write.
 * @param value The converted value; it is cast to {@link Datum}.
 */
private void set(int index, Object value) {
  final Datum datum = (Datum) value;
  currentTuple.put(index, datum);
}
/**
 * Creates the field converter that matches a column's Tajo data type.
 *
 * @param column The Tajo column whose data type selects the converter.
 * @param type The Parquet type of the field (currently not consulted).
 * @param parent The sink that receives every converted value.
 * @return A new converter for the column.
 * @throws RuntimeException if the column type is INET6, NULL_TYPE or any
 *                          other type without a converter.
 */
private Converter newConverter(Column column, Type type,
    ParentValueContainer parent) {
  final DataType dataType = column.getDataType();
  final Converter converter;
  switch (dataType.getType()) {
    case BOOLEAN:
      converter = new FieldBooleanConverter(parent);
      break;
    case BIT:
      converter = new FieldBitConverter(parent);
      break;
    case CHAR:
      converter = new FieldCharConverter(parent);
      break;
    case INT2:
      converter = new FieldInt2Converter(parent);
      break;
    case INT4:
      converter = new FieldInt4Converter(parent);
      break;
    case INT8:
      converter = new FieldInt8Converter(parent);
      break;
    case FLOAT4:
      converter = new FieldFloat4Converter(parent);
      break;
    case FLOAT8:
      converter = new FieldFloat8Converter(parent);
      break;
    case INET4:
      converter = new FieldInet4Converter(parent);
      break;
    case INET6:
      throw new RuntimeException("No converter for INET6");
    case TEXT:
      converter = new FieldTextConverter(parent);
      break;
    case PROTOBUF:
      converter = new FieldProtobufConverter(parent, dataType);
      break;
    case BLOB:
      converter = new FieldBlobConverter(parent);
      break;
    case NULL_TYPE:
      throw new RuntimeException("No converter for NULL_TYPE.");
    default:
      // Name the offending type so the failure can be diagnosed.
      throw new RuntimeException(
          "Unsupported data type: " + dataType.getType());
  }
  return converter;
}
/**
 * Looks up the converter registered for a Parquet field.
 *
 * @param fieldIndex Position of the field in the Parquet schema given to the
 *                   constructor; projected NULL_TYPE columns are not counted.
 * @return The converter stored for that field.
 */
@Override
public Converter getConverter(int fieldIndex) {
  final Converter fieldConverter = converters[fieldIndex];
  return fieldConverter;
}
/**
 * Called before the fields of a record are processed. Begins a fresh tuple
 * with one slot per projected column, each pre-filled with a NullDatum, so
 * that fields with NULL values and NULL_TYPE columns (which never receive a
 * value) read back as NullDatum.
 */
@Override
public void start() {
  final Datum[] slots = new Datum[projectionMap.length];
  final Datum nullDatum = NullDatum.get();
  for (int i = 0; i < slots.length; i++) {
    slots[i] = nullDatum;
  }
  currentTuple = new VTuple(slots);
}
/**
 * Called after all fields have been processed. This converter does nothing
 * here: each value is written into the current tuple as it is converted.
 */
@Override
public void end() {
  // Intentionally empty.
}
/**
 * Returns the tuple being assembled for the current record.
 *
 * @return The tuple created by the most recent {@link #start()} call, or
 *         {@code null} if {@code start()} has not been called yet.
 */
public Tuple getCurrentRecord() {
  return this.currentTuple;
}
/**
 * Sink through which a field converter hands its converted value to the
 * enclosing record.
 */
abstract static class ParentValueContainer {
  /**
   * Adds the value to the parent.
   *
   * @param value The converted value to hand over.
   */
  abstract void add(Object value);
}
/** Converts Parquet boolean values into Tajo boolean datums. */
static final class FieldBooleanConverter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldBooleanConverter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addBoolean(boolean value) {
    final Datum datum = DatumFactory.createBool(value);
    parent.add(datum);
  }
}
/** Converts Parquet int values into Tajo bit datums. */
static final class FieldBitConverter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldBitConverter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addInt(int value) {
    // Only the low eight bits of the int are kept.
    final byte lowByte = (byte) (value & 0xff);
    final Datum datum = DatumFactory.createBit(lowByte);
    parent.add(datum);
  }
}
/** Converts Parquet binary values into Tajo char datums. */
static final class FieldCharConverter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldCharConverter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addBinary(Binary value) {
    final byte[] bytes = value.getBytes();
    final Datum datum = DatumFactory.createChar(bytes);
    parent.add(datum);
  }
}
/** Converts Parquet int values into Tajo INT2 datums. */
static final class FieldInt2Converter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldInt2Converter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addInt(int value) {
    // Narrowing cast: only the low sixteen bits of the int are kept.
    final short narrowed = (short) value;
    final Datum datum = DatumFactory.createInt2(narrowed);
    parent.add(datum);
  }
}
/** Converts Parquet int values into Tajo INT4 datums. */
static final class FieldInt4Converter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldInt4Converter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addInt(int value) {
    final Datum datum = DatumFactory.createInt4(value);
    parent.add(datum);
  }
}
/**
 * Converts Parquet integer values into Tajo INT8 datums. Both long and int
 * inputs are accepted; an int is widened to long.
 */
static final class FieldInt8Converter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldInt8Converter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addLong(long value) {
    parent.add(DatumFactory.createInt8(value));
  }

  @Override
  public void addInt(int value) {
    // Widen with a primitive cast; boxing through Long.valueOf would only
    // create an object that is unboxed again straight away.
    parent.add(DatumFactory.createInt8((long) value));
  }
}
/**
 * Converts Parquet numeric values into Tajo FLOAT4 datums. Int and long
 * inputs are accepted as well as float; they are converted to float, which
 * can lose precision for large magnitudes.
 */
static final class FieldFloat4Converter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldFloat4Converter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addInt(int value) {
    // Primitive conversion; no intermediate Float object is needed.
    parent.add(DatumFactory.createFloat4((float) value));
  }

  @Override
  public void addLong(long value) {
    // Primitive conversion; no intermediate Float object is needed.
    parent.add(DatumFactory.createFloat4((float) value));
  }

  @Override
  public void addFloat(float value) {
    parent.add(DatumFactory.createFloat4(value));
  }
}
/**
 * Converts Parquet numeric values into Tajo FLOAT8 datums. Int, long and
 * float inputs are accepted as well as double; they are converted to double.
 */
static final class FieldFloat8Converter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldFloat8Converter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addInt(int value) {
    // Primitive conversion; no intermediate Double object is needed.
    parent.add(DatumFactory.createFloat8((double) value));
  }

  @Override
  public void addLong(long value) {
    // Primitive conversion; no intermediate Double object is needed.
    parent.add(DatumFactory.createFloat8((double) value));
  }

  @Override
  public void addFloat(float value) {
    // Primitive conversion; no intermediate Double object is needed.
    parent.add(DatumFactory.createFloat8((double) value));
  }

  @Override
  public void addDouble(double value) {
    parent.add(DatumFactory.createFloat8(value));
  }
}
/** Converts Parquet binary values into Tajo INET4 datums. */
static final class FieldInet4Converter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldInet4Converter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addBinary(Binary value) {
    final byte[] addressBytes = value.getBytes();
    final Datum datum = DatumFactory.createInet4(addressBytes);
    parent.add(datum);
  }
}
/** Converts Parquet binary values into Tajo text datums. */
static final class FieldTextConverter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldTextConverter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addBinary(Binary value) {
    final byte[] textBytes = value.getBytes();
    final Datum datum = DatumFactory.createText(textBytes);
    parent.add(datum);
  }
}
/** Converts Parquet binary values into Tajo blob datums. */
static final class FieldBlobConverter extends PrimitiveConverter {
  private final ParentValueContainer parent;

  public FieldBlobConverter(ParentValueContainer parent) {
    this.parent = parent;
  }

  @Override
  public void addBinary(Binary value) {
    final ByteBuffer payload = ByteBuffer.wrap(value.getBytes());
    final BlobDatum datum = new BlobDatum(payload);
    parent.add(datum);
  }
}
/**
 * Converts Parquet binary values into Tajo protobuf datums by parsing the
 * bytes with the message builder registered for the column's type code.
 */
static final class FieldProtobufConverter extends PrimitiveConverter {
  private final ParentValueContainer parent;
  private final DataType dataType;

  public FieldProtobufConverter(ParentValueContainer parent,
      DataType dataType) {
    this.parent = parent;
    this.dataType = dataType;
  }

  /**
   * Parses the bytes as a protobuf message and passes the datum to the parent.
   *
   * @throws RuntimeException wrapping the InvalidProtocolBufferException
   *                          raised when the bytes cannot be parsed.
   */
  @Override
  public void addBinary(Binary value) {
    try {
      final ProtobufDatumFactory datumFactory =
          ProtobufDatumFactory.get(dataType.getCode());
      final Message.Builder messageBuilder = datumFactory.newBuilder();
      messageBuilder.mergeFrom(value.getBytes());
      parent.add(datumFactory.createDatum(messageBuilder));
    } catch (InvalidProtocolBufferException cause) {
      throw new RuntimeException(cause);
    }
  }
}
}