blob: 11bf8d6c6ca68cf0466f4f6267eb1766eeffd151 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.table.api.OperatorType.OperatorType
import org.apache.flink.table.api.functions.AggregateFunction
import org.apache.flink.table.calcite.CalciteConfig
import org.apache.flink.util.Preconditions
import _root_.java.util.TimeZone
import org.apache.flink.service.ServiceDescriptor
import org.apache.flink.table.plan.nodes.process.ChainedDAGProcessors
import org.apache.flink.table.temptable.FlinkTableServiceFactoryDescriptor
import org.apache.flink.table.temptable.util.TableServiceUtil
/**
* A config to define the runtime behavior of the Table API.
*/
class TableConfig {

  /**
   * Defines the timezone for date/time/timestamp conversions. Defaults to UTC.
   */
  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")

  /**
   * Defines if all fields need to be checked for NULL first. Enabled by default.
   */
  private var nullCheck: Boolean = true

  /**
   * Defines the configuration of Calcite for Table API and SQL queries.
   */
  private var calciteConfig = CalciteConfig.createBuilder().build()

  /**
   * Defines whether subsection optimization is supported. Disabled by default.
   */
  private var subsectionOptimization: Boolean = false

  /**
   * Defines the user-defined configuration. Initially the configuration returned by
   * `GlobalConfiguration.loadConfiguration()`.
   */
  private var conf = GlobalConfiguration.loadConfiguration()

  /**
   * Defines user-defined chained batch DAG processors.
   */
  private var batchDAGProcessors: ChainedDAGProcessors =
    ChainedDAGProcessors.buildBatchProcessors()

  /**
   * Defines user-defined chained stream DAG processors.
   */
  private var streamDAGProcessors: ChainedDAGProcessors =
    ChainedDAGProcessors.buildStreamProcessors()

  /**
   * Returns the chained batch DAG processors.
   */
  def getBatchDAGProcessors: ChainedDAGProcessors = batchDAGProcessors

  /**
   * Returns the chained stream DAG processors.
   */
  def getStreamDAGProcessors: ChainedDAGProcessors = streamDAGProcessors

  /**
   * Sets the chained batch DAG processors.
   */
  def setBatchDAGProcessors(dagProcessors: ChainedDAGProcessors): Unit = {
    this.batchDAGProcessors = dagProcessors
  }

  /**
   * Sets the chained stream DAG processors.
   */
  def setStreamDAGProcessors(dagProcessors: ChainedDAGProcessors): Unit = {
    this.streamDAGProcessors = dagProcessors
  }

  /**
   * Sets the timezone for date/time/timestamp conversions.
   *
   * Besides storing the zone on this config, the zone is also handed to Calcite's
   * `DateTimeUtils.setUserZone`, which is a static call.
   * NOTE(review): that presumably makes the zone visible outside this instance - confirm.
   *
   * @param timeZone the zone to use; must not be null.
   * @throws IllegalArgumentException if `timeZone` is null.
   */
  def setTimeZone(timeZone: TimeZone): Unit = {
    require(timeZone != null, "timeZone must not be null.")
    this.timeZone = timeZone
    // Hack, for Timestamp Literal, constants folding
    org.apache.calcite.avatica.util.DateTimeUtils.setUserZone(timeZone)
  }

  /**
   * Returns the timezone for date/time/timestamp conversions.
   */
  def getTimeZone: TimeZone = timeZone

  /**
   * Returns the NULL check. If enabled, all fields need to be checked for NULL first.
   */
  def getNullCheck: Boolean = nullCheck

  /**
   * Sets the NULL check. If enabled, all fields need to be checked for NULL first.
   */
  def setNullCheck(nullCheck: Boolean): Unit = {
    this.nullCheck = nullCheck
  }

  /**
   * Returns the current configuration of Calcite for Table API and SQL queries.
   */
  def getCalciteConfig: CalciteConfig = calciteConfig

  /**
   * Sets the configuration of Calcite for Table API and SQL queries.
   * Changing the configuration has no effect after the first query has been defined.
   *
   * @param calciteConfig the new Calcite configuration; must not be null.
   */
  def setCalciteConfig(calciteConfig: CalciteConfig): Unit = {
    this.calciteConfig = Preconditions.checkNotNull(calciteConfig)
  }

  /**
   * Returns the user-defined configuration.
   */
  def getConf: Configuration = conf

  /**
   * Sets the user-defined configuration.
   *
   * The stored configuration is rebuilt from scratch: a fresh configuration is obtained from
   * `GlobalConfiguration.loadConfiguration()` and all entries of `conf` are added on top of it.
   * Entries that were only present in the previously stored configuration are therefore not
   * carried over.
   *
   * @param conf the entries to apply on top of the global configuration; must not be null.
   */
  def setConf(conf: Configuration): Unit = {
    // Validate before touching any state so a bad argument cannot wipe the current settings.
    Preconditions.checkNotNull(conf)
    val merged = GlobalConfiguration.loadConfiguration()
    merged.addAll(conf)
    // Publish only once the replacement has been fully assembled.
    this.conf = merged
  }

  /**
   * Returns whether subsection optimization is supported.
   */
  def getSubsectionOptimization: Boolean = subsectionOptimization

  /**
   * Sets the subsection optimization. If enabled, we will reuse DataSet/DataStream plan as much
   * as possible. The whole plan will be decomposed into multiple blocks, and each block will be
   * optimized alone.
   */
  def setSubsectionOptimization(subsectionOptimization: Boolean): Unit = {
    this.subsectionOptimization = subsectionOptimization
  }

  /**
   * Returns whether the given operator type is enabled.
   *
   * The value stored under `TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS` is read as a
   * comma-separated list whose entries are trimmed. An operator is disabled when its name
   * appears in that list. The entry "HashJoin" additionally disables both
   * [[OperatorType.BroadcastHashJoin]] and [[OperatorType.ShuffleHashJoin]].
   *
   * @param operator the operator type to look up.
   * @return false if the operator is disabled by the configuration, true otherwise.
   */
  def enabledGivenOpType(operator: OperatorType): Boolean = {
    val disableOperators = conf.getString(TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS)
      .split(",")
      .map(_.trim)
    if (disableOperators.contains("HashJoin") &&
      (operator == OperatorType.BroadcastHashJoin ||
        operator == OperatorType.ShuffleHashJoin)) {
      false
    } else {
      !disableOperators.contains(operator.toString)
    }
  }

  /**
   * Sentinel meaning that no fire interval has been configured yet.
   */
  private val DEFAULT_FIRE_INTERVAL = Long.MinValue

  /**
   * The early firing interval in milliseconds. Early fire is the emit strategy
   * before the watermark advanced to the end of the window.
   *
   * < 0 means no early fire
   * 0 means no delay (fire on every element).
   * > 0 means the fire interval
   */
  private var earlyFireInterval = DEFAULT_FIRE_INTERVAL

  /**
   * The late firing interval in milliseconds. Late fire is the emit strategy
   * after the watermark advanced to the end of the window.
   *
   * < 0 means no late fire, drop every late element
   * 0 means no delay (fire on every element).
   * > 0 means the fire interval
   *
   * NOTE: late firing strategy is only enabled when allowLateness > 0
   */
  private var lateFireInterval = DEFAULT_FIRE_INTERVAL

  /**
   * Stores managed table name to aggregate state writer mapping.
   * The key is managed table name, value is aggregate function identifier and function itself.
   */
  private var queryableState2AggFunctionMap: Map[String, JTuple2[String, AggregateFunction[_, _]]] =
    Map.empty

  /**
   * Keeps track of the user-defined conf for each state table.
   */
  var queryableState2ParamMap: Map[String, Map[String, String]] = Map.empty

  /**
   * Specifies the time interval for how long idle state, i.e., state which was not updated, will
   * be retained. When state was not updated for the specified interval of time, it will be cleared
   * and removed.
   *
   * When new data arrives for previously cleaned-up state, the new data will be handled as if it
   * was the first data. This can result in previous results being overwritten.
   *
   * Note: [[withIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
   * maximum time for state to be retained. This method is more efficient, because the system has
   * to do less bookkeeping to identify the time at which state must be cleared.
   *
   * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
   *             clean-up the state.
   * @return this config, to allow call chaining.
   */
  def withIdleStateRetentionTime(time: Time): TableConfig = {
    withIdleStateRetentionTime(time, time)
  }

  /**
   * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
   * was not updated, will be retained.
   * State will not be cleared while it has been idle for less than the minimum time and will
   * never be kept if it was idle for more than the maximum time.
   *
   * When new data arrives for previously cleaned-up state, the new data will be handled as if it
   * was the first data. This can result in previous results being overwritten.
   *
   * Set to 0 (zero) to never clean-up the state.
   *
   * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
   *                never clean-up the state.
   * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
   *                than minTime. Set to 0 (zero) to never clean-up the state.
   * @return this config, to allow call chaining.
   * @throws IllegalArgumentException if maxTime is smaller than minTime.
   */
  def withIdleStateRetentionTime(minTime: Time, maxTime: Time): TableConfig = {
    if (maxTime.toMilliseconds < minTime.toMilliseconds) {
      throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
    }
    this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS, minTime.toMilliseconds)
    this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS, maxTime.toMilliseconds)
    this
  }

  /**
   * Specifies the early firing interval in milliseconds. Early fire is the emit strategy
   * before the watermark advanced to the end of the window.
   *
   * Once an interval has been set, only the same value may be set again.
   *
   * @param interval the early fire interval.
   * @return this config, to allow call chaining.
   * @throws RuntimeException if a different early fire interval was already configured.
   */
  def withEarlyFireInterval(interval: Time): TableConfig = {
    if (this.earlyFireInterval != DEFAULT_FIRE_INTERVAL
      && this.earlyFireInterval != interval.toMilliseconds) {
      // earlyFireInterval of the two query config is not equal and not the default
      throw new RuntimeException(
        "Currently not support different earlyFireInterval configs in one job")
    }
    earlyFireInterval = interval.toMilliseconds
    this
  }

  /**
   * Specifies the late firing interval in milliseconds. Late fire is the emit strategy
   * after the watermark advanced to the end of the window.
   *
   * Once an interval has been set, only the same value may be set again.
   *
   * @param interval the late fire interval.
   * @return this config, to allow call chaining.
   * @throws RuntimeException if a different late fire interval was already configured.
   */
  def withLateFireInterval(interval: Time): TableConfig = {
    if (this.lateFireInterval != DEFAULT_FIRE_INTERVAL
      && this.lateFireInterval != interval.toMilliseconds) {
      // lateFireInterval of the two query config is not equal and not the default
      throw new RuntimeException(
        "Currently not support different lateFireInterval configs in one job")
    }
    lateFireInterval = interval.toMilliseconds
    this
  }

  /**
   * Returns the minimum idle state retention time, read from
   * `TableConfigOptions.SQL_EXEC_STATE_TTL_MS`.
   */
  def getMinIdleStateRetentionTime: Long = {
    this.conf.getLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)
  }

  /**
   * Returns the maximum idle state retention time, read from
   * `TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS`.
   *
   * Side effect: when the configuration contains the minimum TTL but not the maximum TTL,
   * this getter first stores twice the minimum as the maximum in the configuration.
   */
  def getMaxIdleStateRetentionTime: Long = {
    // only min idle ttl provided.
    if (this.conf.contains(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)
      && !this.conf.contains(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS)) {
      this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS,
        getMinIdleStateRetentionTime * 2)
    }
    this.conf.getLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS)
  }

  /**
   * Returns the early fire interval in milliseconds, or `Long.MinValue` if none was set.
   */
  def getEarlyFireInterval: Long = earlyFireInterval

  /**
   * Returns the late fire interval in milliseconds, or `Long.MinValue` if none was set.
   */
  def getLateFireInterval: Long = lateFireInterval

  /**
   * Registers (or replaces) the aggregate function mapping for a queryable state name.
   *
   * @param queryableName the queryable state name used as key.
   * @param udagg         the aggregate function identifier together with the function itself.
   * @return this config, to allow call chaining.
   */
  def addQueryableState2AggFunctionMapping(queryableName: String, udagg: JTuple2[String,
    AggregateFunction[_, _]]): TableConfig = {
    queryableState2AggFunctionMap += (queryableName -> udagg)
    this
  }

  /**
   * Returns the aggregate function mapping registered for the given queryable state name,
   * or null if there is none.
   */
  def getAggFunctionByQueryableStateName(queryableName: String): JTuple2[String,
    AggregateFunction[_, _]] = queryableState2AggFunctionMap.get(queryableName).orNull

  /**
   * Defines the ServiceDescriptor for TableService. Null until explicitly set.
   */
  private var tableServiceDescriptor: ServiceDescriptor = _

  /**
   * Returns the ServiceDescriptor for TableService.
   *
   * If no descriptor was set, one is created from the current configuration via
   * `TableServiceUtil.createTableServiceDescriptor`. That fallback is created on every call
   * and is not stored on this config.
   */
  def getTableServiceDescriptor(): ServiceDescriptor = {
    if (tableServiceDescriptor == null) {
      TableServiceUtil.createTableServiceDescriptor(this.conf)
    } else {
      tableServiceDescriptor
    }
  }

  /**
   * Sets the ServiceDescriptor for TableService.
   */
  def setTableServiceDescriptor(descriptor: ServiceDescriptor): Unit = {
    tableServiceDescriptor = descriptor
  }

  /**
   * Defines the FlinkTableServiceFactoryDescriptor for TableService.
   *
   * Initialised with the default descriptor from `TableServiceUtil`, with all entries of the
   * configuration held at construction time added to its table properties. Nothing in this
   * class copies later configuration changes into the descriptor.
   */
  private var tableServiceFactoryDescriptor: FlinkTableServiceFactoryDescriptor = {
    val desc = TableServiceUtil.getDefaultTableServiceFactoryDescriptor
    desc.getTableProperties.addAll(getConf)
    desc
  }

  /**
   * Returns the FlinkTableServiceFactoryDescriptor for TableService.
   */
  def getTableServiceFactoryDescriptor(): FlinkTableServiceFactoryDescriptor =
    tableServiceFactoryDescriptor

  /**
   * Sets the FlinkTableServiceFactoryDescriptor for TableService.
   */
  def setTableServiceFactoryDescriptor(descriptor: FlinkTableServiceFactoryDescriptor): Unit = {
    tableServiceFactoryDescriptor = descriptor
  }
}
/**
 * Physical operator types that can be switched off through the table configuration
 * (see `TableConfig.enabledGivenOpType`).
 *
 * The declaration order below fixes the ids (0 to 5) and must be kept.
 */
object OperatorType extends Enumeration {

  /** Alias so that callers can refer to an enumeration value as `OperatorType`. */
  type OperatorType = Value

  val NestedLoopJoin = Value
  val ShuffleHashJoin = Value
  val BroadcastHashJoin = Value
  val SortMergeJoin = Value
  val HashAgg = Value
  val SortAgg = Value
}
/**
 * Enumeration of the aggregate phase enforcement options.
 * NOTE(review): presumably selects whether aggregations are planned in one or two phases -
 * confirm against the planner, since nothing in this file reads it.
 *
 * The declaration order below fixes the ids (0 to 2) and must be kept.
 */
object AggPhaseEnforcer extends Enumeration {

  /** Alias so that callers can refer to an enumeration value as `AggPhaseEnforcer`. */
  type AggPhaseEnforcer = Value

  val NONE = Value
  val ONE_PHASE = Value
  val TWO_PHASE = Value
}
object TableConfig {

  /**
   * Creates a table config with default settings.
   *
   * This is a `def`, so every access builds a new, independent [[TableConfig]] instance.
   */
  def DEFAULT: TableConfig = new TableConfig()
}