/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.sources.orc;

import org.apache.flink.table.api.types.BooleanType;
import org.apache.flink.table.api.types.ByteArrayType;
import org.apache.flink.table.api.types.ByteType;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.DateType;
import org.apache.flink.table.api.types.DecimalType;
import org.apache.flink.table.api.types.DoubleType;
import org.apache.flink.table.api.types.FloatType;
import org.apache.flink.table.api.types.IntType;
import org.apache.flink.table.api.types.LongType;
import org.apache.flink.table.api.types.ShortType;
import org.apache.flink.table.api.types.StringType;
import org.apache.flink.table.api.types.TimestampType;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.vector.TypeGetVector;
import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;

import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;

import java.util.Arrays;

/**
 * A column vector that wraps Hive's {@link ColumnVector} and adapts it to Flink's
 * {@link org.apache.flink.table.dataformat.vector.ColumnVector} abstraction.
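 *
 * <p>A minimal usage sketch, using only methods defined on this class; the names
 * {@code intType}, {@code vector}, and {@code batchSize} are stand-ins for values
 * supplied by the surrounding ORC reader:
 *
 * <pre>{@code
 * OrcColumnVector column = new OrcColumnVector(intType, vector);
 * column.setNullInfo(vector);
 * for (int i = 0; i < batchSize; i++) {
 *   if (column.get(i) != null) {    // get(int) returns null for SQL NULL
 *     int v = column.getInt(i);
 *   }
 * }
 * }</pre>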
*/
public class OrcColumnVector extends TypeGetVector {
private DataType fieldType;
private ColumnVector baseData;
private LongColumnVector longData;
private DoubleColumnVector doubleData;
private BytesColumnVector bytesData;
private DecimalColumnVector decimalData;
private TimestampColumnVector timestampData;
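
/**
 * Wraps the given Hive {@code ColumnVector}. The vector is downcast once here so
 * that the typed getters below can read its primitive arrays without casting on
 * every access.
 */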
public OrcColumnVector(DataType fieldType, ColumnVector vector) {
super(vector.isNull.length);
this.fieldType = fieldType;
baseData = vector;
if (vector instanceof LongColumnVector) {
longData = (LongColumnVector) vector;
} else if (vector instanceof DoubleColumnVector) {
doubleData = (DoubleColumnVector) vector;
} else if (vector instanceof BytesColumnVector) {
bytesData = (BytesColumnVector) vector;
} else if (vector instanceof DecimalColumnVector) {
decimalData = (DecimalColumnVector) vector;
} else if (vector instanceof TimestampColumnVector) {
timestampData = (TimestampColumnVector) vector;
} else {
throw new UnsupportedOperationException();
}
}
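
/**
 * Returns the value at the given row as a boxed object, or {@code null} for SQL NULL.
 * Decimal values come back in Flink's compact form: an unscaled {@code int} when the
 * precision fits in 32 bits, an unscaled {@code long} when it fits in 64 bits, and
 * unscaled bytes otherwise.
 */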
@Override
public Object get(int index) {
// For a repeating vector only slot 0 is populated, so remap the index
// before consulting the null flags.
if (baseData.isRepeating) {
index = 0;
}
if (baseData.isNull[index]) {
return null;
}
if (fieldType instanceof BooleanType) {
return longData.vector[index] == 1;
} else if (fieldType instanceof ByteType) {
return (byte) longData.vector[index];
} else if (fieldType instanceof ShortType) {
return (short) longData.vector[index];
} else if (fieldType instanceof IntType) {
return (int) longData.vector[index];
} else if (fieldType instanceof LongType) {
return longData.vector[index];
} else if (fieldType instanceof FloatType) {
return (float) doubleData.vector[index];
} else if (fieldType instanceof DoubleType) {
return doubleData.vector[index];
} else if (fieldType instanceof StringType ||
fieldType instanceof ByteArrayType) {
byte[][] data = bytesData.vector;
int[] start = bytesData.start;
int[] length = bytesData.length;
return Arrays.copyOfRange(data[index], start[index], start[index] + length[index]);
} else if (fieldType instanceof DecimalType) {
HiveDecimalWritable[] data = decimalData.vector;
int precision = ((DecimalType) fieldType).precision();
int scale = ((DecimalType) fieldType).scale();
Decimal decimal = Decimal.fromBigDecimal(data[index].getHiveDecimal().bigDecimalValue(), precision, scale);
// Return the compact representation Flink expects for each precision range.
if (Decimal.is32BitDecimal(precision)) {
return (int) decimal.toUnscaledLong();
} else if (Decimal.is64BitDecimal(precision)) {
return decimal.toUnscaledLong();
} else {
return decimal.toUnscaledBytes();
}
} else if (fieldType instanceof DateType) {
return (int) longData.vector[index];
} else if (fieldType instanceof TimestampType) {
// TimestampColumnVector.time holds Timestamp.getTime(), which already
// includes the millisecond part of nanos; adding nanos / 1000000 on top
// would double-count the milliseconds.
return timestampData.time[index];
} else {
throw new UnsupportedOperationException("Unsupported Data Type: " + fieldType);
}
}
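
// The typed getters below assume the caller has already checked for nulls.
// A repeating Hive vector keeps its single value at slot 0, so each getter
// remaps rowId before reading.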
@Override
public boolean getBoolean(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
return longData.vector[rowId] == 1;
}

@Override
public byte getByte(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
return (byte) longData.vector[rowId];
}

@Override
public short getShort(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
return (short) longData.vector[rowId];
}

@Override
public int getInt(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
return (int) longData.vector[rowId];
}

@Override
public long getLong(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
if (fieldType instanceof TimestampType) {
// See get(int): time already includes the millisecond part of nanos.
return timestampData.time[rowId];
} else {
return longData.vector[rowId];
}
}

@Override
public float getFloat(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
return (float) doubleData.vector[rowId];
}

@Override
public double getDouble(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
return doubleData.vector[rowId];
}
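
/**
 * Returns the bytes at the given row as a view over the underlying Hive vector
 * (no copy), unlike {@link #get(int)}, which copies the range into a fresh array.
 */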
@Override
public VectorizedColumnBatch.ByteArray getByteArray(int rowId) {
if (baseData.isRepeating) {
rowId = 0;
}
byte[][] data = bytesData.vector;
int[] start = bytesData.start;
int[] length = bytesData.length;
return new VectorizedColumnBatch.ByteArray(data[rowId], start[rowId], length[rowId]);
}
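
/**
 * Reads the decimal at the given row and rebuilds it as a Flink {@link Decimal}
 * with the requested precision and scale.
 */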
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (baseData.isRepeating) {
rowId = 0;
}
HiveDecimalWritable[] data = decimalData.vector;
return Decimal.fromBigDecimal(data[rowId].getHiveDecimal().bigDecimalValue(), precision, scale);
}

@Override
public void setElement(
int outElementNum,
int inputElementNum,
org.apache.flink.table.dataformat.vector.ColumnVector inputVector) {
throw new UnsupportedOperationException();
}
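
/**
 * Copies the per-row null flags and the {@code noNulls} shortcut from the given
 * Hive vector into this wrapper, keeping the two in sync for the current batch.
 */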
public void setNullInfo(ColumnVector vector) {
System.arraycopy(vector.isNull, 0, isNull, 0, vector.isNull.length);
noNulls = vector.noNulls;
}
}