blob: bf52652b34d3a69b052721d01a0ce4868f999185 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.joda.time.Duration;
/** Rule to detect the window/trigger settings. */
public class BeamAggregationRule extends RelOptRule {
public static final BeamAggregationRule INSTANCE =
new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
public BeamAggregationRule(
Class<? extends Aggregate> aggregateClass,
Class<? extends Project> projectClass,
RelBuilderFactory relBuilderFactory) {
super(operand(aggregateClass, operand(projectClass, any())), relBuilderFactory, null);
}
@Override
public void onMatch(RelOptRuleCall call) {
final Aggregate aggregate = call.rel(0);
final Project project = call.rel(1);
RelNode x = updateWindow(call, aggregate, project);
call.transformTo(x);
}
private static RelNode updateWindow(RelOptRuleCall call, Aggregate aggregate, Project project) {
ImmutableBitSet groupByFields = aggregate.getGroupSet();
ArrayList<RexNode> projects = new ArrayList(project.getProjects());
WindowFn windowFn = null;
int windowFieldIndex = -1;
for (int groupFieldIndex : groupByFields.asList()) {
RexNode projNode = projects.get(groupFieldIndex);
if (!(projNode instanceof RexCall)) {
continue;
}
RexCall rexCall = (RexCall) projNode;
WindowFn fn = createWindowFn(rexCall.getOperands(), rexCall.op.kind);
if (fn != null) {
windowFn = fn;
windowFieldIndex = groupFieldIndex;
projects.set(groupFieldIndex, rexCall.getOperands().get(0));
}
}
final Project newProject =
project.copy(project.getTraitSet(), project.getInput(), projects, project.getRowType());
return new BeamAggregationRel(
aggregate.getCluster(),
aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
convert(newProject, newProject.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
aggregate.indicator,
aggregate.getGroupSet(),
aggregate.getGroupSets(),
aggregate.getAggCallList(),
windowFn,
windowFieldIndex);
}
/**
* Returns a {@link WindowFn} based on the SQL windowing function defined by {#code operatorKind}.
* Supported {@link SqlKind}s:
*
* <ul>
* <li>{@link SqlKind#TUMBLE}, mapped to {@link FixedWindows};
* <li>{@link SqlKind#HOP}, mapped to {@link SlidingWindows};
* <li>{@link SqlKind#SESSION}, mapped to {@link Sessions};
* </ul>
*
* <p>For example:
*
* <pre>{@code
* SELECT event_timestamp, COUNT(*)
* FROM PCOLLECTION
* GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR)
* }</pre>
*
* <p>SQL window functions support optional window_offset parameter which indicates a how window
* definition is offset from the event time. Offset is zero if not specified.
*
* <p>Beam model does not support offset for session windows, so this method will throw {@link
* UnsupportedOperationException} if offset is specified in SQL query for {@link SqlKind#SESSION}.
*/
private static @Nullable WindowFn createWindowFn(List<RexNode> parameters, SqlKind operatorKind) {
switch (operatorKind) {
case TUMBLE:
// Fixed-size, non-intersecting time-based windows, for example:
// every hour aggregate elements from the previous hour;
//
// SQL Syntax:
// TUMBLE(monotonic_field, window_size [, window_offset])
//
// Example:
// TUMBLE(event_timestamp_field, INTERVAL '1' HOUR)
FixedWindows fixedWindows = FixedWindows.of(durationParameter(parameters, 1));
if (parameters.size() == 3) {
fixedWindows = fixedWindows.withOffset(durationParameter(parameters, 2));
}
return fixedWindows;
case HOP:
// Sliding, fixed-size, intersecting time-based windows, for example:
// every minute aggregate elements from the previous hour;
//
// SQL Syntax:
// HOP(monotonic_field, emit_frequency, window_size [, window_offset])
SlidingWindows slidingWindows =
SlidingWindows.of(durationParameter(parameters, 2))
.every(durationParameter(parameters, 1));
if (parameters.size() == 4) {
slidingWindows = slidingWindows.withOffset(durationParameter(parameters, 3));
}
return slidingWindows;
case SESSION:
// Session windows, for example:
// aggregate events after a gap of 1 minute of no events;
//
// SQL Syntax:
// SESSION(monotonic_field, session_gap)
//
// Example:
// SESSION(event_timestamp_field, INTERVAL '1' MINUTE)
Sessions sessions = Sessions.withGapDuration(durationParameter(parameters, 1));
if (parameters.size() == 3) {
throw new UnsupportedOperationException(
"Specifying alignment (offset) is not supported for session windows");
}
return sessions;
default:
return null;
}
}
private static Duration durationParameter(List<RexNode> parameters, int parameterIndex) {
return Duration.millis(longValue(parameters.get(parameterIndex)));
}
private static long longValue(RexNode operand) {
if (operand instanceof RexLiteral) {
// @TODO: this can be simplified after CALCITE-2837
return ((Number) RexLiteral.value(operand)).longValue();
} else {
throw new IllegalArgumentException(String.format("[%s] is not valid.", operand));
}
}
}