blob: 505f03c40af365a55e3f0e9aa4f835d7ec876bea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.util
import java.time.Duration
import org.apache.flink.table.api.window.TimeWindow
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.expressions.ExpressionUtils
import org.apache.flink.table.plan.logical.{LogicalWindow, SessionGroupWindow}
import org.apache.flink.table.runtime.window.triggers._
class EmitStrategy(
isEventTime: Boolean,
isSessionWindow: Boolean,
earlyFireInterval: Long,
lateFireInterval: Long,
allowLateness: Long) {
def getAllowLateness: Long = allowLateness
def checkValidation(): Unit = {
if (isSessionWindow && (earlyFireInterval >= 0 || lateFireInterval >= 0)) {
throw new TableException("Session window doesn't support EMIT strategy currently.")
}
if (isEventTime && lateFireInterval >= 0L && allowLateness <= 0L) {
throw new TableException("The 'AFTER WATERMARK' emit strategy requires " +
"'sql.exec.state.ttl.ms' config in job config.")
}
}
def produceUpdates: Boolean = {
if (isEventTime) {
allowLateness > 0 || earlyFireInterval >= 0 || lateFireInterval >= 0
} else {
earlyFireInterval >= 0
}
}
def getTrigger: Trigger[TimeWindow] = {
val earlyTrigger = createTriggerFromInterval(earlyFireInterval)
val lateTrigger = createTriggerFromInterval(lateFireInterval)
if (isEventTime) {
val trigger = EventTime.afterEndOfWindow[TimeWindow]()
(earlyTrigger, lateTrigger) match {
case (Some(early), Some(late)) => trigger.withEarlyFirings(early).withLateFirings(late)
case (Some(early), None) => trigger.withEarlyFirings(early)
case (None, Some(late)) => trigger.withLateFirings(late)
case (None, None) => trigger
}
} else {
val trigger = ProcessingTime.afterEndOfWindow[TimeWindow]()
// late trigger is ignored, as no late element in processing time
earlyTrigger match {
case Some(early) => trigger.withEarlyFirings(early)
case None => trigger
}
}
}
override def toString: String = {
val builder = new StringBuilder
val earlyString = intervalToString(earlyFireInterval)
val lateString = intervalToString(lateFireInterval)
if (earlyString != null) {
builder.append("early ").append(earlyString)
}
if (lateString != null) {
if (earlyString != null) {
builder.append(", ")
}
builder.append("late ").append(lateString)
}
builder.toString
}
private def createTriggerFromInterval(interval: Long): Option[Trigger[TimeWindow]] = {
if (interval > 0) {
Some(ProcessingTime.every(Duration.ofMillis(interval)))
} else if (interval == 0) {
Some(Element.every())
} else {
None
}
}
private def intervalToString(interval: Long): String = {
if (interval > 0) {
s"delay $interval millisecond"
} else if (interval == 0) {
"no delay"
} else {
null
}
}
}
object EmitStrategy {
def apply(tableConfig: TableConfig, window: LogicalWindow): EmitStrategy = {
val isEventTime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute)
val isSessionWindow = window.isInstanceOf[SessionGroupWindow]
val allowLateness = if (isSessionWindow) {
// ignore allow lateness in session window because retraction is not supported
0L
} else if (tableConfig.getMinIdleStateRetentionTime < 0) {
// min idle state retention time is not set, use 0L as default which means not allow lateness
0L
} else {
// use min idle state retention time as allow lateness
tableConfig.getMinIdleStateRetentionTime
}
new EmitStrategy(
isEventTime,
isSessionWindow,
tableConfig.getEarlyFireInterval,
tableConfig.getLateFireInterval,
allowLateness)
}
}