blob: 7e80b0e87207540053aeeb230ee6a810d4e17ac5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.dataformat.util;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.BigDecimalTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.table.api.types.ArrayType;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.DateType;
import org.apache.flink.table.api.types.DecimalType;
import org.apache.flink.table.api.types.GenericType;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.api.types.MapType;
import org.apache.flink.table.api.types.RowType;
import org.apache.flink.table.api.types.TimestampType;
import org.apache.flink.table.api.types.TypeInfoWrappedDataType;
import org.apache.flink.table.dataformat.BaseArray;
import org.apache.flink.table.dataformat.BaseMap;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryArray;
import org.apache.flink.table.dataformat.BinaryMap;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.BinaryWriter;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.NestedRow;
import org.apache.flink.table.dataformat.TypeGetterSetters;
import org.apache.flink.table.typeutils.BaseArraySerializer;
import org.apache.flink.table.typeutils.BaseMapSerializer;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
import org.apache.flink.table.typeutils.TypeUtils;
import java.io.IOException;
import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Util for base row.
*/
public final class BaseRowUtil {
/**
* Indicates the row as an accumulate message.
*/
public static final byte ACCUMULATE_MSG = 0;
/**
* Indicates the row as a retraction message.
*/
public static final byte RETRACT_MSG = 1;
public static boolean isAccumulateMsg(BaseRow baseRow) {
return baseRow.getHeader() == ACCUMULATE_MSG;
}
public static boolean isRetractMsg(BaseRow baseRow) {
return baseRow.getHeader() == RETRACT_MSG;
}
public static BaseRow setAccumulate(BaseRow baseRow) {
baseRow.setHeader(ACCUMULATE_MSG);
return baseRow;
}
public static BaseRow setRetract(BaseRow baseRow) {
baseRow.setHeader(RETRACT_MSG);
return baseRow;
}
public static GenericRow toGenericRow(
BaseRow baseRow,
TypeInformation[] typeInfos,
TypeSerializer[] typeSerializers) {
if (baseRow instanceof GenericRow) {
return (GenericRow) baseRow;
} else {
GenericRow row = new GenericRow(baseRow.getArity());
row.setHeader(baseRow.getHeader());
for (int i = 0; i < row.getArity(); i++) {
if (baseRow.isNullAt(i)) {
row.update(i, null);
} else {
row.update(i, get(baseRow, i, typeInfos[i], typeSerializers[i]));
}
}
return row;
}
}
public static GenericRow toGenericRow(
BaseRow baseRow,
InternalType[] types) {
if (baseRow instanceof GenericRow) {
return (GenericRow) baseRow;
} else {
GenericRow row = new GenericRow(baseRow.getArity());
row.setHeader(baseRow.getHeader());
for (int i = 0; i < row.getArity(); i++) {
if (baseRow.isNullAt(i)) {
row.update(i, null);
} else {
row.update(i, TypeGetterSetters.get(baseRow, i, types[i]));
}
}
return row;
}
}
public static Object get(BaseRow row, int ordinal, TypeInformation type, TypeSerializer serializer) {
if (type.equals(Types.BOOLEAN)) {
return row.getBoolean(ordinal);
} else if (type.equals(Types.BYTE)) {
return row.getByte(ordinal);
} else if (type.equals(Types.SHORT)) {
return row.getShort(ordinal);
} else if (type.equals(Types.INT)) {
return row.getInt(ordinal);
} else if (type.equals(Types.LONG)) {
return row.getLong(ordinal);
} else if (type.equals(Types.FLOAT)) {
return row.getFloat(ordinal);
} else if (type.equals(Types.DOUBLE)) {
return row.getDouble(ordinal);
} else if (type instanceof BigDecimalTypeInfo) {
BigDecimalTypeInfo dt = (BigDecimalTypeInfo) type;
return row.getDecimal(ordinal, dt.precision(), dt.scale());
} else if (type.equals(Types.STRING)) {
return row.getBinaryString(ordinal);
} else if (type.equals(BasicTypeInfo.CHAR_TYPE_INFO)) {
return row.getChar(ordinal);
} else if (type.equals(TimeIndicatorTypeInfo.ROWTIME_INDICATOR())) {
return row.getLong(ordinal);
} else if (type.equals(SqlTimeTypeInfo.DATE)) {
return row.getInt(ordinal);
} else if (type.equals(SqlTimeTypeInfo.TIME)) {
return row.getInt(ordinal);
} else if (type.equals(SqlTimeTypeInfo.TIMESTAMP)) {
return row.getLong(ordinal);
} else if (type.equals(BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
return row.getByteArray(ordinal);
} else if (TypeUtils.isInternalArrayType(type)) {
return row.getBaseArray(ordinal);
} else if (type instanceof MapTypeInfo) {
return row.getBaseMap(ordinal);
} else if (TypeUtils.isInternalCompositeType(type)) {
return row.getBaseRow(ordinal, type.getArity());
} else {
return row.getGeneric(ordinal, serializer);
}
}
public static Object get(BaseRow row, int ordinal, DataType type) {
if (type.equals(DataTypes.BOOLEAN)) {
return row.getBoolean(ordinal);
} else if (type.equals(DataTypes.BYTE)) {
return row.getByte(ordinal);
} else if (type.equals(DataTypes.SHORT)) {
return row.getShort(ordinal);
} else if (type.equals(DataTypes.INT)) {
return row.getInt(ordinal);
} else if (type.equals(DataTypes.LONG)) {
return row.getLong(ordinal);
} else if (type.equals(DataTypes.FLOAT)) {
return row.getFloat(ordinal);
} else if (type.equals(DataTypes.DOUBLE)) {
return row.getDouble(ordinal);
} else if (type instanceof DecimalType) {
DecimalType dt = (DecimalType) type;
return row.getDecimal(ordinal, dt.precision(), dt.scale());
} else if (type.equals(DataTypes.STRING)) {
return row.getBinaryString(ordinal);
} else if (type.equals(DataTypes.CHAR)) {
return row.getChar(ordinal);
} else if (type.equals(DataTypes.ROWTIME_INDICATOR)) {
return row.getLong(ordinal);
} else if (type.equals(DataTypes.DATE)) {
return row.getInt(ordinal);
} else if (type.equals(DataTypes.TIME)) {
return row.getInt(ordinal);
} else if (type.equals(DataTypes.TIMESTAMP)) {
return row.getLong(ordinal);
} else if (type.equals(DataTypes.BYTE_ARRAY)) {
return row.getByteArray(ordinal);
} else if (type instanceof ArrayType) {
return row.getBaseArray(ordinal);
} else if (type instanceof MapType) {
return row.getBaseMap(ordinal);
} else if (type instanceof RowType) {
return row.getBaseRow(ordinal, ((RowType) type).getArity());
} else if (type instanceof GenericType) {
return row.getGeneric(ordinal, (GenericType) type);
} else if (type instanceof TypeInfoWrappedDataType) {
return TypeGetterSetters.get(row, ordinal, type.toInternalType());
} else {
throw new RuntimeException("Not support type: " + type);
}
}
public static String toOriginString(BaseRow row, TypeInformation[] types, TypeSerializer[] serializers) {
checkArgument(types.length == row.getArity());
StringBuilder build = new StringBuilder("[");
build.append(row.getHeader());
for (int i = 0; i < row.getArity(); i++) {
build.append(',');
if (row.isNullAt(i)) {
build.append("null");
} else {
TypeSerializer serializer = serializers != null ? serializers[i] : null;
build.append(get(row, i, types[i], serializer));
}
}
build.append(']');
return build.toString();
}
public static void write(BinaryWriter writer, int pos, Object o, InternalType type, TypeSerializer serializer) {
if (type.equals(DataTypes.BOOLEAN)) {
writer.writeBoolean(pos, (boolean) o);
} else if (type.equals(DataTypes.BYTE)) {
writer.writeByte(pos, (byte) o);
} else if (type.equals(DataTypes.SHORT)) {
writer.writeShort(pos, (short) o);
} else if (type.equals(DataTypes.INT)) {
writer.writeInt(pos, (int) o);
} else if (type.equals(DataTypes.LONG)) {
writer.writeLong(pos, (long) o);
} else if (type.equals(DataTypes.FLOAT)) {
writer.writeFloat(pos, (float) o);
} else if (type.equals(DataTypes.DOUBLE)) {
writer.writeDouble(pos, (double) o);
} else if (type.equals(DataTypes.STRING)) {
writer.writeBinaryString(pos, (BinaryString) o);
} else if (type.equals(DataTypes.CHAR)) {
writer.writeChar(pos, (char) o);
} else if (type instanceof DecimalType) {
DecimalType t = (DecimalType) type;
writer.writeDecimal(pos, (Decimal) o, t.precision(), t.scale());
} else if (type instanceof DateType) {
writer.writeInt(pos, (int) o);
} else if (type.equals(DataTypes.TIME)) {
writer.writeInt(pos, (int) o);
} else if (type instanceof TimestampType) {
writer.writeLong(pos, (long) o);
} else if (type.equals(DataTypes.BYTE_ARRAY)) {
writer.writeByteArray(pos, (byte[]) o);
} else if (type instanceof ArrayType) {
writeBaseArray(writer, pos, (BaseArray) o, (BaseArraySerializer) serializer);
} else if (type instanceof MapType) {
writeBaseMap(writer, pos, (BaseMap) o, (BaseMapSerializer) serializer);
} else if (type instanceof RowType) {
writeBaseRow(writer, pos, (BaseRow) o, (BaseRowSerializer) serializer);
} else {
writer.writeGeneric(pos, o, (GenericType) type);
}
}
public static void writeBaseArray(BinaryWriter writer, int pos, BaseArray input, BaseArraySerializer serializer) {
BinaryArray binaryArray;
if (input instanceof BinaryArray) {
binaryArray = (BinaryArray) input;
} else {
binaryArray = serializer.baseArrayToBinary(input);
}
writer.writeBinaryArray(pos, binaryArray);
}
public static void writeBaseMap(BinaryWriter writer, int pos, BaseMap input, BaseMapSerializer serializer) {
BinaryMap binaryMap;
if (input instanceof BinaryMap) {
binaryMap = (BinaryMap) input;
} else {
binaryMap = serializer.baseMapToBinary(input);
}
writer.writeBinaryMap(pos, binaryMap);
}
public static void writeBaseRow(BinaryWriter writer, int pos, BaseRow input, BaseRowSerializer serializer) {
if (input instanceof BinaryRow) {
BinaryRow row = (BinaryRow) input;
writer.writeSegments(pos, row.getAllSegments(), row.getBaseOffset(), row.getSizeInBytes());
} else if (input instanceof NestedRow) {
NestedRow row = (NestedRow) input;
writer.writeNestedRow(pos, row);
} else {
try {
BinaryRow row = serializer.baseRowToBinary(input);
writer.writeBinaryRow(pos, row);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}