blob: 6d20abddefe3a77e70267d6ea07f4fef06c24ec6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.operations.utils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.operations.CalculatedQueryOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeUtils;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
/** Utility class for creating a valid {@link CalculatedQueryOperation} operation. */
@Internal
final class CalculatedTableFactory {
/**
* Creates a valid {@link CalculatedQueryOperation} operation.
*
* @param callExpr call to table function as expression
* @return valid calculated table
*/
QueryOperation create(ResolvedExpression callExpr, List<String> leftTableFieldNames) {
FunctionTableCallVisitor calculatedTableCreator =
new FunctionTableCallVisitor(leftTableFieldNames);
return callExpr.accept(calculatedTableCreator);
}
private static class FunctionTableCallVisitor
extends ResolvedExpressionDefaultVisitor<CalculatedQueryOperation> {
private final List<String> leftTableFieldNames;
private static final String ATOMIC_FIELD_NAME = "f0";
public FunctionTableCallVisitor(List<String> leftTableFieldNames) {
this.leftTableFieldNames = leftTableFieldNames;
}
@Override
public CalculatedQueryOperation visit(CallExpression call) {
FunctionDefinition definition = call.getFunctionDefinition();
if (definition.equals(AS)) {
return unwrapFromAlias(call);
}
return createFunctionCall(call, Collections.emptyList(), call.getResolvedChildren());
}
private CalculatedQueryOperation unwrapFromAlias(CallExpression call) {
List<Expression> children = call.getChildren();
List<String> aliases =
children.subList(1, children.size()).stream()
.map(
alias ->
ExpressionUtils.extractValue(alias, String.class)
.orElseThrow(
() ->
new ValidationException(
"Unexpected alias: "
+ alias)))
.collect(toList());
if (!isFunctionOfKind(children.get(0), FunctionKind.TABLE)) {
throw fail();
}
CallExpression tableCall = (CallExpression) children.get(0);
return createFunctionCall(tableCall, aliases, tableCall.getResolvedChildren());
}
private CalculatedQueryOperation createFunctionCall(
CallExpression callExpression,
List<String> aliases,
List<ResolvedExpression> parameters) {
final ResolvedSchema resolvedSchema =
adjustNames(
extractSchema(callExpression.getOutputDataType()),
aliases,
callExpression.getFunctionName());
return new CalculatedQueryOperation(
ContextResolvedFunction.fromCallExpression(callExpression),
parameters,
resolvedSchema);
}
private ResolvedSchema extractSchema(DataType resultDataType) {
if (LogicalTypeChecks.isCompositeType(resultDataType.getLogicalType())) {
return DataTypeUtils.expandCompositeTypeToSchema(resultDataType);
}
int i = 0;
String fieldName = ATOMIC_FIELD_NAME;
while (leftTableFieldNames.contains(fieldName)) {
fieldName = ATOMIC_FIELD_NAME + "_" + i++;
}
return ResolvedSchema.physical(
Collections.singletonList(fieldName),
Collections.singletonList(resultDataType));
}
private ResolvedSchema adjustNames(
ResolvedSchema resolvedSchema, List<String> aliases, String functionName) {
int aliasesSize = aliases.size();
if (aliasesSize == 0) {
return resolvedSchema;
}
int callArity = resolvedSchema.getColumnCount();
if (callArity != aliasesSize) {
throw new ValidationException(
String.format(
"List of column aliases must have same degree as table; "
+ "the returned table of function '%s' has "
+ "%d columns, whereas alias list has %d columns",
functionName, callArity, aliasesSize));
}
return ResolvedSchema.physical(aliases, resolvedSchema.getColumnDataTypes());
}
@Override
protected CalculatedQueryOperation defaultMethod(ResolvedExpression expression) {
throw fail();
}
private ValidationException fail() {
return new ValidationException(
"A lateral join only accepts an expression which defines a table function "
+ "call that might be followed by some alias.");
}
}
}