blob: 12054c624bbfb012419c37ab27123031c10a5d24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.calcite
import org.apache.flink.table.plan.optimize._
import org.apache.flink.util.Preconditions
import org.apache.calcite.config.{CalciteConnectionConfig, CalciteConnectionConfigImpl, CalciteConnectionProperty}
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.sql2rel.SqlToRelConverter
import java.util.Properties
/**
* Builder for creating a Calcite configuration.
*/
class CalciteConfigBuilder {
/**
* Defines the optimize programs for batch table plan.
*/
private var batchPrograms: Option[FlinkChainedPrograms[BatchOptimizeContext]] = None
/**
* Defines the optimize programs for stream table plan.
*/
private var streamPrograms: Option[FlinkChainedPrograms[StreamOptimizeContext]] = None
/**
* Defines the SQL operator tables.
*/
private var replaceOperatorTable: Boolean = false
private var operatorTables: List[SqlOperatorTable] = Nil
/**
* Replaces the default batch table optimize programs with the given programs.
*/
def replaceBatchPrograms(
programs: FlinkChainedPrograms[BatchOptimizeContext]): CalciteConfigBuilder = {
Preconditions.checkNotNull(programs)
batchPrograms = Some(programs)
this
}
/**
* Replaces the default stream table optimize programs with the given programs.
*/
def replaceStreamPrograms(
programs: FlinkChainedPrograms[StreamOptimizeContext]): CalciteConfigBuilder = {
Preconditions.checkNotNull(programs)
streamPrograms = Some(programs)
this
}
/**
* Defines a SQL parser configuration.
*/
private var replaceSqlParserConfig: Option[SqlParser.Config] = None
/**
* Defines a configuration for SqlToRelConverter.
*/
private var replaceSqlToRelConverterConfig: Option[SqlToRelConverter.Config] = None
/**
* Replaces the built-in SQL operator table with the given table.
*/
def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
Preconditions.checkNotNull(replaceSqlOperatorTable)
operatorTables = List(replaceSqlOperatorTable)
replaceOperatorTable = true
this
}
/**
* Appends the given table to the built-in SQL operator table.
*/
def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
Preconditions.checkNotNull(addedSqlOperatorTable)
this.operatorTables = addedSqlOperatorTable :: this.operatorTables
this
}
/**
* Replaces the built-in SQL parser configuration with the given configuration.
*/
def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = {
Preconditions.checkNotNull(sqlParserConfig)
replaceSqlParserConfig = Some(sqlParserConfig)
this
}
def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder = {
Preconditions.checkNotNull(config)
replaceSqlToRelConverterConfig = Some(config)
this
}
private class CalciteConfigImpl(
val getBatchPrograms: Option[FlinkChainedPrograms[BatchOptimizeContext]],
val getStreamPrograms: Option[FlinkChainedPrograms[StreamOptimizeContext]],
val getSqlOperatorTable: Option[SqlOperatorTable],
val replacesSqlOperatorTable: Boolean,
val getSqlParserConfig: Option[SqlParser.Config],
val getSqlToRelConverterConfig: Option[SqlToRelConverter.Config])
extends CalciteConfig {
}
/**
* Builds a new [[CalciteConfig]].
*/
def build(): CalciteConfig = new CalciteConfigImpl(
batchPrograms,
streamPrograms,
operatorTables match {
case Nil => None
case h :: Nil => Some(h)
case _ =>
// chain operator tables
Some(operatorTables.reduce((x, y) => ChainedSqlOperatorTable.of(x, y)))
},
this.replaceOperatorTable,
replaceSqlParserConfig,
replaceSqlToRelConverterConfig)
}
/**
* Calcite configuration for defining a custom Calcite configuration for Table and SQL API.
*/
trait CalciteConfig {
/**
* Returns a custom batch table optimize programs.
*/
def getBatchPrograms: Option[FlinkChainedPrograms[BatchOptimizeContext]]
/**
* Returns a custom stream table optimize programs.
*/
def getStreamPrograms: Option[FlinkChainedPrograms[StreamOptimizeContext]]
/**
* Returns whether this configuration replaces the built-in SQL operator table.
*/
def replacesSqlOperatorTable: Boolean
/**
* Returns a custom SQL operator table.
*/
def getSqlOperatorTable: Option[SqlOperatorTable]
/**
* Returns a custom SQL parser configuration.
*/
def getSqlParserConfig: Option[SqlParser.Config]
/**
* Returns a custom configuration for SqlToRelConverter.
*/
def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
}
object CalciteConfig {
val DEFAULT: CalciteConfig = createBuilder().build()
/**
* Creates a new builder for constructing a [[CalciteConfig]].
*/
def createBuilder(): CalciteConfigBuilder = {
new CalciteConfigBuilder
}
/**
* Creates a new builder for constructing a [[CalciteConfig]] based on a given [[CalciteConfig]].
*/
def createBuilder(calciteConfig: CalciteConfig): CalciteConfigBuilder = {
val builder = new CalciteConfigBuilder
if (calciteConfig.getBatchPrograms.isDefined) {
builder.replaceBatchPrograms(calciteConfig.getBatchPrograms.get)
}
if (calciteConfig.getStreamPrograms.isDefined) {
builder.replaceStreamPrograms(calciteConfig.getStreamPrograms.get)
}
if (calciteConfig.getSqlOperatorTable.isDefined) {
if (calciteConfig.replacesSqlOperatorTable) {
builder.replaceSqlOperatorTable(calciteConfig.getSqlOperatorTable.get)
} else {
builder.addSqlOperatorTable(calciteConfig.getSqlOperatorTable.get)
}
}
if (calciteConfig.getSqlParserConfig.isDefined) {
builder.replaceSqlParserConfig(calciteConfig.getSqlParserConfig.get)
}
if (calciteConfig.getSqlToRelConverterConfig.isDefined) {
builder.replaceSqlToRelConverterConfig(calciteConfig.getSqlToRelConverterConfig.get)
}
builder
}
def connectionConfig(parserConfig: SqlParser.Config): CalciteConnectionConfig = {
val prop = new Properties()
prop.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName,
String.valueOf(parserConfig.caseSensitive))
new CalciteConnectionConfigImpl(prop)
}
}