blob: a0f6708589a59194880068e3d9db42f6c4c639d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.dataformat.vector;
import org.apache.flink.table.api.types.DecimalType;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.api.types.Types;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.util.TimeConvertUtils;
import java.io.Serializable;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
/**
* A VectorizedColumnBatch is a set of rows, organized with each column
* as a vector. It is the unit of query execution, organized to minimize
* the cost per row and achieve high cycles-per-instruction.
* The major fields are public by design to allow fast and convenient
* access by the vectorized query execution code.
*/
public class VectorizedColumnBatch implements Serializable {
private static final long serialVersionUID = 8180323238728166155L;
/**
* This number is carefully chosen to minimize overhead and typically allows
* one VectorizedColumnBatch to fit in cache.
*/
public static final int MAX_SIZE = 2048;
protected InternalType[] fieldTypes;
private final int capacity;
private int numRows;
public final ColumnVector[] columns;
private final boolean copyToInternal;
public static VectorizedColumnBatch allocate(InternalType[] fieldTypes) {
return new VectorizedColumnBatch(fieldTypes, MAX_SIZE);
}
/**
* Return a batch with the specified number of columns and rows.
* Only call this constructor directly for testing purposes.
* Batch size should normally always be defaultSize.
*
*/
private VectorizedColumnBatch(InternalType[] fieldTypes, int maxRows) {
this.fieldTypes = fieldTypes;
this.capacity = maxRows;
this.columns = new ColumnVector[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
createColumn(this.columns, i, fieldTypes[i], maxRows);
}
this.copyToInternal = true;
}
public VectorizedColumnBatch(InternalType[] fieldTypes, int maxRows, ColumnVector[] columns) {
this.fieldTypes = fieldTypes;
this.capacity = maxRows;
this.columns = columns;
this.copyToInternal = false;
}
private void createColumn(ColumnVector[] columns, int index, InternalType fieldType, int maxRows) {
if (fieldType.equals(Types.BOOLEAN)) {
columns[index] = new BooleanColumnVector(maxRows);
} else if (fieldType.equals(Types.BYTE)) {
columns[index] = new ByteColumnVector(maxRows);
} else if (fieldType.equals(Types.DOUBLE)) {
columns[index] = new DoubleColumnVector(maxRows);
} else if (fieldType.equals(Types.FLOAT)) {
columns[index] = new FloatColumnVector(maxRows);
} else if (fieldType.equals(Types.INT) ||
(fieldType instanceof DecimalType && Decimal.is32BitDecimal(((DecimalType) fieldType).precision()))) {
columns[index] = new IntegerColumnVector(maxRows);
} else if (fieldType.equals(Types.LONG) ||
(fieldType instanceof DecimalType && Decimal.is64BitDecimal(((DecimalType) fieldType).precision()))) {
columns[index] = new LongColumnVector(maxRows);
} else if (fieldType.equals(Types.SHORT)) {
columns[index] = new ShortColumnVector(maxRows);
} else if (fieldType.equals(Types.STRING)) {
columns[index] = new StringColumnVector(maxRows);
} else if (fieldType.equals(Types.BYTE_ARRAY) ||
(fieldType instanceof DecimalType && Decimal.isByteArrayDecimal(((DecimalType) fieldType).precision()))) {
columns[index] = new BytesColumnVector(maxRows);
} else if (fieldType.equals(Types.DATE)) {
columns[index] = new DateColumnVector(maxRows);
} else if (fieldType.equals(Types.TIME)) {
columns[index] = new TimeColumnVector(maxRows);
} else if (fieldType.equals(Types.TIMESTAMP)) {
columns[index] = new TimestampColumnVector(maxRows);
} else {
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
}
public void close() {
}
/**
* Resets the batch for writing.
*/
public void reset() {
for (ColumnVector column : columns) {
column.reset();
}
this.numRows = 0;
}
public int capacity() {
return this.capacity;
}
public void setNumRows(int numRows) {
this.numRows = numRows;
}
public int getNumRows() {
return numRows;
}
public int getArity() {
return columns.length;
}
public boolean isNullAt(int rowId, int colId) {
return !columns[colId].noNulls && columns[colId].isNull[rowId];
}
public boolean getBoolean(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getBoolean(rowId);
}
ColumnVector columnVector = columns[colId];
if (columnVector.dictionary == null) {
BooleanColumnVector booleanColumnVector = (BooleanColumnVector) columnVector;
return booleanColumnVector.vector[rowId];
} else {
return columnVector.dictionary.decodeToBoolean(columnVector.dictionaryIds.vector[rowId]);
}
}
public byte getByte(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getByte(rowId);
}
ColumnVector columnVector = columns[colId];
if (columnVector.dictionary == null) {
ByteColumnVector byteColumnVector = (ByteColumnVector) columnVector;
return byteColumnVector.vector[rowId];
} else {
return (byte) columnVector.dictionary.decodeToInt(columnVector.dictionaryIds.vector[rowId]);
}
}
public short getShort(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getShort(rowId);
}
ColumnVector columnVector = columns[colId];
if (columnVector.dictionary == null) {
ShortColumnVector shortColumnVector = (ShortColumnVector) columnVector;
return shortColumnVector.vector[rowId];
} else {
return (short) columnVector.dictionary.decodeToInt(columnVector.dictionaryIds.vector[rowId]);
}
}
public int getInt(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getInt(rowId);
}
ColumnVector columnVector = columns[colId];
if (columnVector.dictionary == null) {
IntegerColumnVector integerColumnVector = (IntegerColumnVector) columns[colId];
return integerColumnVector.vector[rowId];
} else {
return columnVector.dictionary.decodeToInt(columnVector.dictionaryIds.vector[rowId]);
}
}
public long getLong(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getLong(rowId);
}
ColumnVector columnVector = columns[colId];
if (columnVector.dictionary == null) {
LongColumnVector longColumnVector = (LongColumnVector) columns[colId];
return longColumnVector.vector[rowId];
} else {
return columnVector.dictionary.decodeToLong(columnVector.dictionaryIds.vector[rowId]);
}
}
public float getFloat(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getFloat(rowId);
}
ColumnVector columnVector = columns[colId];
if (columns[colId].dictionary == null) {
FloatColumnVector floatColumnVector = (FloatColumnVector) columns[colId];
return floatColumnVector.vector[rowId];
} else {
return columnVector.dictionary.decodeToFloat(columnVector.dictionaryIds.vector[rowId]);
}
}
public double getDouble(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getDouble(rowId);
}
ColumnVector columnVector = columns[colId];
if (columns[colId].dictionary == null) {
DoubleColumnVector doubleColumnVector = (DoubleColumnVector) columns[colId];
return doubleColumnVector.vector[rowId];
} else {
return columnVector.dictionary.decodeToDouble(columnVector.dictionaryIds.vector[rowId]);
}
}
public ByteArray getByteArray(int rowId, int colId) {
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getByteArray(rowId);
}
ColumnVector columnVector = columns[colId];
if (columns[colId].dictionary == null) {
BytesColumnVector bytesColumnVector = (BytesColumnVector) columns[colId];
return new ByteArray(bytesColumnVector.buffer,
bytesColumnVector.start[rowId],
bytesColumnVector.length[rowId]);
} else {
byte[] bytes = columnVector.dictionary.decodeToBinary(columnVector.dictionaryIds.vector[rowId]);
return new ByteArray(bytes, 0, bytes.length);
}
}
private byte[] getBytes(int rowId, int colId) {
ByteArray byteArray = getByteArray(rowId, colId);
if (byteArray.len == byteArray.data.length) {
return byteArray.data;
} else {
return byteArray.getBytes();
}
}
public String getString(int rowId, int colId) {
ByteArray byteArray = getByteArray(rowId, colId);
return new String(byteArray.data, byteArray.offset, byteArray.len);
}
public Date getDate(int rowId, int colId) {
return TimeConvertUtils.internalToDate(getInt(rowId, colId));
}
public Time getTime(int rowId, int colId) {
return TimeConvertUtils.internalToTime(getInt(rowId, colId));
}
public Timestamp getTimestamp(int rowId, int colId) {
return TimeConvertUtils.internalToTimestamp(getLong(rowId, colId));
}
public Decimal getDecimal(int rowId, int colId) {
if (isNullAt(rowId, colId)) {
return null;
}
DecimalType decimalTypeInfo = (DecimalType) fieldTypes[colId];
int precision = decimalTypeInfo.precision();
int scale = decimalTypeInfo.scale();
if (!copyToInternal) {
return ((TypeGetVector) columns[colId]).getDecimal(rowId, precision, scale);
}
if (Decimal.is32BitDecimal(precision)) {
return Decimal.fromUnscaledLong(precision, scale, getInt(rowId, colId));
} else if (Decimal.is64BitDecimal(precision)) {
return Decimal.fromUnscaledLong(precision, scale, getLong(rowId, colId));
} else {
byte[] bytes = getBytes(rowId, colId);
return Decimal.fromUnscaledBytes(precision, scale, bytes);
}
}
public Object getInternalObject(int rowId, int colId) {
if (!columns[colId].noNulls && columns[colId].isNull[rowId]) {
return null;
} else if (Types.INT.equals(fieldTypes[colId])) {
return getInt(rowId, colId);
} else if (Types.SHORT.equals(fieldTypes[colId])) {
return getShort(rowId, colId);
} else if (Types.BOOLEAN.equals(fieldTypes[colId])) {
return getBoolean(rowId, colId);
} else if (Types.BYTE.equals(fieldTypes[colId])) {
return getByte(rowId, colId);
} else if (Types.DOUBLE.equals(fieldTypes[colId])) {
return getDouble(rowId, colId);
} else if (Types.FLOAT.equals(fieldTypes[colId])) {
return getFloat(rowId, colId);
} else if (Types.LONG.equals(fieldTypes[colId])) {
return getLong(rowId, colId);
} else if (Types.STRING.equals(fieldTypes[colId])) {
return BinaryString.fromBytes(getBytes(rowId, colId));
} else if (Types.TIMESTAMP.equals(fieldTypes[colId])) {
return getLong(rowId, colId);
} else if (Types.DATE.equals(fieldTypes[colId])) {
return getInt(rowId, colId);
} else if (Types.TIME.equals(fieldTypes[colId])) {
return getInt(rowId, colId);
} else if (Types.BYTE_ARRAY.equals(fieldTypes[colId])) {
return getBytes(rowId, colId);
} else if (fieldTypes[colId] instanceof DecimalType) {
return getDecimal(rowId, colId);
} else {
throw new RuntimeException(fieldTypes[colId] + " is not supported.");
}
}
public Object getObject(int rowId, int colId) {
if (!columns[colId].noNulls && columns[colId].isNull[rowId]) {
return null;
} else if (Types.INT.equals(fieldTypes[colId])) {
return getInt(rowId, colId);
} else if (Types.SHORT.equals(fieldTypes[colId])) {
return getShort(rowId, colId);
} else if (Types.BOOLEAN.equals(fieldTypes[colId])) {
return getBoolean(rowId, colId);
} else if (Types.BYTE.equals(fieldTypes[colId])) {
return getByte(rowId, colId);
} else if (Types.DOUBLE.equals(fieldTypes[colId])) {
return getDouble(rowId, colId);
} else if (Types.FLOAT.equals(fieldTypes[colId])) {
return getFloat(rowId, colId);
} else if (Types.LONG.equals(fieldTypes[colId])) {
return getLong(rowId, colId);
} else if (Types.STRING.equals(fieldTypes[colId])) {
return getString(rowId, colId);
} else if (Types.TIMESTAMP.equals(fieldTypes[colId])) {
return getTimestamp(rowId, colId);
} else if (Types.DATE.equals(fieldTypes[colId])) {
return getDate(rowId, colId);
} else if (Types.TIME.equals(fieldTypes[colId])) {
return getTime(rowId, colId);
} else if (Types.BYTE_ARRAY.equals(fieldTypes[colId])) {
return getBytes(rowId, colId);
} else if (fieldTypes[colId] instanceof DecimalType) {
return getDecimal(rowId, colId);
} else {
throw new RuntimeException(fieldTypes[colId] + " is not supported.");
}
}
/**
* For nested byte array.
*/
public static class ByteArray{
public final byte[] data;
public final int offset;
public final int len;
public ByteArray(byte[] data, int offset, int len) {
this.data = data;
this.offset = offset;
this.len = len;
}
public byte[] getBytes() {
byte[] res = new byte[len];
System.arraycopy(data, offset, res, 0, len);
return res;
}
}
}