blob: e125a3065e1ceb7267be559f179ccac990ade40d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.catalog;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.doris.flink.catalog.doris.DorisType;
import java.util.Locale;
import static org.apache.doris.flink.catalog.doris.DorisType.ARRAY;
import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT;
import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN;
import static org.apache.doris.flink.catalog.doris.DorisType.CHAR;
import static org.apache.doris.flink.catalog.doris.DorisType.DATE;
import static org.apache.doris.flink.catalog.doris.DorisType.DATETIME;
import static org.apache.doris.flink.catalog.doris.DorisType.DATETIME_V2;
import static org.apache.doris.flink.catalog.doris.DorisType.DATE_V2;
import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL;
import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3;
import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
import static org.apache.doris.flink.catalog.doris.DorisType.INT;
import static org.apache.doris.flink.catalog.doris.DorisType.JSON;
import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
import static org.apache.doris.flink.catalog.doris.DorisType.MAP;
import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT;
import static org.apache.doris.flink.catalog.doris.DorisType.STRING;
import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT;
import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
public class DorisTypeMapper {
/** Max size of char type of Doris. */
public static final int MAX_CHAR_SIZE = 255;
/** Max size of varchar type of Doris. */
public static final int MAX_VARCHAR_SIZE = 65533;
/* Max precision of datetime type of Doris. */
public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
public static DataType toFlinkType(
String columnName, String columnType, int precision, int scale) {
columnType = columnType.toUpperCase();
switch (columnType) {
case BOOLEAN:
return DataTypes.BOOLEAN();
case TINYINT:
if (precision == 0) {
// The boolean type will become tinyint when queried in information_schema, and
// precision=0
return DataTypes.BOOLEAN();
} else {
return DataTypes.TINYINT();
}
case SMALLINT:
return DataTypes.SMALLINT();
case INT:
return DataTypes.INT();
case BIGINT:
return DataTypes.BIGINT();
case DECIMAL:
case DECIMAL_V3:
return DataTypes.DECIMAL(precision, scale);
case FLOAT:
return DataTypes.FLOAT();
case DOUBLE:
return DataTypes.DOUBLE();
case CHAR:
return DataTypes.CHAR(precision);
case VARCHAR:
return DataTypes.VARCHAR(precision);
case LARGEINT:
case STRING:
case JSONB:
case JSON:
// Currently, the subtype of the generic cannot be obtained,
// so it is mapped to string
case ARRAY:
case MAP:
case STRUCT:
return DataTypes.STRING();
case DATE:
case DATE_V2:
return DataTypes.DATE();
case DATETIME:
case DATETIME_V2:
return DataTypes.TIMESTAMP(0);
default:
throw new UnsupportedOperationException(
String.format(
"Doesn't support Doris type '%s' on column '%s'",
columnType, columnName));
}
}
public static String toDorisType(DataType flinkType) {
LogicalType logicalType = flinkType.getLogicalType();
return logicalType.accept(new LogicalTypeVisitor(logicalType));
}
private static class LogicalTypeVisitor extends LogicalTypeDefaultVisitor<String> {
private final LogicalType type;
LogicalTypeVisitor(LogicalType type) {
this.type = type;
}
@Override
public String visit(CharType charType) {
long length = charType.getLength() * 3L;
if (length <= MAX_CHAR_SIZE) {
return String.format("%s(%s)", DorisType.CHAR, length);
} else {
return visit(new VarCharType(charType.getLength()));
}
}
@Override
public String visit(VarCharType varCharType) {
// Flink varchar length max value is int, it may overflow after multiplying by 3
long length = varCharType.getLength() * 3L;
return length >= MAX_VARCHAR_SIZE ? STRING : String.format("%s(%s)", VARCHAR, length);
}
@Override
public String visit(BooleanType booleanType) {
return BOOLEAN;
}
@Override
public String visit(VarBinaryType varBinaryType) {
return STRING;
}
@Override
public String visit(DecimalType decimalType) {
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
return precision <= 38
? String.format(
"%s(%s,%s)", DorisType.DECIMAL_V3, precision, Math.max(scale, 0))
: DorisType.STRING;
}
@Override
public String visit(TinyIntType tinyIntType) {
return TINYINT;
}
@Override
public String visit(SmallIntType smallIntType) {
return SMALLINT;
}
@Override
public String visit(IntType intType) {
return INT;
}
@Override
public String visit(BigIntType bigIntType) {
return BIGINT;
}
@Override
public String visit(FloatType floatType) {
return FLOAT;
}
@Override
public String visit(DoubleType doubleType) {
return DOUBLE;
}
@Override
public String visit(DateType dateType) {
return DATE_V2;
}
@Override
public String visit(TimestampType timestampType) {
int precision = timestampType.getPrecision();
return String.format(
"%s(%s)", DorisType.DATETIME_V2, Math.min(Math.max(precision, 0), 6));
}
@Override
public String visit(ArrayType arrayType) {
return STRING;
}
@Override
public String visit(MapType mapType) {
return STRING;
}
@Override
public String visit(RowType rowType) {
return STRING;
}
@Override
protected String defaultMethod(LogicalType logicalType) {
throw new UnsupportedOperationException(
String.format(
"Flink doesn't support converting type %s to Doris type yet.",
type.toString()));
}
}
}