/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.DoubleFunction;
import java.util.function.Function;
import java.util.function.LongFunction;
/**
* A class that provides utility methods for ORC file reading.
*/
class OrcBatchReader {
private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
/**
* Converts an ORC schema to a Flink TypeInformation.
*
* @param schema The ORC schema.
* @return The TypeInformation that corresponds to the ORC schema.
*/
static TypeInformation schemaToTypeInfo(TypeDescription schema) {
switch (schema.getCategory()) {
case BOOLEAN:
return BasicTypeInfo.BOOLEAN_TYPE_INFO;
case BYTE:
return BasicTypeInfo.BYTE_TYPE_INFO;
case SHORT:
return BasicTypeInfo.SHORT_TYPE_INFO;
case INT:
return BasicTypeInfo.INT_TYPE_INFO;
case LONG:
return BasicTypeInfo.LONG_TYPE_INFO;
case FLOAT:
return BasicTypeInfo.FLOAT_TYPE_INFO;
case DOUBLE:
return BasicTypeInfo.DOUBLE_TYPE_INFO;
case DECIMAL:
return BasicTypeInfo.BIG_DEC_TYPE_INFO;
case STRING:
case CHAR:
case VARCHAR:
return BasicTypeInfo.STRING_TYPE_INFO;
case DATE:
return SqlTimeTypeInfo.DATE;
case TIMESTAMP:
return SqlTimeTypeInfo.TIMESTAMP;
case BINARY:
return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
case STRUCT:
List<TypeDescription> fieldSchemas = schema.getChildren();
TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
for (int i = 0; i < fieldSchemas.size(); i++) {
fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i));
}
String[] fieldNames = schema.getFieldNames().toArray(new String[]{});
return new RowTypeInfo(fieldTypes, fieldNames);
case LIST:
TypeDescription elementSchema = schema.getChildren().get(0);
TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
// arrays of primitive types are handled as object arrays to support null values
return ObjectArrayTypeInfo.getInfoFor(elementType);
case MAP:
TypeDescription keySchema = schema.getChildren().get(0);
TypeDescription valSchema = schema.getChildren().get(1);
TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
TypeInformation<?> valType = schemaToTypeInfo(valSchema);
return new MapTypeInfo<>(keyType, valType);
case UNION:
throw new UnsupportedOperationException("UNION type is not supported yet.");
default:
throw new IllegalArgumentException("Unknown type " + schema);
}
}
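// Illustrative sketch (not part of the original code): for a hypothetical schema
//
//   TypeDescription schema = TypeDescription.fromString(
//       "struct<name:string,age:int,scores:array<double>>");
//
// schemaToTypeInfo(schema) yields a RowTypeInfo with the fields
// (name: String, age: Integer, scores: ObjectArrayTypeInfo<Double>).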
/**
* Fills an ORC batch into an array of Row.
*
* @param rows The batch of rows to be filled.
* @param schema The schema of the ORC data.
* @param batch The ORC data.
* @param selectedFields The list of selected ORC fields.
* @return The number of rows that were filled.
*/
static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
int rowsToRead = Math.min((int) batch.count(), rows.length);
List<TypeDescription> fieldTypes = schema.getChildren();
// read each selected field
for (int fieldIdx = 0; fieldIdx < selectedFields.length; fieldIdx++) {
int orcIdx = selectedFields[fieldIdx];
readField(rows, fieldIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], rowsToRead);
}
return rowsToRead;
}
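// Usage sketch (hypothetical; assumes 'reader' is an org.apache.orc.RecordReader
// for the file and 'batchSize' a configured capacity):
//
//   VectorizedRowBatch batch = schema.createRowBatch(batchSize);
//   Row[] rows = new Row[batchSize];
//   for (int i = 0; i < rows.length; i++) {
//       rows[i] = new Row(selectedFields.length);
//   }
//   while (reader.nextBatch(batch)) {
//       int read = fillRows(rows, schema, batch, selectedFields);
//       // forward rows[0 .. read - 1] downstream
//   }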
/**
* Reads a vector of data into an array of objects.
*
* @param vals The array that needs to be filled.
* @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
* Otherwise a -1 must be passed and the data is directly filled into the array.
* @param schema The schema of the vector to read.
* @param vector The vector to read.
* @param childCount The number of vector entries to read.
*/
private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, int childCount) {
// check the type of the vector to decide how to read it.
switch (schema.getCategory()) {
case BOOLEAN:
if (vector.noNulls) {
readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readBoolean);
} else {
readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readBoolean);
}
break;
case BYTE:
if (vector.noNulls) {
readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readByte);
} else {
readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readByte);
}
break;
case SHORT:
if (vector.noNulls) {
readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readShort);
} else {
readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readShort);
}
break;
case INT:
if (vector.noNulls) {
readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readInt);
} else {
readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readInt);
}
break;
case LONG:
if (vector.noNulls) {
readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readLong);
} else {
readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readLong);
}
break;
case FLOAT:
if (vector.noNulls) {
readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readFloat);
} else {
readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readFloat);
}
break;
case DOUBLE:
if (vector.noNulls) {
readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readDouble);
} else {
readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readDouble);
}
break;
case CHAR:
case VARCHAR:
case STRING:
if (vector.noNulls) {
readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount);
} else {
readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount);
}
break;
case DATE:
if (vector.noNulls) {
readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount);
} else {
readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount);
}
break;
case TIMESTAMP:
if (vector.noNulls) {
readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount);
} else {
readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount);
}
break;
case BINARY:
if (vector.noNulls) {
readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount);
} else {
readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount);
}
break;
case DECIMAL:
if (vector.noNulls) {
readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount);
} else {
readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount);
}
break;
case STRUCT:
if (vector.noNulls) {
readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, childCount);
} else {
readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, childCount);
}
break;
case LIST:
if (vector.noNulls) {
readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount);
} else {
readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount);
}
break;
case MAP:
if (vector.noNulls) {
readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount);
} else {
readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount);
}
break;
case UNION:
throw new UnsupportedOperationException("UNION type not supported yet");
default:
throw new IllegalArgumentException("Unknown type " + schema);
}
}
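// Convention sketch: with fieldIdx >= 0 the vals array is a Row[] and values are
// written via ((Row) vals[i]).setField(fieldIdx, value); with fieldIdx == -1 values
// are written directly as vals[i] = value. The direct mode is used for the child
// vectors of LIST, MAP, and STRUCT columns.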
private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector,
int childCount, LongFunction<T> reader) {
if (vector.isRepeating) { // fill complete column with first value
T repeatingValue = reader.apply(vector.vector[0]);
fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = reader.apply(vector.vector[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
}
}
}
}
private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector,
int childCount, DoubleFunction<T> reader) {
if (vector.isRepeating) { // fill complete column with first value
T repeatingValue = reader.apply(vector.vector[0]);
fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = reader.apply(vector.vector[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
}
}
}
}
private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
if (bytes.isRepeating) { // fill complete column with first value
String repeatingValue = readString(bytes.vector[0], bytes.start[0], bytes.length[0]);
fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i]));
}
}
}
}
private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
if (bytes.isRepeating) { // fill complete column with first value
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
// don't reuse repeating val to avoid object mutation
vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
// don't reuse repeating val to avoid object mutation
rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
}
}
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
}
}
}
}
private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) {
if (vector.isRepeating) { // fill complete column with first value
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
// do not reuse repeated value due to mutability of Date
vals[i] = readDate(vector.vector[0]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
// do not reuse repeated value due to mutability of Date
rows[i].setField(fieldIdx, readDate(vector.vector[0]));
}
}
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = readDate(vector.vector[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, readDate(vector.vector[i]));
}
}
}
}
private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) {
if (vector.isRepeating) { // fill complete column with first value
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
// do not reuse value to prevent object mutation
vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
// do not reuse value to prevent object mutation
rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
}
}
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
}
}
}
}
private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) {
if (vector.isRepeating) { // fill complete column with first value
fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
} else {
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
vals[i] = readBigDecimal(vector.vector[i]);
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
}
}
}
}
private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, int childCount) {
List<TypeDescription> childrenTypes = schema.getChildren();
int numFields = childrenTypes.size();
// create a batch of Rows to read the structs
Row[] structs = new Row[childCount];
// TODO: possible improvement: reuse existing Row objects
for (int i = 0; i < childCount; i++) {
structs[i] = new Row(numFields);
}
// read struct fields
// we don't have to handle isRepeating because ORC assumes that it is propagated into the children.
for (int i = 0; i < numFields; i++) {
readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount);
}
if (fieldIdx == -1) { // set struct as an object
System.arraycopy(structs, 0, vals, 0, childCount);
} else { // set struct as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, structs[i]);
}
}
}
private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, int childCount) {
TypeDescription fieldType = schema.getChildren().get(0);
// get class of list elements
Class<?> classType = getClassForType(fieldType);
if (list.isRepeating) {
int offset = (int) list.offsets[0];
int length = (int) list.lengths[0];
// we only need to read until offset + length.
int entriesToRead = offset + length;
// read children
Object[] children = (Object[]) Array.newInstance(classType, entriesToRead);
readField(children, -1, fieldType, list.child, entriesToRead);
// get function to copy list
Function<Object, Object> copyList = getCopyFunction(schema);
// create first list that will be copied
Object[] first;
if (offset == 0) {
first = children;
} else {
first = (Object[]) Array.newInstance(classType, length);
System.arraycopy(children, offset, first, 0, length);
}
// create copies of first list and set copies as result
for (int i = 0; i < childCount; i++) {
Object[] copy = (Object[]) copyList.apply(first);
if (fieldIdx == -1) {
vals[i] = copy;
} else {
((Row) vals[i]).setField(fieldIdx, copy);
}
}
} else {
// read children
Object[] children = (Object[]) Array.newInstance(classType, list.childCount);
readField(children, -1, fieldType, list.child, list.childCount);
// fill lists with children
for (int i = 0; i < childCount; i++) {
int offset = (int) list.offsets[i];
int length = (int) list.lengths[i];
Object[] temp = (Object[]) Array.newInstance(classType, length);
System.arraycopy(children, offset, temp, 0, length);
if (fieldIdx == -1) {
vals[i] = temp;
} else {
((Row) vals[i]).setField(fieldIdx, temp);
}
}
}
}
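// Offset/length sketch: if list.offsets[0] == 2 and list.lengths[0] == 3 for a
// repeating list, the entries children[2..4] form the repeated list; it is then
// deep-copied once per row so that rows do not share mutable element objects.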
private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, int childCount) {
List<TypeDescription> fieldType = schema.getChildren();
TypeDescription keyType = fieldType.get(0);
TypeDescription valueType = fieldType.get(1);
ColumnVector keys = mapsVector.keys;
ColumnVector values = mapsVector.values;
if (mapsVector.isRepeating) {
// first map is repeated
// get map copy function
Function<Object, Object> copyMap = getCopyFunction(schema);
// determine the extent of the first map; only its entries need to be read
int offset = (int) mapsVector.offsets[0];
int length = (int) mapsVector.lengths[0];
// we only need to read until offset + length.
int entriesToRead = offset + length;
Object[] keyRows = new Object[entriesToRead];
Object[] valueRows = new Object[entriesToRead];
// read map keys and values
readField(keyRows, -1, keyType, keys, entriesToRead);
readField(valueRows, -1, valueType, values, entriesToRead);
// create first map that will be copied
HashMap map = readHashMap(keyRows, valueRows, offset, length);
// copy first map and set copy as result
for (int i = 0; i < childCount; i++) {
if (fieldIdx == -1) {
vals[i] = copyMap.apply(map);
} else {
((Row) vals[i]).setField(fieldIdx, copyMap.apply(map));
}
}
} else {
Object[] keyRows = new Object[mapsVector.childCount];
Object[] valueRows = new Object[mapsVector.childCount];
// read map keys and values
readField(keyRows, -1, keyType, keys, keyRows.length);
readField(valueRows, -1, valueType, values, valueRows.length);
long[] lengthVectorMap = mapsVector.lengths;
int offset = 0;
for (int i = 0; i < childCount; i++) {
long numMapEntries = lengthVectorMap[i];
HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries);
offset += numMapEntries;
if (fieldIdx == -1) {
vals[i] = map;
} else {
((Row) vals[i]).setField(fieldIdx, map);
}
}
}
}
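// Repeating-map sketch: only the first map, i.e. the entries in the range
// [offsets[0], offsets[0] + lengths[0]), is materialized; copyMap.apply(map) then
// creates an independent HashMap per row because map entries may hold mutable
// objects such as Timestamp or nested arrays.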
private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector,
int childCount, LongFunction<T> reader) {
if (vector.isRepeating) { // fill complete column with first value
if (vector.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call.
readNonNullLongColumn(vals, fieldIdx, vector, childCount, reader);
}
} else {
boolean[] isNullVector = vector.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = reader.apply(vector.vector[i]);
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
}
}
}
}
}
private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector,
int childCount, DoubleFunction<T> reader) {
if (vector.isRepeating) { // fill complete column with first value
if (vector.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call
readNonNullDoubleColumn(vals, fieldIdx, vector, childCount, reader);
}
} else {
boolean[] isNullVector = vector.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = reader.apply(vector.vector[i]);
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
}
}
}
}
}
private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
if (bytes.isRepeating) { // fill complete column with first value
if (bytes.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call
readNonNullBytesColumnAsString(vals, fieldIdx, bytes, childCount);
}
} else {
boolean[] isNullVector = bytes.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]);
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i]));
}
}
}
}
}
private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
if (bytes.isRepeating) { // fill complete column with first value
if (bytes.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call
readNonNullBytesColumnAsBinary(vals, fieldIdx, bytes, childCount);
}
} else {
boolean[] isNullVector = bytes.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
}
}
}
}
}
private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) {
if (vector.isRepeating) { // fill complete column with first value
if (vector.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call
readNonNullLongColumnAsDate(vals, fieldIdx, vector, childCount);
}
} else {
boolean[] isNullVector = vector.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = readDate(vector.vector[i]);
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, readDate(vector.vector[i]));
}
}
}
}
}
private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) {
if (vector.isRepeating) { // fill complete column with first value
if (vector.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call
readNonNullTimestampColumn(vals, fieldIdx, vector, childCount);
}
} else {
boolean[] isNullVector = vector.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
vals[i] = ts;
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
rows[i].setField(fieldIdx, ts);
}
}
}
}
}
private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) {
if (vector.isRepeating) { // fill complete column with first value
if (vector.isNull[0]) {
// fill vals with null values
fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
} else {
// read repeating non-null value by forwarding call
readNonNullDecimalColumn(vals, fieldIdx, vector, childCount);
}
} else {
boolean[] isNullVector = vector.isNull;
if (fieldIdx == -1) { // set as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = readBigDecimal(vector.vector[i]);
}
}
} else { // set as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
}
}
}
}
}
private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, int childCount) {
List<TypeDescription> childrenTypes = schema.getChildren();
int numFields = childrenTypes.size();
// Early out if struct column is repeating and always null.
// This is the only repeating case we need to handle.
// ORC assumes that repeating values have been pushed to the children.
if (structVector.isRepeating && structVector.isNull[0]) {
if (fieldIdx < 0) {
for (int i = 0; i < childCount; i++) {
vals[i] = null;
}
} else {
for (int i = 0; i < childCount; i++) {
((Row) vals[i]).setField(fieldIdx, null);
}
}
return;
}
// create a batch of Rows to read the structs
Row[] structs = new Row[childCount];
// TODO: possible improvement: reuse existing Row objects
for (int i = 0; i < childCount; i++) {
structs[i] = new Row(numFields);
}
// read struct fields
for (int i = 0; i < numFields; i++) {
ColumnVector fieldVector = structVector.fields[i];
if (!fieldVector.isRepeating) {
// Reduce fieldVector reads by setting all entries null where struct is null.
if (fieldVector.noNulls) {
// fieldVector had no nulls. Just use struct null information.
System.arraycopy(structVector.isNull, 0, fieldVector.isNull, 0, structVector.isNull.length);
structVector.fields[i].noNulls = false;
} else {
// fieldVector had nulls. Merge field nulls with struct nulls.
for (int j = 0; j < structVector.isNull.length; j++) {
structVector.fields[i].isNull[j] = structVector.isNull[j] || structVector.fields[i].isNull[j];
}
}
}
readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount);
}
boolean[] isNullVector = structVector.isNull;
if (fieldIdx == -1) { // set struct as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = structs[i];
}
}
} else { // set struct as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, structs[i]);
}
}
}
}
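// Null-merging sketch: for structVector.isNull = {false, true, false} and a field
// vector with isNull = {false, false, true}, the merged field isNull becomes
// {false, true, true}; the field reader thus skips rows whose enclosing struct is
// null instead of materializing values for them.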
private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, int childCount) {
TypeDescription fieldType = schema.getChildren().get(0);
// get class of list elements
Class<?> classType = getClassForType(fieldType);
if (list.isRepeating) {
// list values are repeating. we only need to read the first list and copy it.
if (list.isNull[0]) {
// Even better: the first list is null, so all lists are null.
for (int i = 0; i < childCount; i++) {
if (fieldIdx == -1) {
vals[i] = null;
} else {
((Row) vals[i]).setField(fieldIdx, null);
}
}
} else {
// Get function to copy list
Function<Object, Object> copyList = getCopyFunction(schema);
int offset = (int) list.offsets[0];
int length = (int) list.lengths[0];
// we only need to read until offset + length.
int entriesToRead = offset + length;
// read entries
Object[] children = (Object[]) Array.newInstance(classType, entriesToRead);
readField(children, -1, fieldType, list.child, entriesToRead);
// create first list which will be copied
Object[] temp;
if (offset == 0) {
temp = children;
} else {
temp = (Object[]) Array.newInstance(classType, length);
System.arraycopy(children, offset, temp, 0, length);
}
// copy repeated list and set copy as result
for (int i = 0; i < childCount; i++) {
Object[] copy = (Object[]) copyList.apply(temp);
if (fieldIdx == -1) {
vals[i] = copy;
} else {
((Row) vals[i]).setField(fieldIdx, copy);
}
}
}
} else {
if (!list.child.isRepeating) {
boolean[] childIsNull = new boolean[list.childCount];
Arrays.fill(childIsNull, true);
// forward info of null lists into child vector
for (int i = 0; i < childCount; i++) {
// preserve isNull info of entries of non-null lists
if (!list.isNull[i]) {
int offset = (int) list.offsets[i];
int length = (int) list.lengths[i];
System.arraycopy(list.child.isNull, offset, childIsNull, offset, length);
}
}
// override isNull of children vector
list.child.isNull = childIsNull;
list.child.noNulls = false;
}
// read children
Object[] children = (Object[]) Array.newInstance(classType, list.childCount);
readField(children, -1, fieldType, list.child, list.childCount);
Object[] temp;
// fill lists with children
for (int i = 0; i < childCount; i++) {
if (list.isNull[i]) {
temp = null;
} else {
int offset = (int) list.offsets[i];
int length = (int) list.lengths[i];
temp = (Object[]) Array.newInstance(classType, length);
System.arraycopy(children, offset, temp, 0, length);
}
if (fieldIdx == -1) {
vals[i] = temp;
} else {
((Row) vals[i]).setField(fieldIdx, temp);
}
}
}
}
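// Child-null propagation sketch: childIsNull starts as all-true and only the ranges
// [offsets[i], offsets[i] + lengths[i]) of non-null lists copy back the original
// child nulls. E.g. with list.isNull = {false, true}, offsets = {0, 2}, and
// lengths = {2, 2}, child entries 2 and 3 stay marked null and are never
// converted into Java objects.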
private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, int childCount) {
List<TypeDescription> fieldType = schema.getChildren();
TypeDescription keyType = fieldType.get(0);
TypeDescription valueType = fieldType.get(1);
ColumnVector keys = map.keys;
ColumnVector values = map.values;
if (map.isRepeating) {
// map values are repeating. we only need to read the first map and copy it.
if (map.isNull[0]) {
// Even better: the first map is null, so all maps are null.
for (int i = 0; i < childCount; i++) {
if (fieldIdx == -1) {
vals[i] = null;
} else {
((Row) vals[i]).setField(fieldIdx, null);
}
}
} else {
// Get function to copy map
Function<Object, Object> copyMap = getCopyFunction(schema);
int offset = (int) map.offsets[0];
int length = (int) map.lengths[0];
// we only need to read until offset + length.
int entriesToRead = offset + length;
Object[] keyRows = new Object[entriesToRead];
Object[] valueRows = new Object[entriesToRead];
// read map keys and values
readField(keyRows, -1, keyType, keys, entriesToRead);
readField(valueRows, -1, valueType, values, entriesToRead);
// create first map which will be copied
HashMap temp = readHashMap(keyRows, valueRows, offset, length);
// copy repeated map and set copy as result
for (int i = 0; i < childCount; i++) {
if (fieldIdx == -1) {
vals[i] = copyMap.apply(temp);
} else {
((Row) vals[i]).setField(fieldIdx, copyMap.apply(temp));
}
}
}
} else {
// ensure that only keys and values referenced by non-null maps are marked as non-null
if (!keys.isRepeating) {
// propagate isNull info of the map into the keys vector
boolean[] keyIsNull = new boolean[map.childCount];
Arrays.fill(keyIsNull, true);
for (int i = 0; i < childCount; i++) {
// preserve isNull info for keys of non-null maps
if (!map.isNull[i]) {
int offset = (int) map.offsets[i];
int length = (int) map.lengths[i];
System.arraycopy(keys.isNull, offset, keyIsNull, offset, length);
}
}
// override isNull of keys vector
keys.isNull = keyIsNull;
keys.noNulls = false;
}
if (!values.isRepeating) {
// propagate isNull info of the map into the values vector
boolean[] valIsNull = new boolean[map.childCount];
Arrays.fill(valIsNull, true);
for (int i = 0; i < childCount; i++) {
// preserve isNull info for values of non-null maps
if (!map.isNull[i]) {
int offset = (int) map.offsets[i];
int length = (int) map.lengths[i];
System.arraycopy(values.isNull, offset, valIsNull, offset, length);
}
}
// override isNull of values vector
values.isNull = valIsNull;
values.noNulls = false;
}
Object[] keyRows = new Object[map.childCount];
Object[] valueRows = new Object[map.childCount];
// read map keys and values
readField(keyRows, -1, keyType, keys, keyRows.length);
readField(valueRows, -1, valueType, values, valueRows.length);
boolean[] isNullVector = map.isNull;
long[] lengths = map.lengths;
long[] offsets = map.offsets;
if (fieldIdx == -1) { // set map as an object
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
vals[i] = null;
} else {
vals[i] = readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]);
}
}
} else { // set map as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
if (isNullVector[i]) {
rows[i].setField(fieldIdx, null);
} else {
rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]));
}
}
}
}
}
/**
* Sets a repeating value to all objects or row fields of the passed vals array.
*
* @param vals The array of objects or Rows.
* @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
* Otherwise a -1 must be passed and the data is directly filled into the array.
* @param repeatingValue The value that is set.
* @param childCount The number of times the value is set.
*/
private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) {
if (fieldIdx == -1) {
// set value as an object
Arrays.fill(vals, 0, childCount, repeatingValue);
} else {
// set value as a field of Row
Row[] rows = (Row[]) vals;
for (int i = 0; i < childCount; i++) {
rows[i].setField(fieldIdx, repeatingValue);
}
}
}
private static Class<?> getClassForType(TypeDescription schema) {
// check the type category to return the matching Java class.
switch (schema.getCategory()) {
case BOOLEAN:
return Boolean.class;
case BYTE:
return Byte.class;
case SHORT:
return Short.class;
case INT:
return Integer.class;
case LONG:
return Long.class;
case FLOAT:
return Float.class;
case DOUBLE:
return Double.class;
case CHAR:
case VARCHAR:
case STRING:
return String.class;
case DATE:
return Date.class;
case TIMESTAMP:
return Timestamp.class;
case BINARY:
return byte[].class;
case DECIMAL:
return BigDecimal.class;
case STRUCT:
return Row.class;
case LIST:
Class<?> childClass = getClassForType(schema.getChildren().get(0));
return Array.newInstance(childClass, 0).getClass();
case MAP:
return HashMap.class;
case UNION:
throw new UnsupportedOperationException("UNION type not supported yet");
default:
throw new IllegalArgumentException("Unknown type " + schema);
}
}
private static Boolean readBoolean(long l) {
return l != 0;
}
private static Byte readByte(long l) {
return (byte) l;
}
private static Short readShort(long l) {
return (short) l;
}
private static Integer readInt(long l) {
return (int) l;
}
private static Long readLong(long l) {
return l;
}
private static Float readFloat(double d) {
return (float) d;
}
private static Double readDouble(double d) {
return d;
}
private static Date readDate(long l) {
// day to milliseconds
final long t = l * MILLIS_PER_DAY;
// adjust by local timezone
return new java.sql.Date(t - LOCAL_TZ.getOffset(t));
}
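// Worked example (assuming LOCAL_TZ is UTC+1): l = 17897 days gives
// t = 17897 * 86400000 = 1546300800000 ms, i.e. 2019-01-01T00:00:00Z; subtracting
// the zone offset of 3600000 ms yields a java.sql.Date that prints as 2019-01-01
// in the local time zone.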
private static String readString(byte[] bytes, int start, int length) {
return new String(bytes, start, length, StandardCharsets.UTF_8);
}
private static byte[] readBinary(byte[] src, int srcPos, int length) {
byte[] result = new byte[length];
System.arraycopy(src, srcPos, result, 0, length);
return result;
}
private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal();
return hiveDecimal.bigDecimalValue();
}
private static Timestamp readTimestamp(long time, int nanos) {
Timestamp ts = new Timestamp(time);
ts.setNanos(nanos);
return ts;
}
private static HashMap readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) {
HashMap<Object, Object> resultMap = new HashMap<>();
for (int j = 0; j < length; j++) {
resultMap.put(keyRows[offset], valueRows[offset]);
offset++;
}
return resultMap;
}
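// E.g. keyRows = {k0, k1, k2, k3}, valueRows = {v0, v1, v2, v3}, offset = 1, and
// length = 2 produce the map {k1=v1, k2=v2}.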
@SuppressWarnings("unchecked")
private static Function<Object, Object> getCopyFunction(TypeDescription schema) {
// check the type category to decide how values of this type are copied.
switch (schema.getCategory()) {
case BOOLEAN:
case BYTE:
case SHORT:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case CHAR:
case VARCHAR:
case STRING:
case DECIMAL:
return OrcBatchReader::returnImmutable;
case DATE:
return OrcBatchReader::copyDate;
case TIMESTAMP:
return OrcBatchReader::copyTimestamp;
case BINARY:
return OrcBatchReader::copyBinary;
case STRUCT:
List<TypeDescription> fieldTypes = schema.getChildren();
Function<Object, Object>[] copyFields = new Function[fieldTypes.size()];
for (int i = 0; i < fieldTypes.size(); i++) {
copyFields[i] = getCopyFunction(fieldTypes.get(i));
}
return new CopyStruct(copyFields);
case LIST:
TypeDescription entryType = schema.getChildren().get(0);
Function<Object, Object> copyEntry = getCopyFunction(entryType);
Class entryClass = getClassForType(entryType);
return new CopyList(copyEntry, entryClass);
case MAP:
TypeDescription keyType = schema.getChildren().get(0);
TypeDescription valueType = schema.getChildren().get(1);
Function<Object, Object> copyKey = getCopyFunction(keyType);
Function<Object, Object> copyValue = getCopyFunction(valueType);
return new CopyMap(copyKey, copyValue);
case UNION:
throw new UnsupportedOperationException("UNION type not supported yet");
default:
throw new IllegalArgumentException("Unknown type " + schema);
}
}
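// Composition sketch: for a schema map<string,array<timestamp>> this returns
// new CopyMap(returnImmutable, new CopyList(copyTimestamp, Timestamp.class));
// strings are shared because they are immutable, while timestamps and the arrays
// holding them are deep-copied so that repeated rows never alias mutable objects.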
private static Object returnImmutable(Object o) {
return o;
}
private static Date copyDate(Object o) {
if (o == null) {
return null;
} else {
long date = ((Date) o).getTime();
return new Date(date);
}
}
private static Timestamp copyTimestamp(Object o) {
if (o == null) {
return null;
} else {
long millis = ((Timestamp) o).getTime();
int nanos = ((Timestamp) o).getNanos();
Timestamp copy = new Timestamp(millis);
copy.setNanos(nanos);
return copy;
}
}
private static byte[] copyBinary(Object o) {
if (o == null) {
return null;
} else {
int length = ((byte[]) o).length;
return Arrays.copyOf((byte[]) o, length);
}
}
private static class CopyStruct implements Function<Object, Object> {
private final Function<Object, Object>[] copyFields;
CopyStruct(Function<Object, Object>[] copyFields) {
this.copyFields = copyFields;
}
@Override
public Object apply(Object o) {
if (o == null) {
return null;
} else {
Row r = (Row) o;
Row copy = new Row(copyFields.length);
for (int i = 0; i < copyFields.length; i++) {
copy.setField(i, copyFields[i].apply(r.getField(i)));
}
return copy;
}
}
}
private static class CopyList implements Function<Object, Object> {
private final Function<Object, Object> copyEntry;
private final Class entryClass;
CopyList(Function<Object, Object> copyEntry, Class entryClass) {
this.copyEntry = copyEntry;
this.entryClass = entryClass;
}
@Override
public Object apply(Object o) {
if (o == null) {
return null;
} else {
Object[] l = (Object[]) o;
Object[] copy = (Object[]) Array.newInstance(entryClass, l.length);
for (int i = 0; i < l.length; i++) {
copy[i] = copyEntry.apply(l[i]);
}
return copy;
}
}
}
@SuppressWarnings("unchecked")
private static class CopyMap implements Function<Object, Object> {
private final Function<Object, Object> copyKey;
private final Function<Object, Object> copyValue;
CopyMap(Function<Object, Object> copyKey, Function<Object, Object> copyValue) {
this.copyKey = copyKey;
this.copyValue = copyValue;
}
@Override
public Object apply(Object o) {
if (o == null) {
return null;
} else {
Map<Object, Object> m = (Map<Object, Object>) o;
HashMap<Object, Object> copy = new HashMap<>(m.size());
for (Map.Entry<Object, Object> e : m.entrySet()) {
Object keyCopy = copyKey.apply(e.getKey());
Object valueCopy = copyValue.apply(e.getValue());
copy.put(keyCopy, valueCopy);
}
return copy;
}
}
}
}