blob: cd45287c8526df5c6ac98c16521032890f88fa35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api
import _root_.java.io.Serializable
import org.apache.flink.api.common.time.Time
class QueryConfig private[table] extends Serializable {
/**
* The [[QueryConfig]] holds parameters that exits in [[TableConfig]] also. If the user gives
* a [[QueryConfig]], put all parameters of it to override [[TableConfig]].
*/
private[flink] def overrideTableConfig(tableConfig: TableConfig): Unit = {}
}
/**
* The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
*/
class BatchQueryConfig private[table] extends QueryConfig
/**
* The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
*
* An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.queryConfig]]
* method.
*/
class StreamQueryConfig private[table] extends QueryConfig {
/**
* The minimum time until state which was not updated will be retained.
* State might be cleared and removed if it was not updated for the defined period of time.
*/
private var minIdleStateRetentionTime: Long = 0L
/**
* The maximum time until state which was not updated will be retained.
* State will be cleared and removed if it was not updated for the defined period of time.
*/
private var maxIdleStateRetentionTime: Long = 0L
/**
* Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
* was not updated, will be retained.
* State will never be cleared until it was idle for less than the minimum time and will never
* be kept if it was idle for more than the maximum time.
*
* When new data arrives for previously cleaned-up state, the new data will be handled as if it
* was the first data. This can result in previous results being overwritten.
*
* Set to 0 (zero) to never clean-up the state.
*
* NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
* larger differences of minTime and maxTime. The difference between minTime and maxTime must be
* at least 5 minutes.
*
* @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
* never clean-up the state.
* @param maxTime The maximum time interval for which idle state is retained. Must be at least
* 5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
*/
def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
if (maxTime.toMilliseconds - minTime.toMilliseconds < 300000 &&
!(maxTime.toMilliseconds == 0 && minTime.toMilliseconds == 0)) {
throw new IllegalArgumentException(
s"Difference between minTime: ${minTime.toString} and maxTime: ${maxTime.toString} " +
s"shoud be at least 5 minutes.")
}
minIdleStateRetentionTime = minTime.toMilliseconds
maxIdleStateRetentionTime = maxTime.toMilliseconds
this
}
def getMinIdleStateRetentionTime: Long = {
minIdleStateRetentionTime
}
def getMaxIdleStateRetentionTime: Long = {
maxIdleStateRetentionTime
}
override private[flink] def overrideTableConfig(tableConfig: TableConfig): Unit = {
if (minIdleStateRetentionTime != 0 || maxIdleStateRetentionTime != 0) {
tableConfig.withIdleStateRetentionTime(Time.milliseconds(minIdleStateRetentionTime),
Time.milliseconds(maxIdleStateRetentionTime))
}
}
}