/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.presto.readers;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.presto.CarbonVectorBatch;
import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;

import io.prestosql.spi.block.ArrayBlock;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.RowBlock;
import io.prestosql.spi.type.*;

/**
 * Stream reader for complex type columns (array/struct/map). Assembles the Presto block for the
 * complex column from the blocks built by its child readers.
 */
public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
    implements PrestoVectorBlockBuilder {

  protected int batchSize;

  protected Type type;

  protected BlockBuilder builder;
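
  /**
   * Creates the reader for a complex column: resolves the Presto type, wraps each child column
   * in a direct stream reader, and allocates the block builder for one batch.
   */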
  public ComplexTypeStreamReader(int batchSize, StructField field) {
    super(batchSize, field.getDataType());
    this.batchSize = batchSize;
    this.type = getType(field);
    List<CarbonColumnVector> childrenList = new ArrayList<>();
    for (StructField child : field.getChildren()) {
      childrenList.add(new ColumnarVectorWrapperDirect(Objects.requireNonNull(
          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child))));
    }
    setChildrenVector(childrenList);
    this.builder = type.createBlockBuilder(null, batchSize);
  }
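
  /**
   * Maps a CarbonData {@link StructField} to the corresponding Presto {@link Type},
   * recursing into the children for array and struct fields.
   */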
  Type getType(StructField field) {
    DataType dataType = field.getDataType();
    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
      return VarcharType.VARCHAR;
    } else if (dataType == DataTypes.SHORT) {
      return SmallintType.SMALLINT;
    } else if (dataType == DataTypes.INT) {
      return IntegerType.INTEGER;
    } else if (dataType == DataTypes.LONG) {
      return BigintType.BIGINT;
    } else if (dataType == DataTypes.DOUBLE) {
      return DoubleType.DOUBLE;
    } else if (dataType == DataTypes.FLOAT) {
      return RealType.REAL;
    } else if (dataType == DataTypes.BOOLEAN) {
      return BooleanType.BOOLEAN;
    } else if (dataType == DataTypes.BINARY) {
      return VarbinaryType.VARBINARY;
    } else if (dataType == DataTypes.DATE) {
      return DateType.DATE;
    } else if (dataType == DataTypes.TIMESTAMP) {
      return TimestampType.TIMESTAMP;
    } else if (dataType == DataTypes.BYTE) {
      return TinyintType.TINYINT;
    } else if (DataTypes.isDecimal(dataType)) {
      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
    } else if (DataTypes.isArrayType(dataType)) {
      return new ArrayType(getType(field.getChildren().get(0)));
    } else if (DataTypes.isStructType(dataType)) {
      List<RowType.Field> children = new ArrayList<>();
      for (StructField child : field.getChildren()) {
        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
      }
      return RowType.from(children);
    } else {
      throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }
  }

  @Override public Block buildBlock() {
    return builder.build();
  }

  @Override public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }
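
  /**
   * Assembles the complex values for the current batch: builds the child blocks, wraps them in
   * an ArrayBlock (using the per-row element counts as offsets) or a RowBlock, writes each
   * position into the block builder, and resets the child vectors.
   */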
  @Override
  public void putComplexObject(List<Integer> offsetVector) {
    if (type instanceof ArrayType) {
      // build child block
      Block childBlock = buildChildBlock(getChildrenVector().get(0));
      // prepare an offset vector with 0 as initial offset
      int[] offsetVectorArray = new int[offsetVector.size() + 1];
      for (int i = 1; i <= offsetVector.size(); i++) {
        offsetVectorArray[i] = offsetVectorArray[i - 1] + offsetVector.get(i - 1);
      }
      // prepare Array block
      Block arrayBlock = ArrayBlock
          .fromElementBlock(offsetVector.size(), Optional.empty(), offsetVectorArray,
              childBlock);
      for (int position = 0; position < offsetVector.size(); position++) {
        type.writeObject(builder, arrayBlock.getObject(position, Block.class));
      }
      getChildrenVector().get(0).getColumnVector().reset();
    } else {
      // build child blocks
      List<Block> childBlocks = new ArrayList<>(getChildrenVector().size());
      for (CarbonColumnVector child : getChildrenVector()) {
        childBlocks.add(buildChildBlock(child));
      }
      // prepare ROW block
      Block rowBlock = RowBlock
          .fromFieldBlocks(offsetVector.size(), Optional.empty(),
              childBlocks.toArray(new Block[0]));
      for (int position = 0; position < offsetVector.size(); position++) {
        type.writeObject(builder, rowBlock.getObject(position, Block.class));
      }
      for (CarbonColumnVector child : getChildrenVector()) {
        child.getColumnVector().reset();
      }
    }
  }
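
  /**
   * Builds the Presto block for a child column by delegating to the child's own stream reader.
   */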
  private Block buildChildBlock(CarbonColumnVector carbonColumnVector) {
    DataType dataType = carbonColumnVector.getType();
    carbonColumnVector = carbonColumnVector.getColumnVector();
    if (dataType == DataTypes.STRING || dataType == DataTypes.BINARY
        || dataType == DataTypes.VARCHAR) {
      return ((SliceStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.SHORT) {
      return ((ShortStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
      return ((IntegerStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.LONG) {
      return ((LongStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.DOUBLE) {
      return ((DoubleStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.FLOAT) {
      return ((FloatStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.TIMESTAMP) {
      return ((TimestampStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.BOOLEAN) {
      return ((BooleanStreamReader) carbonColumnVector).buildBlock();
    } else if (DataTypes.isDecimal(dataType)) {
      return ((DecimalSliceStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.BYTE) {
      return ((ByteStreamReader) carbonColumnVector).buildBlock();
    } else if (DataTypes.isArrayType(dataType) || DataTypes.isStructType(dataType)) {
      return ((ComplexTypeStreamReader) carbonColumnVector).buildBlock();
    } else {
      throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }
  }

  @Override public void putNull(int rowId) {
    builder.appendNull();
  }

  @Override public void reset() {
    builder = type.createBlockBuilder(null, batchSize);
  }

  @Override public void putNulls(int rowId, int count) {
    for (int i = 0; i < count; i++) {
      builder.appendNull();
    }
  }
}