[KYUUBI #6249] Drop support for Flink 1.16.

# :mag: Description
## Issue References ๐Ÿ”—

This pull request fixes #6249.

## Describe Your Solution ๐Ÿ”ง

According to the plan, we will no longer support Flink 1.16, and this PR will try to remove Flink 1.16.

## Types of changes :bookmark:

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan ๐Ÿงช

Pass GHA

---

# Checklist ๐Ÿ“

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6259 from slfan1989/drop_flink1.16_support.

Closes #6249

808acc2fb [Cheng Pan] Update docs/deployment/migration-guide.md
fc5ecf6fd [Shilun Fan] [KYUUBI #6249] Fix CheckStyle.
8d8f9ded4 [Shilun Fan] [KYUUBI #6249] Fix CheckStyle.
7a1d974b3 [Shilun Fan] [KYUUBI #6249] Drop support for Flink 1.16.

Lead-authored-by: Shilun Fan <slfan1989@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 907378a..1dfcc87 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -193,10 +193,6 @@
         include:
           - java: 8
             flink: '1.17'
-            flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.3 -Dflink.archive.name=flink-1.16.3-bin-scala_2.12.tgz'
-            comment: 'verify-on-flink-1.16-binary'
-          - java: 8
-            flink: '1.17'
             flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.18.1 -Dflink.archive.name=flink-1.18.1-bin-scala_2.12.tgz'
             comment: 'verify-on-flink-1.18-binary'
           - java: 8
diff --git a/docs/deployment/migration-guide.md b/docs/deployment/migration-guide.md
index a1586d4..4767f15 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
+* Since Kyuubi 1.10, the support of Flink engine for Flink 1.16 is removed.
 
 ## Upgrading from Kyuubi 1.8 to 1.9
 
diff --git a/docs/quick_start/quick_start.rst b/docs/quick_start/quick_start.rst
index 2329038..a77f7ee 100644
--- a/docs/quick_start/quick_start.rst
+++ b/docs/quick_start/quick_start.rst
@@ -44,7 +44,7 @@
                    Engine lib                        - Kyuubi Engine
                    Beeline                           - Kyuubi Beeline
   **Spark**        Engine       3.1 to 3.5           A Spark distribution
-  **Flink**        Engine       1.16 to 1.19         A Flink distribution
+  **Flink**        Engine       1.17 to 1.19         A Flink distribution
   **Trino**        Engine       N/A                  A Trino cluster allows to access via trino-client v411
   **Doris**        Engine       N/A                  A Doris cluster
   **Hive**         Engine       - 2.1-cdh6/2.3/3.1   - A Hive distribution
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index d9c8427..2865dba 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -22,12 +22,10 @@
 import java.net.URL
 import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
 
-import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions._
 
 import org.apache.commons.cli.{CommandLine, DefaultParser, Options}
 import org.apache.flink.api.common.JobID
-import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI, GenericCLI}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -41,7 +39,6 @@
 
 import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.util.SemanticVersion
-import org.apache.kyuubi.util.reflect._
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 object FlinkEngineUtils extends Logging {
@@ -49,7 +46,7 @@
   val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)
 
   private def SUPPORTED_FLINK_VERSIONS =
-    Set("1.16", "1.17", "1.18", "1.19").map(SemanticVersion.apply)
+    Set("1.17", "1.18", "1.19").map(SemanticVersion.apply)
 
   val FLINK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(EnvironmentInformation.getVersion)
 
@@ -57,9 +54,6 @@
     val flinkVersion = EnvironmentInformation.getVersion
     if (SUPPORTED_FLINK_VERSIONS.contains(FLINK_RUNTIME_VERSION)) {
       info(s"The current Flink version is $flinkVersion")
-      if (FlinkEngineUtils.FLINK_RUNTIME_VERSION === "1.16") {
-        warn("The support for Flink 1.16 is deprecated, and will be removed in the next version.")
-      }
     } else {
       throw new UnsupportedOperationException(
         s"You are using unsupported Flink version $flinkVersion, " +
@@ -127,17 +121,6 @@
         (classOf[JList[URL]], dependencies),
         (classOf[Boolean], JBoolean.TRUE),
         (classOf[Boolean], JBoolean.FALSE))
-    } else if (FLINK_RUNTIME_VERSION === "1.16") {
-      val commandLines: JList[CustomCommandLine] =
-        Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
-      DynConstructors.builder()
-        .impl(
-          classOf[DefaultContext],
-          classOf[Configuration],
-          classOf[JList[CustomCommandLine]])
-        .build()
-        .newInstance(flinkConf, commandLines)
-        .asInstanceOf[DefaultContext]
     } else {
       throw new KyuubiException(
         s"Flink version ${EnvironmentInformation.getVersion} are not supported currently.")
@@ -147,9 +130,6 @@
   def getSessionContext(session: Session): SessionContext = getField(session, "sessionContext")
 
   def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
-    if (FLINK_RUNTIME_VERSION <= "1.16") {
-      return None
-    }
     try {
       Option(getField[JobID](resultFetch, "jobID"))
     } catch {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
index 60c92d9..b43f6fa 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
@@ -29,12 +29,12 @@
 import org.apache.flink.table.catalog.ResolvedSchema
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.conversion.DataStructureConverters
+import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
 import org.apache.flink.table.gateway.service.result.ResultFetcher
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
 import org.apache.kyuubi.operation.FetchIterator
 import org.apache.kyuubi.util.reflect.DynFields
@@ -60,12 +60,10 @@
 
   val FETCH_INTERVAL_MS: Long = 1000
 
-  // for Flink 1.16 and below, isQueryResult is not supported
   val isQueryResult: Boolean =
-    FlinkEngineUtils.FLINK_RUNTIME_VERSION < "1.17" ||
-      DynFields.builder
-        .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
-        .build[Boolean](resultFetcher).get()
+    DynFields.builder
+      .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
+      .build[Boolean](resultFetcher).get()
 
   val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue
 
@@ -91,16 +89,15 @@
       while (!fetched && !Thread.interrupted()) {
         val rs = resultFetcher.fetchResults(token, effectiveMaxRows - bufferedRows.length)
         val flinkRs = new FlinkResultSet(rs)
-        // TODO: replace string-based match when Flink 1.16 support is dropped
-        flinkRs.getResultType.name() match {
-          case "EOS" =>
+        flinkRs.getResultType match {
+          case ResultType.EOS =>
             debug("EOS received, no more data to fetch.")
             fetched = true
             hasNext = false
-          case "NOT_READY" =>
+          case ResultType.NOT_READY =>
             // if flink jobs are not ready, continue to retry
             debug("Result not ready, retrying...")
-          case "PAYLOAD" =>
+          case ResultType.PAYLOAD =>
             val fetchedData = flinkRs.getData
             // if no data fetched, continue to retry
             if (!fetchedData.isEmpty) {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 73908be..2a72029 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,19 +17,18 @@
 
 package org.apache.kyuubi.engine.flink.session
 
-import scala.collection.JavaConverters._
 import scala.collection.JavaConverters.mapAsJavaMap
 
 import org.apache.flink.table.gateway.api.session.SessionEnvironment
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
 import org.apache.flink.table.gateway.service.context.DefaultContext
+import org.apache.flink.table.gateway.service.session.SessionManagerImpl
 
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
-import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
 import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 
@@ -41,7 +40,7 @@
   private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
 
   val operationManager = new FlinkSQLOperationManager()
-  val sessionManager = new FlinkSessionManager(engineContext)
+  val sessionManager = new SessionManagerImpl(engineContext)
 
   override def start(): Unit = {
     super.start()
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
deleted file mode 100644
index 89414ac..0000000
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.shim
-
-import org.apache.flink.table.gateway.api.session.{SessionEnvironment, SessionHandle}
-import org.apache.flink.table.gateway.service.context.DefaultContext
-import org.apache.flink.table.gateway.service.session.Session
-
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
-import org.apache.kyuubi.util.reflect._
-import org.apache.kyuubi.util.reflect.ReflectUtils._
-
-class FlinkSessionManager(engineContext: DefaultContext) {
-
-  val sessionManager: AnyRef = {
-    if (FLINK_RUNTIME_VERSION === "1.16") {
-      DynConstructors.builder().impl(
-        "org.apache.flink.table.gateway.service.session.SessionManager",
-        classOf[DefaultContext])
-        .build()
-        .newInstance(engineContext)
-    } else {
-      DynConstructors.builder().impl(
-        "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
-        classOf[DefaultContext])
-        .build()
-        .newInstance(engineContext)
-    }
-  }
-
-  def start(): Unit = invokeAs(sessionManager, "start")
-
-  def stop(): Unit = invokeAs(sessionManager, "stop")
-
-  def getSession(sessionHandle: SessionHandle): Session =
-    invokeAs(sessionManager, "getSession", (classOf[SessionHandle], sessionHandle))
-
-  def openSession(environment: SessionEnvironment): Session =
-    invokeAs(sessionManager, "openSession", (classOf[SessionEnvironment], environment))
-
-  def closeSession(sessionHandle: SessionHandle): Unit =
-    invokeAs(sessionManager, "closeSession", (classOf[SessionHandle], sessionHandle))
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
index 9ccbe79..6b50719 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
@@ -21,14 +21,11 @@
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
 import org.apache.flink.table.gateway.service.context.SessionContext
 
 import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
-import org.apache.kyuubi.util.reflect.DynMethods
 
 object KDFRegistry {
 
@@ -36,24 +33,7 @@
 
     val kyuubiDefinedFunctions = new ArrayBuffer[KyuubiDefinedFunction]
 
-    val flinkConfigMap: util.Map[String, String] = {
-      if (FLINK_RUNTIME_VERSION === "1.16") {
-        DynMethods
-          .builder("getConfigMap")
-          .impl(classOf[SessionContext])
-          .build()
-          .invoke(sessionContext)
-          .asInstanceOf[util.Map[String, String]]
-      } else {
-        DynMethods
-          .builder("getSessionConf")
-          .impl(classOf[SessionContext])
-          .build()
-          .invoke(sessionContext)
-          .asInstanceOf[Configuration]
-          .toMap
-      }
-    }
+    val flinkConfigMap: util.Map[String, String] = sessionContext.getSessionConf.toMap
 
     val kyuubi_version: KyuubiDefinedFunction = create(
       "kyuubi_version",
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 0ca8284..4bba523 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -1262,12 +1262,9 @@
       assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
       stmt.executeQuery("insert into tbl_a values (1)")
       val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
-      // Flink 1.16 doesn't support query id via ResultFetcher
-      if (FLINK_RUNTIME_VERSION >= "1.17") {
-        assert(queryId !== null)
-        // parse the string to check if it's valid Flink job id
-        assert(JobID.fromHexString(queryId) !== null)
-      }
+      assert(queryId !== null)
+      // parse the string to check if it's valid Flink job id
+      assert(JobID.fromHexString(queryId) !== null)
     }
   }
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 88d3a2d..39fee11 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -50,8 +50,8 @@
   private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
   private val tempOpt =
     Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "opt")).toFile
-  Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.16.3.jar"))
-  Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.16.3.jar"))
+  Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.17.2.jar"))
+  Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.17.2.jar"))
   private val tempUsrLib =
     Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile
   private val tempUdfJar =
diff --git a/pom.xml b/pom.xml
index dffa0e1..ba0a0a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2284,13 +2284,6 @@
         </profile>
 
         <profile>
-            <id>flink-1.16</id>
-            <properties>
-                <flink.version>1.16.3</flink.version>
-            </properties>
-        </profile>
-
-        <profile>
             <id>flink-1.17</id>
             <properties>
                 <flink.version>1.17.2</flink.version>