[KYUUBI #6249] Drop support for Flink 1.16.
# :mag: Description
## Issue References 🔗
This pull request fixes #6249.
## Describe Your Solution 🔧
As planned, Kyuubi no longer supports Flink 1.16; this PR removes the Flink 1.16 support code and the version-specific shims that only existed for it.
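For context, the engine's runtime version guard now reduces to the following. This is a minimal sketch assembled from the `FlinkEngineUtils` changes in the diff below, simplified for illustration:

```scala
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.kyuubi.util.SemanticVersion

// After this change, only Flink 1.17 through 1.19 pass the startup check.
val SUPPORTED_FLINK_VERSIONS: Set[SemanticVersion] =
  Set("1.17", "1.18", "1.19").map(SemanticVersion.apply)

val FLINK_RUNTIME_VERSION: SemanticVersion =
  SemanticVersion(EnvironmentInformation.getVersion)

// An unsupported runtime (e.g. 1.16) now fails fast at engine startup,
// instead of merely logging a deprecation warning as before this PR.
if (!SUPPORTED_FLINK_VERSIONS.contains(FLINK_RUNTIME_VERSION)) {
  throw new UnsupportedOperationException(
    s"You are using unsupported Flink version ${EnvironmentInformation.getVersion}")
}
```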
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
Pass GHA (the `verify-on-flink-1.16-binary` job is removed from the CI matrix).
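Besides CI, the new behavior can be sanity-checked locally; a tiny sketch, assuming Kyuubi's `SemanticVersion` compares on major.minor as the diff's usage suggests:

```scala
import org.apache.kyuubi.util.SemanticVersion

val supported = Set("1.17", "1.18", "1.19").map(SemanticVersion.apply)

// 1.16.x runtimes no longer match any supported version...
assert(!supported.contains(SemanticVersion("1.16.3")))
// ...while 1.17.x still does.
assert(supported.contains(SemanticVersion("1.17.2")))
```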
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6259 from slfan1989/drop_flink1.16_support.
Closes #6249
808acc2fb [Cheng Pan] Update docs/deployment/migration-guide.md
fc5ecf6fd [Shilun Fan] [KYUUBI #6249] Fix CheckStyle.
8d8f9ded4 [Shilun Fan] [KYUUBI #6249] Fix CheckStyle.
7a1d974b3 [Shilun Fan] [KYUUBI #6249] Drop support for Flink 1.16.
Lead-authored-by: Shilun Fan <slfan1989@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 907378a..1dfcc87 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -193,10 +193,6 @@
include:
- java: 8
flink: '1.17'
- flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.3 -Dflink.archive.name=flink-1.16.3-bin-scala_2.12.tgz'
- comment: 'verify-on-flink-1.16-binary'
- - java: 8
- flink: '1.17'
flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.18.1 -Dflink.archive.name=flink-1.18.1-bin-scala_2.12.tgz'
comment: 'verify-on-flink-1.18-binary'
- java: 8
diff --git a/docs/deployment/migration-guide.md b/docs/deployment/migration-guide.md
index a1586d4..4767f15 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
## Upgrading from Kyuubi 1.9 to 1.10
* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
+* Since Kyuubi 1.10, support for Flink 1.16 is removed from the Flink engine.
## Upgrading from Kyuubi 1.8 to 1.9
diff --git a/docs/quick_start/quick_start.rst b/docs/quick_start/quick_start.rst
index 2329038..a77f7ee 100644
--- a/docs/quick_start/quick_start.rst
+++ b/docs/quick_start/quick_start.rst
@@ -44,7 +44,7 @@
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.1 to 3.5 A Spark distribution
- **Flink** Engine 1.16 to 1.19 A Flink distribution
+ **Flink** Engine 1.17 to 1.19 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
**Hive** Engine - 2.1-cdh6/2.3/3.1 - A Hive distribution
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index d9c8427..2865dba 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -22,12 +22,10 @@
import java.net.URL
import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
-import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions._
import org.apache.commons.cli.{CommandLine, DefaultParser, Options}
import org.apache.flink.api.common.JobID
-import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI, GenericCLI}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.util.EnvironmentInformation
@@ -41,7 +39,6 @@
import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.util.SemanticVersion
-import org.apache.kyuubi.util.reflect._
import org.apache.kyuubi.util.reflect.ReflectUtils._
object FlinkEngineUtils extends Logging {
@@ -49,7 +46,7 @@
val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)
private def SUPPORTED_FLINK_VERSIONS =
- Set("1.16", "1.17", "1.18", "1.19").map(SemanticVersion.apply)
+ Set("1.17", "1.18", "1.19").map(SemanticVersion.apply)
val FLINK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(EnvironmentInformation.getVersion)
@@ -57,9 +54,6 @@
val flinkVersion = EnvironmentInformation.getVersion
if (SUPPORTED_FLINK_VERSIONS.contains(FLINK_RUNTIME_VERSION)) {
info(s"The current Flink version is $flinkVersion")
- if (FlinkEngineUtils.FLINK_RUNTIME_VERSION === "1.16") {
- warn("The support for Flink 1.16 is deprecated, and will be removed in the next version.")
- }
} else {
throw new UnsupportedOperationException(
s"You are using unsupported Flink version $flinkVersion, " +
@@ -127,17 +121,6 @@
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE),
(classOf[Boolean], JBoolean.FALSE))
- } else if (FLINK_RUNTIME_VERSION === "1.16") {
- val commandLines: JList[CustomCommandLine] =
- Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
- DynConstructors.builder()
- .impl(
- classOf[DefaultContext],
- classOf[Configuration],
- classOf[JList[CustomCommandLine]])
- .build()
- .newInstance(flinkConf, commandLines)
- .asInstanceOf[DefaultContext]
} else {
throw new KyuubiException(
s"Flink version ${EnvironmentInformation.getVersion} are not supported currently.")
@@ -147,9 +130,6 @@
def getSessionContext(session: Session): SessionContext = getField(session, "sessionContext")
def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
- if (FLINK_RUNTIME_VERSION <= "1.16") {
- return None
- }
try {
Option(getField[JobID](resultFetch, "jobID"))
} catch {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
index 60c92d9..b43f6fa 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
@@ -29,12 +29,12 @@
import org.apache.flink.table.catalog.ResolvedSchema
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.DataStructureConverters
+import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
import org.apache.flink.table.gateway.service.result.ResultFetcher
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
import org.apache.kyuubi.operation.FetchIterator
import org.apache.kyuubi.util.reflect.DynFields
@@ -60,12 +60,10 @@
val FETCH_INTERVAL_MS: Long = 1000
- // for Flink 1.16 and below, isQueryResult is not supported
val isQueryResult: Boolean =
- FlinkEngineUtils.FLINK_RUNTIME_VERSION < "1.17" ||
- DynFields.builder
- .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
- .build[Boolean](resultFetcher).get()
+ DynFields.builder
+ .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
+ .build[Boolean](resultFetcher).get()
val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue
@@ -91,16 +89,15 @@
while (!fetched && !Thread.interrupted()) {
val rs = resultFetcher.fetchResults(token, effectiveMaxRows - bufferedRows.length)
val flinkRs = new FlinkResultSet(rs)
- // TODO: replace string-based match when Flink 1.16 support is dropped
- flinkRs.getResultType.name() match {
- case "EOS" =>
+ flinkRs.getResultType match {
+ case ResultType.EOS =>
debug("EOS received, no more data to fetch.")
fetched = true
hasNext = false
- case "NOT_READY" =>
+ case ResultType.NOT_READY =>
// if flink jobs are not ready, continue to retry
debug("Result not ready, retrying...")
- case "PAYLOAD" =>
+ case ResultType.PAYLOAD =>
val fetchedData = flinkRs.getData
// if no data fetched, continue to retry
if (!fetchedData.isEmpty) {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 73908be..2a72029 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,19 +17,18 @@
package org.apache.kyuubi.engine.flink.session
-import scala.collection.JavaConverters._
import scala.collection.JavaConverters.mapAsJavaMap
import org.apache.flink.table.gateway.api.session.SessionEnvironment
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
import org.apache.flink.table.gateway.service.context.DefaultContext
+import org.apache.flink.table.gateway.service.session.SessionManagerImpl
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.FlinkSQLEngine
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
-import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -41,7 +40,7 @@
private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
val operationManager = new FlinkSQLOperationManager()
- val sessionManager = new FlinkSessionManager(engineContext)
+ val sessionManager = new SessionManagerImpl(engineContext)
override def start(): Unit = {
super.start()
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
deleted file mode 100644
index 89414ac..0000000
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.shim
-
-import org.apache.flink.table.gateway.api.session.{SessionEnvironment, SessionHandle}
-import org.apache.flink.table.gateway.service.context.DefaultContext
-import org.apache.flink.table.gateway.service.session.Session
-
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
-import org.apache.kyuubi.util.reflect._
-import org.apache.kyuubi.util.reflect.ReflectUtils._
-
-class FlinkSessionManager(engineContext: DefaultContext) {
-
- val sessionManager: AnyRef = {
- if (FLINK_RUNTIME_VERSION === "1.16") {
- DynConstructors.builder().impl(
- "org.apache.flink.table.gateway.service.session.SessionManager",
- classOf[DefaultContext])
- .build()
- .newInstance(engineContext)
- } else {
- DynConstructors.builder().impl(
- "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
- classOf[DefaultContext])
- .build()
- .newInstance(engineContext)
- }
- }
-
- def start(): Unit = invokeAs(sessionManager, "start")
-
- def stop(): Unit = invokeAs(sessionManager, "stop")
-
- def getSession(sessionHandle: SessionHandle): Session =
- invokeAs(sessionManager, "getSession", (classOf[SessionHandle], sessionHandle))
-
- def openSession(environment: SessionEnvironment): Session =
- invokeAs(sessionManager, "openSession", (classOf[SessionEnvironment], environment))
-
- def closeSession(sessionHandle: SessionHandle): Unit =
- invokeAs(sessionManager, "closeSession", (classOf[SessionHandle], sessionHandle))
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
index 9ccbe79..6b50719 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
@@ -21,14 +21,11 @@
import scala.collection.mutable.ArrayBuffer
-import org.apache.flink.configuration.Configuration
import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
-import org.apache.kyuubi.util.reflect.DynMethods
object KDFRegistry {
@@ -36,24 +33,7 @@
val kyuubiDefinedFunctions = new ArrayBuffer[KyuubiDefinedFunction]
- val flinkConfigMap: util.Map[String, String] = {
- if (FLINK_RUNTIME_VERSION === "1.16") {
- DynMethods
- .builder("getConfigMap")
- .impl(classOf[SessionContext])
- .build()
- .invoke(sessionContext)
- .asInstanceOf[util.Map[String, String]]
- } else {
- DynMethods
- .builder("getSessionConf")
- .impl(classOf[SessionContext])
- .build()
- .invoke(sessionContext)
- .asInstanceOf[Configuration]
- .toMap
- }
- }
+ val flinkConfigMap: util.Map[String, String] = sessionContext.getSessionConf.toMap
val kyuubi_version: KyuubiDefinedFunction = create(
"kyuubi_version",
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 0ca8284..4bba523 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -1262,12 +1262,9 @@
assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
stmt.executeQuery("insert into tbl_a values (1)")
val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
- // Flink 1.16 doesn't support query id via ResultFetcher
- if (FLINK_RUNTIME_VERSION >= "1.17") {
- assert(queryId !== null)
- // parse the string to check if it's valid Flink job id
- assert(JobID.fromHexString(queryId) !== null)
- }
+ assert(queryId !== null)
+ // parse the string to check if it's valid Flink job id
+ assert(JobID.fromHexString(queryId) !== null)
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 88d3a2d..39fee11 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -50,8 +50,8 @@
private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
private val tempOpt =
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "opt")).toFile
- Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.16.3.jar"))
- Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.16.3.jar"))
+ Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.17.2.jar"))
+ Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.17.2.jar"))
private val tempUsrLib =
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile
private val tempUdfJar =
diff --git a/pom.xml b/pom.xml
index dffa0e1..ba0a0a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2284,13 +2284,6 @@
</profile>
<profile>
- <id>flink-1.16</id>
- <properties>
- <flink.version>1.16.3</flink.version>
- </properties>
- </profile>
-
- <profile>
<id>flink-1.17</id>
<properties>
<flink.version>1.17.2</flink.version>