blob: 52c8d7f8b9874c8ea85ffd3dae08f5c5bb6c67ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.calcite;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalDistribution;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalExpand;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalIntersect;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacySink;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMinus;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableAggregate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalUnion;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalValues;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowTableAggregate;
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
import org.apache.flink.table.planner.plan.trait.RelWindowProperties;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalCalc;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexPatternFieldRef;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.isProctimeIndicatorType;
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType;
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.isTimeIndicatorType;
import static org.apache.flink.table.planner.plan.utils.MatchUtil.isFinalOnMatchTimeIndicator;
import static org.apache.flink.table.planner.plan.utils.WindowUtil.groupingContainsWindowStartEnd;
/**
* Traverses a {@link RelNode} tree and converts fields with {@link TimeIndicatorRelDataType} type.
* If a time attribute is accessed for a calculation, it will be materialized. Forwarding is allowed
* in some cases, but not all.
*/
public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
private final RexBuilder rexBuilder;
private RelTimeIndicatorConverter(RexBuilder rexBuilder) {
this.rexBuilder = rexBuilder;
}
public static RelNode convert(
RelNode rootRel, RexBuilder rexBuilder, boolean needFinalTimeIndicatorConversion) {
RelTimeIndicatorConverter converter = new RelTimeIndicatorConverter(rexBuilder);
RelNode convertedRoot = rootRel.accept(converter);
// FlinkLogicalLegacySink and FlinkLogicalSink are already converted
if (rootRel instanceof FlinkLogicalLegacySink
|| rootRel instanceof FlinkLogicalSink
|| !needFinalTimeIndicatorConversion) {
return convertedRoot;
}
// materialize remaining procTime indicators
return converter.materializeProcTime(convertedRoot);
}
@Override
public RelNode visit(RelNode node) {
if (node instanceof FlinkLogicalValues || node instanceof TableScan) {
return node;
} else if (node instanceof FlinkLogicalIntersect
|| node instanceof FlinkLogicalUnion
|| node instanceof FlinkLogicalMinus) {
return visitSetOp((SetOp) node);
} else if (node instanceof FlinkLogicalTableFunctionScan
|| node instanceof FlinkLogicalSnapshot
|| node instanceof FlinkLogicalRank
|| node instanceof FlinkLogicalDistribution
|| node instanceof FlinkLogicalWatermarkAssigner
|| node instanceof FlinkLogicalSort
|| node instanceof FlinkLogicalOverAggregate
|| node instanceof FlinkLogicalExpand) {
return visitSimpleRel(node);
} else if (node instanceof FlinkLogicalWindowAggregate) {
return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
} else if (node instanceof FlinkLogicalWindowTableAggregate) {
return visitWindowTableAggregate((FlinkLogicalWindowTableAggregate) node);
} else if (node instanceof FlinkLogicalAggregate) {
return visitAggregate((FlinkLogicalAggregate) node);
} else if (node instanceof FlinkLogicalTableAggregate) {
return visitTableAggregate((FlinkLogicalTableAggregate) node);
} else if (node instanceof FlinkLogicalMatch) {
return visitMatch((FlinkLogicalMatch) node);
} else if (node instanceof FlinkLogicalCalc) {
return visitCalc((FlinkLogicalCalc) node);
} else if (node instanceof FlinkLogicalCorrelate) {
return visitCorrelate((FlinkLogicalCorrelate) node);
} else if (node instanceof FlinkLogicalJoin) {
return visitJoin((FlinkLogicalJoin) node);
} else if (node instanceof FlinkLogicalSink) {
return visitSink((FlinkLogicalSink) node);
} else if (node instanceof FlinkLogicalLegacySink) {
return visitSink((FlinkLogicalLegacySink) node);
} else {
return visitInvalidRel(node);
}
}
@Override
public RelNode visit(LogicalCalc calc) {
return visitInvalidRel(calc);
}
@Override
public RelNode visit(LogicalTableModify modify) {
return visitInvalidRel(modify);
}
private RelNode visitMatch(FlinkLogicalMatch match) {
RelNode newInput = match.getInput().accept(this);
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(newInput);
Function<Map<String, RexNode>, Map<String, RexNode>> materializeExprs =
rexNodesMap ->
rexNodesMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().accept(materializer),
(e1, e2) -> e1,
LinkedHashMap::new));
// update input expressions
Map<String, RexNode> newPatternDefs = materializeExprs.apply(match.getPatternDefinitions());
Map<String, RexNode> newMeasures = materializeExprs.apply(match.getMeasures());
RexNode newInterval = null;
if (match.getInterval() != null) {
newInterval = match.getInterval().accept(materializer);
}
Predicate<String> isNoLongerTimeIndicator =
fieldName -> {
RexNode newMeasure = newMeasures.get(fieldName);
if (newMeasure == null) {
return false;
} else {
return !isTimeIndicatorType(newMeasure.getType());
}
};
// materialize all output types
RelDataType newOutputType =
getRowTypeWithoutTimeIndicator(match.getRowType(), isNoLongerTimeIndicator);
return new FlinkLogicalMatch(
match.getCluster(),
match.getTraitSet(),
newInput,
newOutputType,
match.getPattern(),
match.isStrictStart(),
match.isStrictEnd(),
newPatternDefs,
newMeasures,
match.getAfter(),
match.getSubsets(),
match.isAllRows(),
match.getPartitionKeys(),
match.getOrderKeys(),
newInterval);
}
private RelNode visitCalc(FlinkLogicalCalc calc) {
// visit children and update inputs
RelNode newInput = calc.getInput().accept(this);
RexProgram program = calc.getProgram();
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(newInput);
List<RexNode> newProjects =
program.getProjectList().stream()
.map(project -> program.expandLocalRef(project).accept(materializer))
.collect(Collectors.toList());
// materialize condition due to filter will validate condition type
RexNode newCondition = null;
if (program.getCondition() != null) {
newCondition = program.expandLocalRef(program.getCondition()).accept(materializer);
}
RexProgram newProgram =
RexProgram.create(
newInput.getRowType(),
newProjects,
newCondition,
program.getOutputRowType().getFieldNames(),
rexBuilder);
return calc.copy(calc.getTraitSet(), newInput, newProgram);
}
private RelNode visitJoin(FlinkLogicalJoin join) {
RelNode newLeft = join.getLeft().accept(this);
RelNode newRight = join.getRight().accept(this);
int leftFieldCount = newLeft.getRowType().getFieldCount();
// temporal table join
if (TemporalJoinUtil.satisfyTemporalJoin(join)) {
RelNode rewrittenTemporalJoin =
join.copy(
join.getTraitSet(),
join.getCondition(),
newLeft,
newRight,
join.getJoinType(),
join.isSemiJoinDone());
// Materialize all of the time attributes from the right side of temporal join
Set<Integer> rightIndices =
IntStream.range(0, newRight.getRowType().getFieldCount())
.mapToObj(startIdx -> leftFieldCount + startIdx)
.collect(Collectors.toSet());
return createCalcToMaterializeTimeIndicators(rewrittenTemporalJoin, rightIndices);
} else {
if (JoinUtil.satisfyRegularJoin(join, join.getRight())) {
// materialize time attribute fields of regular join's inputs
newLeft = materializeTimeIndicators(newLeft);
newRight = materializeTimeIndicators(newRight);
}
List<RelDataTypeField> leftRightFields = new ArrayList<>();
leftRightFields.addAll(newLeft.getRowType().getFieldList());
leftRightFields.addAll(newRight.getRowType().getFieldList());
RexNode newCondition =
join.getCondition()
.accept(
new RexShuttle() {
@Override
public RexNode visitInputRef(RexInputRef inputRef) {
if (isTimeIndicatorType(inputRef.getType())) {
return RexInputRef.of(
inputRef.getIndex(), leftRightFields);
} else {
return super.visitInputRef(inputRef);
}
}
});
return FlinkLogicalJoin.create(newLeft, newRight, newCondition, join.getJoinType());
}
}
private RelNode visitCorrelate(FlinkLogicalCorrelate correlate) {
// visit children and update inputs
RelNode newLeft = correlate.getLeft().accept(this);
RelNode newRight = correlate.getRight().accept(this);
if (newRight instanceof FlinkLogicalTableFunctionScan) {
FlinkLogicalTableFunctionScan newScan = (FlinkLogicalTableFunctionScan) newRight;
List<RelNode> newScanInputs =
newScan.getInputs().stream()
.map(input -> input.accept(this))
.collect(Collectors.toList());
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(newLeft);
RexNode newScanCall = newScan.getCall().accept(materializer);
newRight =
newScan.copy(
newScan.getTraitSet(),
newScanInputs,
newScanCall,
newScan.getElementType(),
newScan.getRowType(),
newScan.getColumnMappings());
}
return FlinkLogicalCorrelate.create(
newLeft,
newRight,
correlate.getCorrelationId(),
correlate.getRequiredColumns(),
correlate.getJoinType());
}
private RelNode visitSimpleRel(RelNode node) {
List<RelNode> newInputs =
node.getInputs().stream()
.map(input -> input.accept(this))
.collect(Collectors.toList());
return node.copy(node.getTraitSet(), newInputs);
}
private RelNode visitSetOp(SetOp setOp) {
RelNode convertedSetOp = visitSimpleRel(setOp);
// make sure that time indicator types match
List<RelDataTypeField> headInputFields =
convertedSetOp.getInputs().get(0).getRowType().getFieldList();
int fieldCnt = headInputFields.size();
for (int inputIdx = 1; inputIdx < convertedSetOp.getInputs().size(); inputIdx++) {
List<RelDataTypeField> currentInputFields =
convertedSetOp.getInputs().get(inputIdx).getRowType().getFieldList();
for (int fieldIdx = 0; fieldIdx < fieldCnt; fieldIdx++) {
RelDataType headFieldType = headInputFields.get(fieldIdx).getType();
RelDataType currentInputFieldType = currentInputFields.get(fieldIdx).getType();
validateType(currentInputFieldType, headFieldType);
}
}
return convertedSetOp;
}
private RelNode visitSink(SingleRel sink) {
Preconditions.checkArgument(
sink instanceof FlinkLogicalLegacySink || sink instanceof FlinkLogicalSink);
RelNode newInput = sink.getInput().accept(this);
newInput = materializeProcTime(newInput);
return sink.copy(sink.getTraitSet(), Collections.singletonList(newInput));
}
private FlinkLogicalAggregate visitAggregate(FlinkLogicalAggregate agg) {
RelNode newInput = convertAggInput(agg);
List<AggregateCall> updatedAggCalls = convertAggregateCalls(agg);
return (FlinkLogicalAggregate)
agg.copy(
agg.getTraitSet(),
newInput,
agg.getGroupSet(),
agg.getGroupSets(),
updatedAggCalls);
}
private RelNode convertAggInput(Aggregate agg) {
RelNode newInput = agg.getInput().accept(this);
// materialize aggregation arguments/grouping keys
Set<Integer> timeIndicatorIndices = gatherIndicesToMaterialize(agg, newInput);
return materializeTimeIndicators(newInput, timeIndicatorIndices);
}
private Set<Integer> gatherIndicesToMaterialize(Aggregate agg, RelNode newInput) {
List<RelDataType> inputFieldTypes = RelOptUtil.getFieldTypeList(newInput.getRowType());
Predicate<Integer> isTimeIndicator = idx -> isTimeIndicatorType(inputFieldTypes.get(idx));
// add arguments of agg calls
Set<Integer> aggCallArgs =
agg.getAggCallList().stream()
.map(AggregateCall::getArgList)
.flatMap(List::stream)
.filter(isTimeIndicator)
.collect(Collectors.toSet());
FlinkRelMetadataQuery fmq =
FlinkRelMetadataQuery.reuseOrCreate(agg.getCluster().getMetadataQuery());
RelWindowProperties windowProps = fmq.getRelWindowProperties(newInput);
// add grouping sets
Set<Integer> groupSets =
agg.getGroupSets().stream()
.map(
grouping -> {
if (windowProps != null
&& groupingContainsWindowStartEnd(
grouping, windowProps)) {
// for window aggregate we should reserve the time attribute
// of window_time column
return grouping.except(windowProps.getWindowTimeColumns());
} else {
return grouping;
}
})
.flatMap(set -> set.asList().stream())
.filter(isTimeIndicator)
.collect(Collectors.toSet());
Set<Integer> timeIndicatorIndices = new HashSet<>(aggCallArgs);
timeIndicatorIndices.addAll(groupSets);
return timeIndicatorIndices;
}
private List<AggregateCall> convertAggregateCalls(Aggregate agg) {
// remove time indicator type as agg call return type
return agg.getAggCallList().stream()
.map(
call -> {
if (isTimeIndicatorType(call.getType())) {
RelDataType callType =
timestamp(
call.getType().isNullable(),
isTimestampLtzType(call.getType()));
return AggregateCall.create(
call.getAggregation(),
call.isDistinct(),
false,
false,
call.getArgList(),
call.filterArg,
RelCollations.EMPTY,
callType,
call.name);
} else {
return call;
}
})
.collect(Collectors.toList());
}
private RelNode visitTableAggregate(FlinkLogicalTableAggregate tableAgg) {
FlinkLogicalAggregate correspondingAgg =
FlinkLogicalAggregate.create(
tableAgg.getInput(),
tableAgg.getGroupSet(),
tableAgg.getGroupSets(),
tableAgg.getAggCallList());
FlinkLogicalAggregate convertedAgg = visitAggregate(correspondingAgg);
return new FlinkLogicalTableAggregate(
tableAgg.getCluster(),
tableAgg.getTraitSet(),
convertedAgg.getInput(),
convertedAgg.getGroupSet(),
convertedAgg.getGroupSets(),
convertedAgg.getAggCallList());
}
private FlinkLogicalWindowAggregate visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
RelNode newInput = convertAggInput(agg);
List<AggregateCall> updatedAggCalls = convertAggregateCalls(agg);
return new FlinkLogicalWindowAggregate(
agg.getCluster(),
agg.getTraitSet(),
newInput,
agg.getGroupSet(),
updatedAggCalls,
agg.getWindow(),
agg.getNamedProperties());
}
private RelNode visitWindowTableAggregate(FlinkLogicalWindowTableAggregate tableAgg) {
FlinkLogicalWindowAggregate correspondingAgg =
new FlinkLogicalWindowAggregate(
tableAgg.getCluster(),
tableAgg.getTraitSet(),
tableAgg.getInput(),
tableAgg.getGroupSet(),
tableAgg.getAggCallList(),
tableAgg.getWindow(),
tableAgg.getNamedProperties());
FlinkLogicalWindowAggregate convertedWindowAgg = visitWindowAggregate(correspondingAgg);
return new FlinkLogicalWindowTableAggregate(
tableAgg.getCluster(),
tableAgg.getTraitSet(),
convertedWindowAgg.getInput(),
tableAgg.getGroupSet(),
tableAgg.getGroupSets(),
convertedWindowAgg.getAggCallList(),
tableAgg.getWindow(),
tableAgg.getNamedProperties());
}
private RelNode visitInvalidRel(RelNode node) {
throw new TableException(
String.format(
"This is a bug and should not happen. Please file an issue. Unknown node %s.",
node.getRelTypeName()));
}
// ----------------------------------------------------------------------------------------
// Utility
// ----------------------------------------------------------------------------------------
private RelNode materializeProcTime(RelNode node) {
// there is no need to add a redundant calc to materialize proc-time if input is empty
// values. Otherwise we need add a PruneEmptyRules after the RelTimeIndicatorConverter to
// remove the redundant calc.
if (node instanceof FlinkLogicalValues
&& FlinkLogicalValues.isEmpty((FlinkLogicalValues) node)) {
return node;
}
Set<Integer> procTimeFieldIndices = gatherProcTimeIndices(node);
return materializeTimeIndicators(node, procTimeFieldIndices);
}
private RelNode materializeTimeIndicators(RelNode node) {
Set<Integer> timeFieldIndices = gatherTimeAttributeIndices(node);
return materializeTimeIndicators(node, timeFieldIndices);
}
private RelNode materializeTimeIndicators(RelNode node, Set<Integer> timeIndicatorIndices) {
if (timeIndicatorIndices.isEmpty()) {
return node;
}
// insert or merge with input calc if
// a time attribute is accessed and needs to be materialized
if (node instanceof FlinkLogicalCalc) {
// merge original calc
return mergeCalcToMaterializeTimeIndicators(
(FlinkLogicalCalc) node, timeIndicatorIndices);
} else {
return createCalcToMaterializeTimeIndicators(node, timeIndicatorIndices);
}
}
private RelNode mergeCalcToMaterializeTimeIndicators(
FlinkLogicalCalc calc, Set<Integer> refIndices) {
RexProgram program = calc.getProgram();
RexProgramBuilder newProgramBuilder =
new RexProgramBuilder(program.getInputRowType(), rexBuilder);
for (int idx = 0; idx < program.getNamedProjects().size(); idx++) {
Pair<RexLocalRef, String> pair = program.getNamedProjects().get(idx);
RexNode project = program.expandLocalRef(pair.left);
if (refIndices.contains(idx)) {
project = materializeTimeIndicators(project);
}
newProgramBuilder.addProject(project, pair.right);
}
if (program.getCondition() != null) {
newProgramBuilder.addCondition(program.expandLocalRef(program.getCondition()));
}
RexProgram newProgram = newProgramBuilder.getProgram();
return FlinkLogicalCalc.create(calc.getInput(), newProgram);
}
private RelNode createCalcToMaterializeTimeIndicators(RelNode input, Set<Integer> refIndices) {
// create new calc
List<RexNode> projects =
input.getRowType().getFieldList().stream()
.map(
field -> {
RexNode project =
new RexInputRef(field.getIndex(), field.getType());
if (refIndices.contains(field.getIndex())) {
project = materializeTimeIndicators(project);
}
return project;
})
.collect(Collectors.toList());
RexProgram newProgram =
RexProgram.create(
input.getRowType(),
projects,
null,
input.getRowType().getFieldNames(),
rexBuilder);
return FlinkLogicalCalc.create(input, newProgram);
}
private RexNode materializeTimeIndicators(RexNode expr) {
if (isRowtimeIndicatorType(expr.getType())) {
// cast rowTime indicator to regular timestamp
return rexBuilder.makeAbstractCast(
timestamp(expr.getType().isNullable(), isTimestampLtzType(expr.getType())),
expr);
} else if (isProctimeIndicatorType(expr.getType())) {
// generate procTime access
return rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr);
} else {
return expr;
}
}
private void validateType(RelDataType l, RelDataType r) {
boolean isValid;
// check if time indicators match
if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) {
boolean leftIsEventTime = ((TimeIndicatorRelDataType) l).isEventTime();
boolean rightIsEventTime = ((TimeIndicatorRelDataType) r).isEventTime();
isValid = leftIsEventTime == rightIsEventTime;
} else {
isValid = !isTimeIndicatorType(l) && !isTimeIndicatorType(r);
}
if (!isValid) {
throw new ValidationException(
String.format(
"Union fields with time attributes requires same types, but the types are %s and %s.",
l, r));
}
}
private RelDataType getRowTypeWithoutTimeIndicator(
RelDataType relType, Predicate<String> shouldMaterialize) {
Map<String, RelDataType> convertedFields =
relType.getFieldList().stream()
.map(
field -> {
RelDataType fieldType = field.getType();
if (isTimeIndicatorType(fieldType)
&& shouldMaterialize.test(field.getName())) {
fieldType =
timestamp(
fieldType.isNullable(),
isTimestampLtzType(fieldType));
}
return Tuple2.of(field.getName(), fieldType);
})
.collect(
Collectors.toMap(
t -> t.f0, t -> t.f1, (e1, e2) -> e1, LinkedHashMap::new));
return rexBuilder.getTypeFactory().builder().addAll(convertedFields.entrySet()).build();
}
private Set<Integer> gatherProcTimeIndices(RelNode node) {
return gatherTimeAttributeIndices(node, f -> isProctimeIndicatorType(f.getType()));
}
private Set<Integer> gatherTimeAttributeIndices(RelNode node) {
return gatherTimeAttributeIndices(node, f -> isTimeIndicatorType(f.getType()));
}
private Set<Integer> gatherTimeAttributeIndices(
RelNode node, Predicate<RelDataTypeField> predicate) {
return node.getRowType().getFieldList().stream()
.filter(predicate)
.map(RelDataTypeField::getIndex)
.collect(Collectors.toSet());
}
private RelDataType timestamp(boolean isNullable, boolean isTimestampLtzIndicator) {
LogicalType logicalType;
if (isTimestampLtzIndicator) {
logicalType = new LocalZonedTimestampType(isNullable, 3);
} else {
logicalType = new TimestampType(isNullable, 3);
}
return ((FlinkTypeFactory) rexBuilder.getTypeFactory())
.createFieldTypeFromLogicalType(logicalType);
}
private boolean isTimestampLtzType(RelDataType type) {
return type.getSqlTypeName().equals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
}
// ----------------------------------------------------------------------------------------
// Materializer for RexNode including time indicator
// ----------------------------------------------------------------------------------------
private class RexTimeIndicatorMaterializer extends RexShuttle {
private final List<RelDataType> inputFieldTypes;
private RexTimeIndicatorMaterializer(RelNode node) {
this(RelOptUtil.getFieldTypeList(node.getRowType()));
}
private RexTimeIndicatorMaterializer(List<RelDataType> inputFieldTypes) {
this.inputFieldTypes = inputFieldTypes;
}
@Override
public RexNode visitCall(RexCall call) {
RexCall updatedCall = (RexCall) super.visitCall(call);
// materialize operands with time indicators
List<RexNode> materializedOperands;
SqlOperator updatedCallOp = updatedCall.getOperator();
if (updatedCallOp == FlinkSqlOperatorTable.SESSION_OLD
|| updatedCallOp == FlinkSqlOperatorTable.HOP_OLD
|| updatedCallOp == FlinkSqlOperatorTable.TUMBLE_OLD) {
// skip materialization for special operators
materializedOperands = updatedCall.getOperands();
} else {
materializedOperands =
updatedCall.getOperands().stream()
.map(RelTimeIndicatorConverter.this::materializeTimeIndicators)
.collect(Collectors.toList());
}
// All calls in MEASURES and DEFINE are wrapped with FINAL/RUNNING, therefore
// we should treat FINAL(MATCH_ROWTIME) and FINAL(MATCH_PROCTIME) as a time attribute
// extraction
if (isFinalOnMatchTimeIndicator(call)) {
return updatedCall;
} else if (isTimeIndicatorType(updatedCall.getType())) {
// do not modify window time attributes and some special operators
if (updatedCallOp == FlinkSqlOperatorTable.TUMBLE_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.TUMBLE_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.HOP_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.HOP_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.SESSION_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.SESSION_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.MATCH_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.MATCH_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.PROCTIME
|| updatedCallOp == SqlStdOperatorTable.AS
|| updatedCallOp == SqlStdOperatorTable.CAST
|| updatedCallOp == FlinkSqlOperatorTable.REINTERPRET) {
return updatedCall;
} else {
// materialize function's result and operands
return updatedCall.clone(
timestamp(
updatedCall.getType().isNullable(),
isTimestampLtzType(updatedCall.getType())),
materializedOperands);
}
} else {
// materialize function's operands only
return updatedCall.clone(updatedCall.getType(), materializedOperands);
}
}
@Override
public RexNode visitInputRef(RexInputRef inputRef) {
RelDataType oldType = inputRef.getType();
if (isTimeIndicatorType(oldType)) {
RelDataType resolvedRefType = inputFieldTypes.get(inputRef.getIndex());
if (!isTimeIndicatorType(resolvedRefType)) {
// input has been materialized
return new RexInputRef(inputRef.getIndex(), resolvedRefType);
}
}
return super.visitInputRef(inputRef);
}
@Override
public RexNode visitPatternFieldRef(RexPatternFieldRef fieldRef) {
RelDataType oldType = fieldRef.getType();
if (isTimeIndicatorType(oldType)) {
RelDataType resolvedRefType = inputFieldTypes.get(fieldRef.getIndex());
if (!isTimeIndicatorType(resolvedRefType)) {
// input has been materialized
return new RexPatternFieldRef(
fieldRef.getAlpha(), fieldRef.getIndex(), resolvedRefType);
}
}
return super.visitPatternFieldRef(fieldRef);
}
}
}