| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.zetasql; |
| |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_FLOAT; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT32; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME; |
| import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP; |
| import static java.util.stream.Collectors.toList; |
| |
| import com.google.zetasql.ArrayType; |
| import com.google.zetasql.StructType; |
| import com.google.zetasql.StructType.StructField; |
| import com.google.zetasql.Type; |
| import com.google.zetasql.TypeFactory; |
| import com.google.zetasql.ZetaSQLType.TypeKind; |
| import java.util.List; |
| import java.util.function.Function; |
| import org.apache.beam.sdk.annotations.Internal; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| |
| /** Utility to convert types from Calcite Schema types. */ |
| @Internal |
| public class TypeUtils { |
| |
| private static final ImmutableMap<SqlTypeName, Type> CALCITE_TO_ZETA_SIMPLE_TYPES = |
| ImmutableMap.<SqlTypeName, Type>builder() |
| .put(SqlTypeName.BIGINT, TypeFactory.createSimpleType(TYPE_INT64)) |
| .put(SqlTypeName.INTEGER, TypeFactory.createSimpleType(TYPE_INT32)) |
| .put(SqlTypeName.VARCHAR, TypeFactory.createSimpleType(TYPE_STRING)) |
| .put(SqlTypeName.BOOLEAN, TypeFactory.createSimpleType(TYPE_BOOL)) |
| .put(SqlTypeName.FLOAT, TypeFactory.createSimpleType(TYPE_FLOAT)) |
| .put(SqlTypeName.DOUBLE, TypeFactory.createSimpleType(TYPE_DOUBLE)) |
| .put(SqlTypeName.VARBINARY, TypeFactory.createSimpleType(TYPE_BYTES)) |
| .put(SqlTypeName.TIMESTAMP, TypeFactory.createSimpleType(TYPE_TIMESTAMP)) |
| .put(SqlTypeName.DATE, TypeFactory.createSimpleType(TYPE_DATE)) |
| .put(SqlTypeName.TIME, TypeFactory.createSimpleType(TYPE_TIME)) |
| .build(); |
| |
| private static final ImmutableMap<TypeKind, Function<RexBuilder, RelDataType>> |
| ZETA_TO_CALCITE_SIMPLE_TYPES = |
| ImmutableMap.<TypeKind, Function<RexBuilder, RelDataType>>builder() |
| .put(TYPE_NUMERIC, relDataTypeFactory(SqlTypeName.DECIMAL)) |
| .put(TYPE_INT32, relDataTypeFactory(SqlTypeName.INTEGER)) |
| .put(TYPE_INT64, relDataTypeFactory(SqlTypeName.BIGINT)) |
| .put(TYPE_FLOAT, relDataTypeFactory(SqlTypeName.FLOAT)) |
| .put(TYPE_DOUBLE, relDataTypeFactory(SqlTypeName.DOUBLE)) |
| .put(TYPE_STRING, relDataTypeFactory(SqlTypeName.VARCHAR)) |
| .put(TYPE_BOOL, relDataTypeFactory(SqlTypeName.BOOLEAN)) |
| .put(TYPE_BYTES, relDataTypeFactory(SqlTypeName.VARBINARY)) |
| .put(TYPE_DATE, relDataTypeFactory(SqlTypeName.DATE)) |
| .put(TYPE_TIME, relDataTypeFactory(SqlTypeName.TIME)) |
| // TODO: handle timestamp with time zone. |
| .put(TYPE_TIMESTAMP, relDataTypeFactory(SqlTypeName.TIMESTAMP)) |
| .build(); |
| |
| /** Returns a type matching the corresponding Calcite type. */ |
| static Type toZetaType(RelDataType calciteType) { |
| |
| if (CALCITE_TO_ZETA_SIMPLE_TYPES.containsKey(calciteType.getSqlTypeName())) { |
| return CALCITE_TO_ZETA_SIMPLE_TYPES.get(calciteType.getSqlTypeName()); |
| } |
| |
| switch (calciteType.getSqlTypeName()) { |
| case ARRAY: |
| return TypeFactory.createArrayType(toZetaType(calciteType.getComponentType())); |
| case MAP: |
| |
| // it is ok to return a simple type for MAP because MAP only exists in pubsub table which |
| // used to save table attribute. |
| // TODO: have a better way to handle MAP given the fact that ZetaSQL has no MAP type. |
| return TypeFactory.createSimpleType(TypeKind.TYPE_STRING); |
| case ROW: |
| List<StructField> structFields = |
| calciteType.getFieldList().stream() |
| .map(f -> new StructField(f.getName(), toZetaType(f.getType()))) |
| .collect(toList()); |
| |
| return TypeFactory.createStructType(structFields); |
| default: |
| throw new RuntimeException("Unsupported RelDataType: " + calciteType); |
| } |
| } |
| |
| public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, boolean isNullable) { |
| if (type.getKind().equals(TypeKind.TYPE_ARRAY)) { |
| return toArrayRelDataType(rexBuilder, type.asArray(), isNullable); |
| } else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) { |
| return toStructRelDataType(rexBuilder, type.asStruct(), isNullable); |
| } else { |
| // TODO: Check type's nullability? |
| return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable); |
| } |
| } |
| |
| public static RelDataType toArrayRelDataType( |
| RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) { |
| // -1 cardinality means unlimited array size. |
| // TODO: is unlimited array size right for general case? |
| // TODO: whether isNullable should be ArrayType's nullablity (not its element type's?) |
| return rexBuilder |
| .getTypeFactory() |
| .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1); |
| } |
| |
| private static RelDataType toStructRelDataType( |
| RexBuilder rexBuilder, StructType structType, boolean isNullable) { |
| |
| List<StructField> fields = structType.getFieldList(); |
| List<String> fieldNames = fields.stream().map(StructField::getName).collect(toList()); |
| List<RelDataType> fieldTypes = |
| fields.stream() |
| .map(f -> toRelDataType(rexBuilder, f.getType(), isNullable)) |
| .collect(toList()); |
| |
| return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames); |
| } |
| |
| // TODO: convert TIMESTAMP with/without TIMEZONE and DATETIME. |
| public static RelDataType toSimpleRelDataType(TypeKind kind, RexBuilder rexBuilder) { |
| return toSimpleRelDataType(kind, rexBuilder, true); |
| } |
| |
| public static RelDataType toSimpleRelDataType( |
| TypeKind kind, RexBuilder rexBuilder, boolean isNullable) { |
| if (!ZETA_TO_CALCITE_SIMPLE_TYPES.containsKey(kind)) { |
| throw new RuntimeException("Unsupported column type: " + kind); |
| } |
| |
| RelDataType relDataType = ZETA_TO_CALCITE_SIMPLE_TYPES.get(kind).apply(rexBuilder); |
| return nullable(rexBuilder, relDataType, isNullable); |
| } |
| |
| private static RelDataType nullable(RexBuilder r, RelDataType relDataType, boolean isNullable) { |
| return r.getTypeFactory().createTypeWithNullability(relDataType, isNullable); |
| } |
| |
| private static Function<RexBuilder, RelDataType> relDataTypeFactory(SqlTypeName typeName) { |
| return (RexBuilder r) -> r.getTypeFactory().createSqlType(typeName); |
| } |
| } |