blob: 0967f2e3d240532c31df0863cc988418aa46d408 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_FLOAT;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT32;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
import static java.util.stream.Collectors.toList;
import com.google.zetasql.ArrayType;
import com.google.zetasql.StructType;
import com.google.zetasql.StructType.StructField;
import com.google.zetasql.Type;
import com.google.zetasql.TypeFactory;
import com.google.zetasql.ZetaSQLType.TypeKind;
import java.util.List;
import java.util.function.Function;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
/** Utility to convert types from Calcite Schema types. */
@Internal
public class TypeUtils {
private static final ImmutableMap<SqlTypeName, Type> CALCITE_TO_ZETA_SIMPLE_TYPES =
ImmutableMap.<SqlTypeName, Type>builder()
.put(SqlTypeName.BIGINT, TypeFactory.createSimpleType(TYPE_INT64))
.put(SqlTypeName.INTEGER, TypeFactory.createSimpleType(TYPE_INT32))
.put(SqlTypeName.VARCHAR, TypeFactory.createSimpleType(TYPE_STRING))
.put(SqlTypeName.BOOLEAN, TypeFactory.createSimpleType(TYPE_BOOL))
.put(SqlTypeName.FLOAT, TypeFactory.createSimpleType(TYPE_FLOAT))
.put(SqlTypeName.DOUBLE, TypeFactory.createSimpleType(TYPE_DOUBLE))
.put(SqlTypeName.VARBINARY, TypeFactory.createSimpleType(TYPE_BYTES))
.put(SqlTypeName.TIMESTAMP, TypeFactory.createSimpleType(TYPE_TIMESTAMP))
.put(SqlTypeName.DATE, TypeFactory.createSimpleType(TYPE_DATE))
.put(SqlTypeName.TIME, TypeFactory.createSimpleType(TYPE_TIME))
.build();
private static final ImmutableMap<TypeKind, Function<RexBuilder, RelDataType>>
ZETA_TO_CALCITE_SIMPLE_TYPES =
ImmutableMap.<TypeKind, Function<RexBuilder, RelDataType>>builder()
.put(TYPE_NUMERIC, relDataTypeFactory(SqlTypeName.DECIMAL))
.put(TYPE_INT32, relDataTypeFactory(SqlTypeName.INTEGER))
.put(TYPE_INT64, relDataTypeFactory(SqlTypeName.BIGINT))
.put(TYPE_FLOAT, relDataTypeFactory(SqlTypeName.FLOAT))
.put(TYPE_DOUBLE, relDataTypeFactory(SqlTypeName.DOUBLE))
.put(TYPE_STRING, relDataTypeFactory(SqlTypeName.VARCHAR))
.put(TYPE_BOOL, relDataTypeFactory(SqlTypeName.BOOLEAN))
.put(TYPE_BYTES, relDataTypeFactory(SqlTypeName.VARBINARY))
.put(TYPE_DATE, relDataTypeFactory(SqlTypeName.DATE))
.put(TYPE_TIME, relDataTypeFactory(SqlTypeName.TIME))
// TODO: handle timestamp with time zone.
.put(TYPE_TIMESTAMP, relDataTypeFactory(SqlTypeName.TIMESTAMP))
.build();
/** Returns a type matching the corresponding Calcite type. */
static Type toZetaType(RelDataType calciteType) {
if (CALCITE_TO_ZETA_SIMPLE_TYPES.containsKey(calciteType.getSqlTypeName())) {
return CALCITE_TO_ZETA_SIMPLE_TYPES.get(calciteType.getSqlTypeName());
}
switch (calciteType.getSqlTypeName()) {
case ARRAY:
return TypeFactory.createArrayType(toZetaType(calciteType.getComponentType()));
case MAP:
// it is ok to return a simple type for MAP because MAP only exists in pubsub table which
// used to save table attribute.
// TODO: have a better way to handle MAP given the fact that ZetaSQL has no MAP type.
return TypeFactory.createSimpleType(TypeKind.TYPE_STRING);
case ROW:
List<StructField> structFields =
calciteType.getFieldList().stream()
.map(f -> new StructField(f.getName(), toZetaType(f.getType())))
.collect(toList());
return TypeFactory.createStructType(structFields);
default:
throw new RuntimeException("Unsupported RelDataType: " + calciteType);
}
}
public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, boolean isNullable) {
if (type.getKind().equals(TypeKind.TYPE_ARRAY)) {
return toArrayRelDataType(rexBuilder, type.asArray(), isNullable);
} else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) {
return toStructRelDataType(rexBuilder, type.asStruct(), isNullable);
} else {
// TODO: Check type's nullability?
return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable);
}
}
public static RelDataType toArrayRelDataType(
RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) {
// -1 cardinality means unlimited array size.
// TODO: is unlimited array size right for general case?
// TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
return rexBuilder
.getTypeFactory()
.createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1);
}
private static RelDataType toStructRelDataType(
RexBuilder rexBuilder, StructType structType, boolean isNullable) {
List<StructField> fields = structType.getFieldList();
List<String> fieldNames = fields.stream().map(StructField::getName).collect(toList());
List<RelDataType> fieldTypes =
fields.stream()
.map(f -> toRelDataType(rexBuilder, f.getType(), isNullable))
.collect(toList());
return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames);
}
// TODO: convert TIMESTAMP with/without TIMEZONE and DATETIME.
public static RelDataType toSimpleRelDataType(TypeKind kind, RexBuilder rexBuilder) {
return toSimpleRelDataType(kind, rexBuilder, true);
}
public static RelDataType toSimpleRelDataType(
TypeKind kind, RexBuilder rexBuilder, boolean isNullable) {
if (!ZETA_TO_CALCITE_SIMPLE_TYPES.containsKey(kind)) {
throw new RuntimeException("Unsupported column type: " + kind);
}
RelDataType relDataType = ZETA_TO_CALCITE_SIMPLE_TYPES.get(kind).apply(rexBuilder);
return nullable(rexBuilder, relDataType, isNullable);
}
private static RelDataType nullable(RexBuilder r, RelDataType relDataType, boolean isNullable) {
return r.getTypeFactory().createTypeWithNullability(relDataType, isNullable);
}
private static Function<RexBuilder, RelDataType> relDataTypeFactory(SqlTypeName typeName) {
return (RexBuilder r) -> r.getTypeFactory().createSqlType(typeName);
}
}