blob: 7b580ef0a1c0c80370288145fcc5d291cb5d0dcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.utils;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.Instant;
import org.joda.time.base.AbstractInstant;
/** Utility methods for Calcite related operations. */
/**
 * Utility methods for Calcite related operations.
 *
 * <p>Converts between Beam {@link Schema} / {@link FieldType} and Calcite {@link RelDataType} /
 * {@link SqlTypeName}, and defines the SQL-specific logical types used for Calcite types that have
 * no direct Beam counterpart.
 */
public class CalciteUtils {
  /** Max-cardinality argument passed to Calcite when creating array types; -1 means no limit. */
  private static final long UNLIMITED_ARRAY_SIZE = -1L;

  // SQL has schema types that do not directly correspond to Beam Schema types. We define
  // LogicalTypes to represent each of these types.

  /** A LogicalType corresponding to TIME_WITH_LOCAL_TIME_ZONE. */
  public static class TimeWithLocalTzType extends PassThroughLogicalType<Instant> {
    public static final String IDENTIFIER = "SqlTimeWithLocalTzType";

    public TimeWithLocalTzType() {
      super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME);
    }
  }

  /** A LogicalType corresponding to CHAR. */
  public static class CharType extends PassThroughLogicalType<String> {
    public static final String IDENTIFIER = "SqlCharType";

    public CharType() {
      super(IDENTIFIER, FieldType.STRING, "", FieldType.STRING);
    }
  }

  /**
   * Returns true if the type is any of the various date time types: Beam {@code DATETIME}, or one
   * of the logical types {@link SqlTypes#DATE}, {@link SqlTypes#TIME}, {@link TimeWithLocalTzType}
   * or {@link SqlTypes#DATETIME}.
   */
  public static boolean isDateTimeType(FieldType fieldType) {
    if (fieldType.getTypeName() == TypeName.DATETIME) {
      return true;
    }
    if (fieldType.getTypeName().isLogicalType()) {
      Schema.LogicalType logicalType = fieldType.getLogicalType();
      Preconditions.checkArgumentNotNull(logicalType);
      String logicalId = logicalType.getIdentifier();
      return logicalId.equals(SqlTypes.DATE.getIdentifier())
          || logicalId.equals(SqlTypes.TIME.getIdentifier())
          || logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
          || logicalId.equals(SqlTypes.DATETIME.getIdentifier());
    }
    return false;
  }

  /** Returns true if the type is a Beam {@code STRING} or the SQL {@link CharType} logical type. */
  public static boolean isStringType(FieldType fieldType) {
    if (fieldType.getTypeName() == TypeName.STRING) {
      return true;
    }
    if (fieldType.getTypeName().isLogicalType()) {
      Schema.LogicalType logicalType = fieldType.getLogicalType();
      Preconditions.checkArgumentNotNull(logicalType);
      String logicalId = logicalType.getIdentifier();
      return logicalId.equals(CharType.IDENTIFIER);
    }
    return false;
  }

  // The list of field type names used in SQL as Beam field types.
  public static final FieldType TINY_INT = FieldType.BYTE;
  public static final FieldType SMALL_INT = FieldType.INT16;
  public static final FieldType INTEGER = FieldType.INT32;
  public static final FieldType BIG_INT = FieldType.INT64;
  public static final FieldType FLOAT = FieldType.FLOAT;
  public static final FieldType DOUBLE = FieldType.DOUBLE;
  public static final FieldType DECIMAL = FieldType.DECIMAL;
  public static final FieldType BOOLEAN = FieldType.BOOLEAN;
  public static final FieldType VARBINARY = FieldType.BYTES;
  public static final FieldType VARCHAR = FieldType.STRING;
  public static final FieldType CHAR = FieldType.logicalType(new CharType());
  public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
  public static final FieldType NULLABLE_DATE =
      FieldType.logicalType(SqlTypes.DATE).withNullable(true);
  public static final FieldType TIME = FieldType.logicalType(SqlTypes.TIME);
  public static final FieldType NULLABLE_TIME =
      FieldType.logicalType(SqlTypes.TIME).withNullable(true);
  public static final FieldType TIME_WITH_LOCAL_TZ =
      FieldType.logicalType(new TimeWithLocalTzType());
  public static final FieldType TIMESTAMP = FieldType.DATETIME;
  public static final FieldType NULLABLE_TIMESTAMP = FieldType.DATETIME.withNullable(true);
  public static final FieldType TIMESTAMP_WITH_LOCAL_TZ = FieldType.logicalType(SqlTypes.DATETIME);
  public static final FieldType NULLABLE_TIMESTAMP_WITH_LOCAL_TZ =
      FieldType.logicalType(SqlTypes.DATETIME).withNullable(true);

  // NOTE: the maps below read the constants above during static initialization, so they must stay
  // declared after them.
  private static final BiMap<FieldType, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
      ImmutableBiMap.<FieldType, SqlTypeName>builder()
          .put(TINY_INT, SqlTypeName.TINYINT)
          .put(SMALL_INT, SqlTypeName.SMALLINT)
          .put(INTEGER, SqlTypeName.INTEGER)
          .put(BIG_INT, SqlTypeName.BIGINT)
          .put(FLOAT, SqlTypeName.FLOAT)
          .put(DOUBLE, SqlTypeName.DOUBLE)
          .put(DECIMAL, SqlTypeName.DECIMAL)
          .put(BOOLEAN, SqlTypeName.BOOLEAN)
          .put(VARBINARY, SqlTypeName.VARBINARY)
          .put(VARCHAR, SqlTypeName.VARCHAR)
          .put(CHAR, SqlTypeName.CHAR)
          .put(DATE, SqlTypeName.DATE)
          .put(TIME, SqlTypeName.TIME)
          .put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
          .put(TIMESTAMP, SqlTypeName.TIMESTAMP)
          .put(TIMESTAMP_WITH_LOCAL_TZ, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
          .build();

  private static final ImmutableMap<SqlTypeName, FieldType> CALCITE_TO_BEAM_TYPE_MAPPING =
      ImmutableMap.<SqlTypeName, FieldType>builder()
          .put(SqlTypeName.TINYINT, TINY_INT)
          .put(SqlTypeName.SMALLINT, SMALL_INT)
          .put(SqlTypeName.INTEGER, INTEGER)
          .put(SqlTypeName.BIGINT, BIG_INT)
          .put(SqlTypeName.FLOAT, FLOAT)
          .put(SqlTypeName.DOUBLE, DOUBLE)
          .put(SqlTypeName.DECIMAL, DECIMAL)
          .put(SqlTypeName.BOOLEAN, BOOLEAN)
          .put(SqlTypeName.VARBINARY, VARBINARY)
          .put(SqlTypeName.BINARY, VARBINARY)
          .put(SqlTypeName.VARCHAR, VARCHAR)
          .put(SqlTypeName.CHAR, CHAR)
          .put(SqlTypeName.DATE, DATE)
          .put(SqlTypeName.TIME, TIME)
          .put(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, TIME_WITH_LOCAL_TZ)
          .put(SqlTypeName.TIMESTAMP, TIMESTAMP)
          .put(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TZ)
          .build();

  // Since there are multiple Calcite types that correspond to a single Beam type, this is the
  // default mapping.
  private static final Map<FieldType, SqlTypeName> BEAM_TO_CALCITE_DEFAULT_MAPPING =
      ImmutableMap.of(
          FieldType.DATETIME, SqlTypeName.TIMESTAMP,
          FieldType.STRING, SqlTypeName.VARCHAR);

  /** Generate {@link Schema} from {@code RelDataType} which is used to create table. */
  public static Schema toSchema(RelDataType tableInfo) {
    return tableInfo.getFieldList().stream().map(CalciteUtils::toField).collect(Schema.toSchema());
  }

  /**
   * Maps a Beam {@link FieldType} to a Calcite {@link SqlTypeName}.
   *
   * <p>ROW, ARRAY/ITERABLE and MAP map to the corresponding Calcite type constructors. All other
   * types are looked up with nullability stripped, first in the exact mapping and then in the
   * default mapping.
   *
   * @param type the Beam type to convert
   * @return the matching Calcite type name
   * @throws IllegalArgumentException if no Calcite type name matches {@code type}
   */
  public static SqlTypeName toSqlTypeName(FieldType type) {
    switch (type.getTypeName()) {
      case ROW:
        return SqlTypeName.ROW;
      case ARRAY:
      case ITERABLE:
        return SqlTypeName.ARRAY;
      case MAP:
        return SqlTypeName.MAP;
      default:
        // Both mappings are keyed by non-nullable types, so normalize once and use the same key
        // for both lookups.
        FieldType nonNullableType = type.withNullable(false);
        SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(nonNullableType);
        if (typeName == null) {
          // This will happen e.g. if looking up a STRING type, and metadata isn't set to say which
          // type of SQL string we want. In this case, use the default mapping.
          typeName = BEAM_TO_CALCITE_DEFAULT_MAPPING.get(nonNullableType);
        }
        if (typeName == null) {
          throw new IllegalArgumentException(
              String.format("Cannot find a matching Calcite SqlTypeName for Beam type: %s", type));
        }
        return typeName;
    }
  }

  /**
   * Maps a non-parameterized Calcite {@link SqlTypeName} to a (non-nullable) Beam {@link
   * FieldType}.
   *
   * @throws IllegalArgumentException for the type constructors MAP, MULTISET, ARRAY and ROW, or
   *     when no Beam type matches {@code sqlTypeName}
   */
  public static FieldType toFieldType(SqlTypeName sqlTypeName) {
    switch (sqlTypeName) {
      case MAP:
      case MULTISET:
      case ARRAY:
      case ROW:
        throw new IllegalArgumentException(
            String.format(
                "%s is a type constructor that takes parameters, not a type, "
                    + "so it cannot be converted to a %s",
                sqlTypeName, Schema.FieldType.class.getSimpleName()));
      default:
        FieldType fieldType = CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
        if (fieldType == null) {
          throw new IllegalArgumentException(
              "Cannot find a matching Beam FieldType for Calcite type: " + sqlTypeName);
        }
        return fieldType;
    }
  }

  /** Converts a Calcite field into a Beam {@link Schema.Field} with the same name and nullability. */
  public static Schema.Field toField(RelDataTypeField calciteField) {
    return toField(calciteField.getName(), calciteField.getType());
  }

  /** Creates a Beam {@link Schema.Field} named {@code name} whose type mirrors {@code calciteType}. */
  public static Schema.Field toField(String name, RelDataType calciteType) {
    return Schema.Field.of(name, toFieldType(calciteType)).withNullable(calciteType.isNullable());
  }

  /**
   * Converts a Calcite {@link RelDataType} into a Beam {@link FieldType}, recursing into ARRAY,
   * MULTISET, MAP and ROW component types.
   *
   * <p>Only the scalar (default) branch carries over {@code calciteType}'s nullability; container
   * types are returned as built by the corresponding {@link FieldType} factory.
   *
   * @throws IllegalArgumentException if no Beam type matches {@code calciteType}
   */
  public static FieldType toFieldType(RelDataType calciteType) {
    switch (calciteType.getSqlTypeName()) {
      case ARRAY:
      case MULTISET:
        return FieldType.array(toFieldType(calciteType.getComponentType()));
      case MAP:
        return FieldType.map(
            toFieldType(calciteType.getKeyType()), toFieldType(calciteType.getValueType()));
      case ROW:
        return FieldType.row(toSchema(calciteType));
      default:
        try {
          return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
        } catch (IllegalArgumentException e) {
          // Re-throw with the full RelDataType (not just its SqlTypeName) for a clearer message.
          throw new IllegalArgumentException(
              "Cannot find a matching Beam FieldType for Calcite type: " + calciteType, e);
        }
    }
  }

  /** Create an instance of {@code RelDataType} so it can be used to create a table. */
  public static RelDataType toCalciteRowType(Schema schema, RelDataTypeFactory dataTypeFactory) {
    RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory);
    IntStream.range(0, schema.getFieldCount())
        .forEach(
            idx ->
                builder.add(
                    schema.getField(idx).getName(), toRelDataType(dataTypeFactory, schema, idx)));
    return builder.build();
  }

  /**
   * Converts a Beam {@link FieldType} into a Calcite {@link RelDataType}, recursing into collection
   * element, map key/value and row field types.
   *
   * <p>Arrays are created without a cardinality limit. The returned type's nullability is whatever
   * {@code dataTypeFactory} produces by default; field-level nullability is applied by the caller.
   */
  public static RelDataType toRelDataType(RelDataTypeFactory dataTypeFactory, FieldType fieldType) {
    switch (fieldType.getTypeName()) {
      case ARRAY:
      case ITERABLE:
        FieldType collectionElementType = fieldType.getCollectionElementType();
        Preconditions.checkArgumentNotNull(collectionElementType);
        return dataTypeFactory.createArrayType(
            toRelDataType(dataTypeFactory, collectionElementType), UNLIMITED_ARRAY_SIZE);
      case MAP:
        FieldType mapKeyType = fieldType.getMapKeyType();
        FieldType mapValueType = fieldType.getMapValueType();
        Preconditions.checkArgumentNotNull(mapKeyType);
        Preconditions.checkArgumentNotNull(mapValueType);
        RelDataType componentKeyType = toRelDataType(dataTypeFactory, mapKeyType);
        RelDataType componentValueType = toRelDataType(dataTypeFactory, mapValueType);
        return dataTypeFactory.createMapType(componentKeyType, componentValueType);
      case ROW:
        Schema schema = fieldType.getRowSchema();
        Preconditions.checkArgumentNotNull(schema);
        return toCalciteRowType(schema, dataTypeFactory);
      default:
        return dataTypeFactory.createSqlType(toSqlTypeName(fieldType));
    }
  }

  /** Converts field {@code fieldIndex} of {@code schema}, applying the field's nullability. */
  private static RelDataType toRelDataType(
      RelDataTypeFactory dataTypeFactory, Schema schema, int fieldIndex) {
    Schema.Field field = schema.getField(fieldIndex);
    RelDataType type = toRelDataType(dataTypeFactory, field.getType());
    return dataTypeFactory.createTypeWithNullability(type, field.getType().getNullable());
  }

  /**
   * SQL-Java type mapping, with specified Beam rules:
   *
   * <ol>
   *   <li>Redirect {@link AbstractInstant} subclasses to {@link Date} so Calcite can recognize them.
   *   <li>Redirect {@link ByteString} to {@code byte[]}.
   *   <li>For a parameterized {@link java.util.List}, the element type is used to create a SQL
   *       array type without a cardinality limit.
   *   <li>For a parameterized {@link java.util.Map}, the key and value types are used to create a
   *       SQL map type.
   * </ol>
   *
   * <p>Any other type is passed to {@link RelDataTypeFactory#createJavaType} as a {@link Class}.
   * The type arguments of a List/Map are expected to be plain classes (not nested parameterized
   * types or wildcards); otherwise a {@link ClassCastException} is thrown.
   *
   * @param typeFactory the factory used to create Calcite types
   * @param type the Java type to map
   * @return Calcite RelDataType
   */
  public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type type) {
    // For Joda time types, return SQL type for java.util.Date.
    if (type instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) type)) {
      return typeFactory.createJavaType(Date.class);
    } else if (type instanceof Class && ByteString.class.isAssignableFrom((Class<?>) type)) {
      return typeFactory.createJavaType(byte[].class);
    } else if (type instanceof ParameterizedType) {
      ParameterizedType parameterizedType = (ParameterizedType) type;
      Class<?> rawType = (Class<?>) parameterizedType.getRawType();
      Type[] typeArguments = parameterizedType.getActualTypeArguments();
      if (java.util.List.class.isAssignableFrom(rawType)) {
        Class<?> genericType = (Class<?>) typeArguments[0];
        RelDataType collectionElementType = typeFactory.createJavaType(genericType);
        return typeFactory.createArrayType(collectionElementType, UNLIMITED_ARRAY_SIZE);
      } else if (java.util.Map.class.isAssignableFrom(rawType)) {
        Class<?> genericKeyType = (Class<?>) typeArguments[0];
        Class<?> genericValueType = (Class<?>) typeArguments[1];
        RelDataType mapElementKeyType = typeFactory.createJavaType(genericKeyType);
        RelDataType mapElementValueType = typeFactory.createJavaType(genericValueType);
        return typeFactory.createMapType(mapElementKeyType, mapElementValueType);
      }
    }
    return typeFactory.createJavaType((Class<?>) type);
  }
}