blob: 9f0e948d6d77bedad1dcab4c27cfda3292d90dde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Function;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.FunctionParameter;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.ScalarFunction;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlIdentifier;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlOperator;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.FamilyOperandTypeChecker;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.InferTypes;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.OperandTypes;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Util;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
/**
* A separate SqlOperators table for those functions that do not exist or not compatible with
* Calcite. Most of functions within this class is copied from Calcite.
*/
public class SqlOperators {
public static final RelDataType TIMESTAMP_WITH_NULLABILITY =
createSqlType(SqlTypeName.TIMESTAMP, true);
public static final RelDataType OTHER = createSqlType(SqlTypeName.OTHER, false);
public static final RelDataType TIMESTAMP = createSqlType(SqlTypeName.TIMESTAMP, false);
public static final RelDataType BIGINT = createSqlType(SqlTypeName.BIGINT, false);
public static SqlUserDefinedFunction createUdfOperator(
String name,
Class<?> methodClass,
String methodName,
SqlReturnTypeInference returnTypeInference,
List<RelDataType> paramTypes) {
return new SqlUserDefinedFunction(
new SqlIdentifier(name, SqlParserPos.ZERO),
returnTypeInference,
null,
null,
paramTypes,
ScalarFunctionImpl.create(methodClass, methodName));
}
// Helper function to create SqlUserDefinedFunction based on a function name and a method.
// SqlUserDefinedFunction will be able to pass through Calcite codegen and get proper function
// called.
public static SqlUserDefinedFunction createUdfOperator(String name, Method method) {
Function function = ScalarFunctionImpl.create(method);
final RelDataTypeFactory typeFactory = createTypeFactory();
List<RelDataType> argTypes = new ArrayList<>();
List<SqlTypeFamily> typeFamilies = new ArrayList<>();
for (FunctionParameter o : function.getParameters()) {
final RelDataType type = o.getType(typeFactory);
argTypes.add(type);
typeFamilies.add(Util.first(type.getSqlTypeName().getFamily(), SqlTypeFamily.ANY));
}
final FamilyOperandTypeChecker typeChecker =
OperandTypes.family(typeFamilies, i -> function.getParameters().get(i).isOptional());
final List<RelDataType> paramTypes = toSql(typeFactory, argTypes);
return new SqlUserDefinedFunction(
new SqlIdentifier(name, SqlParserPos.ZERO),
infer((ScalarFunction) function),
InferTypes.explicit(argTypes),
typeChecker,
paramTypes,
function);
}
private static RelDataType createSqlType(SqlTypeName typeName, boolean withNullability) {
final RelDataTypeFactory typeFactory = createTypeFactory();
RelDataType type = typeFactory.createSqlType(typeName);
if (withNullability) {
type = typeFactory.createTypeWithNullability(type, true);
}
return type;
}
private static RelDataTypeFactory createTypeFactory() {
return new SqlTypeFactoryImpl(BeamRelDataTypeSystem.INSTANCE);
}
private static SqlReturnTypeInference infer(final ScalarFunction function) {
return opBinding -> {
final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
final RelDataType type;
if (function instanceof ScalarFunctionImpl) {
type = ((ScalarFunctionImpl) function).getReturnType(typeFactory, opBinding);
} else {
type = function.getReturnType(typeFactory);
}
return toSql(typeFactory, type);
};
}
private static List<RelDataType> toSql(
final RelDataTypeFactory typeFactory, List<RelDataType> types) {
return Lists.transform(types, type -> toSql(typeFactory, type));
}
private static RelDataType toSql(RelDataTypeFactory typeFactory, RelDataType type) {
if (type instanceof RelDataTypeFactoryImpl.JavaType
&& ((RelDataTypeFactoryImpl.JavaType) type).getJavaClass() == Object.class) {
return typeFactory.createTypeWithNullability(
typeFactory.createSqlType(SqlTypeName.ANY), true);
}
return JavaTypeFactoryImpl.toSql(typeFactory, type);
}
private static final RelDataType BIGINT_WITH_NULLABILITY =
createSqlType(SqlTypeName.BIGINT, true);
public static final SqlOperator START_WITHS =
createUdfOperator("STARTS_WITH", BeamBuiltinMethods.STARTS_WITH_METHOD);
public static final SqlOperator CONCAT =
createUdfOperator("CONCAT", BeamBuiltinMethods.CONCAT_METHOD);
public static final SqlOperator REPLACE =
createUdfOperator("REPLACE", BeamBuiltinMethods.REPLACE_METHOD);
public static final SqlOperator TRIM = createUdfOperator("TRIM", BeamBuiltinMethods.TRIM_METHOD);
public static final SqlOperator LTRIM =
createUdfOperator("LTRIM", BeamBuiltinMethods.LTRIM_METHOD);
public static final SqlOperator RTRIM =
createUdfOperator("RTRIM", BeamBuiltinMethods.RTRIM_METHOD);
public static final SqlOperator SUBSTR =
createUdfOperator("SUBSTR", BeamBuiltinMethods.SUBSTR_METHOD);
public static final SqlOperator REVERSE =
createUdfOperator("REVERSE", BeamBuiltinMethods.REVERSE_METHOD);
public static final SqlOperator CHAR_LENGTH =
createUdfOperator("CHAR_LENGTH", BeamBuiltinMethods.CHAR_LENGTH_METHOD);
public static final SqlOperator ENDS_WITH =
createUdfOperator("ENDS_WITH", BeamBuiltinMethods.ENDS_WITH_METHOD);
public static final SqlOperator LIKE = createUdfOperator("LIKE", BeamBuiltinMethods.LIKE_METHOD);
public static final SqlOperator VALIDATE_TIMESTAMP =
createUdfOperator(
"validateTimestamp",
DateTimeUtils.class,
"validateTimestamp",
x -> TIMESTAMP_WITH_NULLABILITY,
ImmutableList.of(TIMESTAMP));
public static final SqlOperator VALIDATE_TIME_INTERVAL =
createUdfOperator(
"validateIntervalArgument",
DateTimeUtils.class,
"validateTimeInterval",
x -> BIGINT_WITH_NULLABILITY,
ImmutableList.of(BIGINT, OTHER));
public static final SqlOperator TIMESTAMP_OP =
createUdfOperator("TIMESTAMP", BeamBuiltinMethods.TIMESTAMP_METHOD);
public static final SqlOperator DATE_OP =
createUdfOperator("DATE", BeamBuiltinMethods.DATE_METHOD);
}