[FLINK-35194][table] Support describe job statement for SqlGateway

This closes #24728
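
Note: DESCRIBE JOB '<job_id>' (or DESC JOB '<job_id>') returns a single row
with the same columns as SHOW JOBS: job id, job name, status and start time.
A minimal usage sketch against the gateway service, modeled on the ITCase in
this patch ('service' and 'defaultSessionEnvironment' are test fixtures, not
public API):

    SessionHandle session = service.openSession(defaultSessionEnvironment);
    OperationHandle handle =
            service.executeStatement(
                    session, "DESCRIBE JOB '<job_id>'", -1, new Configuration());
    // The single result row is [job id, job name, status, start time].
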
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 9452650..c50ba8c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -83,6 +83,7 @@
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
 import org.apache.flink.table.operations.command.ExecutePlanOperation;
 import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
@@ -481,6 +482,8 @@
             return callStopJobOperation(tableEnv, handle, (StopJobOperation) op);
         } else if (op instanceof ShowJobsOperation) {
             return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) op);
+        } else if (op instanceof DescribeJobOperation) {
+            return callDescribeJobOperation(tableEnv, handle, (DescribeJobOperation) op);
         } else if (op instanceof RemoveJarOperation) {
             return callRemoveJar(handle, ((RemoveJarOperation) op).getPath());
         } else if (op instanceof AddJarOperation
@@ -774,6 +777,56 @@
                 resultRows);
     }
 
+    public ResultFetcher callDescribeJobOperation(
+            TableEnvironmentInternal tableEnv,
+            OperationHandle operationHandle,
+            DescribeJobOperation describeJobOperation)
+            throws SqlExecutionException {
+        Configuration configuration = tableEnv.getConfig().getConfiguration();
+        Duration clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
+        String jobId = describeJobOperation.getJobId();
+        Optional<JobStatusMessage> jobStatusOp =
+                runClusterAction(
+                        configuration,
+                        operationHandle,
+                        clusterClient -> {
+                            try {
+                                JobID expectedJobId = JobID.fromHexString(jobId);
+                                return clusterClient.listJobs()
+                                        .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS)
+                                        .stream()
+                                        .filter(job -> expectedJobId.equals(job.getJobId()))
+                                        .findFirst();
+                            } catch (Exception e) {
+                                throw new SqlExecutionException(
+                                        String.format(
+                                                "Failed to get job %s in the cluster.", jobId),
+                                        e);
+                            }
+                        });
+
+        if (!jobStatusOp.isPresent()) {
+            throw new SqlExecutionException(
+                    String.format("Described job %s does not exist in the cluster.", jobId));
+        }
+        JobStatusMessage job = jobStatusOp.get();
+
+        RowData resultRow =
+                GenericRowData.of(
+                        StringData.fromString(jobId),
+                        StringData.fromString(job.getJobName()),
+                        StringData.fromString(job.getJobState().toString()),
+                        DateTimeUtils.toTimestampData(job.getStartTime(), 3));
+        return ResultFetcher.fromResults(
+                operationHandle,
+                ResolvedSchema.of(
+                        Column.physical(JOB_ID, DataTypes.STRING()),
+                        Column.physical(JOB_NAME, DataTypes.STRING()),
+                        Column.physical(STATUS, DataTypes.STRING()),
+                        Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+                Collections.singletonList(resultRow));
+    }
+
     /**
      * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
      * against it.
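
Note on callDescribeJobOperation above: it lists all jobs and filters by JobID
rather than calling ClusterClient#getJobStatus(JobID), because the result row
also needs the job name and start time, which only JobStatusMessage carries.
For comparison, a status-only sketch (assuming a ClusterClient in scope):

    // getJobStatus returns only the JobStatus enum, with no name or start time.
    JobStatus status =
            clusterClient
                    .getJobStatus(JobID.fromHexString(jobId))
                    .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
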
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 55aa16c..012c4ae 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -511,6 +511,57 @@
                 .isBetween(timeOpStart, timeOpSucceed);
     }
 
+    @Test
+    void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+
+        String pipelineName = "test-describe-job";
+        configuration.set(PipelineOptions.NAME, pipelineName);
+
+        // submit a streaming job that stays in RUNNING state
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        long timeOpStart = System.currentTimeMillis();
+        OperationHandle insertsOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+        String jobId =
+                fetchAllResults(sessionHandle, insertsOperationHandle)
+                        .get(0)
+                        .getString(0)
+                        .toString();
+
+        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
+        long timeOpSucceed = System.currentTimeMillis();
+
+        OperationHandle describeJobOperationHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        String.format("DESCRIBE JOB '%s'", jobId),
+                        -1,
+                        configuration);
+
+        List<RowData> result = fetchAllResults(sessionHandle, describeJobOperationHandle);
+        RowData jobRow =
+                result.stream()
+                        .filter(row -> jobId.equals(row.getString(0).toString()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Test job " + jobId + " not found."));
+        assertThat(jobRow.getString(1)).hasToString(pipelineName);
+        assertThat(jobRow.getString(2)).hasToString("RUNNING");
+        assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
+                .isBetween(timeOpStart, timeOpSucceed);
+    }
+
     // --------------------------------------------------------------------------------------------
     // Catalog API tests
     // --------------------------------------------------------------------------------------------
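
Note: fetchAllResults(...) in the test above is a helper defined elsewhere in
SqlGatewayServiceITCase; a plausible sketch of its shape, assuming the
token-based fetch API of SqlGatewayService (page until the next token is null):

    private List<RowData> fetchAllResults(
            SessionHandle sessionHandle, OperationHandle operationHandle) {
        Long token = 0L;
        List<RowData> results = new ArrayList<>();
        while (token != null) {
            ResultSet resultSet =
                    service.fetchResults(
                            sessionHandle, operationHandle, token, Integer.MAX_VALUE);
            results.addAll(resultSet.getData());
            token = resultSet.getNextToken();
        }
        return results;
    }
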
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 100e9ed..0984496 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -128,6 +128,7 @@
     "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
     "org.apache.flink.sql.parser.ddl.SqlStopJob"
     "org.apache.flink.sql.parser.dql.SqlShowJobs"
+    "org.apache.flink.sql.parser.dql.SqlDescribeJob"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
     "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -572,6 +573,7 @@
   # List of methods for parsing custom SQL statements.
   # Return type of method implementation should be 'SqlNode'.
   # Example: SqlShowDatabases(), SqlShowTables().
+  # Note: keep SqlRichDescribeTable() last, otherwise every DESCRIBE statement would fall into that method
   statementParserMethods: [
     "RichSqlInsert()"
     "SqlBeginStatementSet()"
@@ -591,7 +593,6 @@
     "SqlShowColumns()"
     "SqlShowCreate()"
     "SqlReplaceTable()"
-    "SqlRichDescribeTable()"
     "SqlAlterMaterializedTable()"
     "SqlAlterTable()"
     "SqlAlterView()"
@@ -615,6 +616,8 @@
     "SqlStopJob()"
     "SqlShowJobs()"
     "SqlTruncateTable()"
+    "SqlDescribeJob()"
+    "SqlRichDescribeTable()"
   ]
 
   # List of methods for parsing custom literals.
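
Note on the reordering above: the generated parser tries statementParserMethods
in list order, and SqlRichDescribeTable() accepts any statement starting with
DESCRIBE or DESC, so it must come after SqlDescribeJob() or "DESCRIBE JOB ..."
would never reach the new production. A quick round-trip check, assuming the
generated FlinkSqlParserImpl is on the classpath:

    SqlParser parser =
            SqlParser.create(
                    "DESC JOB 'myjob'",
                    SqlParser.config().withParserFactory(FlinkSqlParserImpl.FACTORY));
    SqlNode node = parser.parseStmt();
    // With the new ordering this yields a SqlDescribeJob, not SqlRichDescribeTable.
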
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 95509e7..a9e299e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2961,6 +2961,24 @@
 }
 
 /**
+* Parses a "DESCRIBE JOB" statement:
+* DESCRIBE | DESC JOB <JOB_ID>
+*/
+SqlDescribeJob SqlDescribeJob() :
+{
+    SqlCharStringLiteral jobId;
+    SqlParserPos pos;
+}
+{
+    ( <DESCRIBE> | <DESC> ) <JOB> <QUOTED_STRING>
+    {
+        String id = SqlParserUtil.parseString(token.image);
+        jobId = SqlLiteral.createCharString(id, getPos());
+        return new SqlDescribeJob(getPos(), jobId);
+    }
+}
+
+/**
 * Parses a STOP JOB statement:
 * STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
 */
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
new file mode 100644
index 0000000..af316fc
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+
+/** DESCRIBE | DESC JOB &lt;JOB_ID&gt; sql call. */
+public class SqlDescribeJob extends SqlCall {
+
+    public static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("DESCRIBE JOB", SqlKind.OTHER);
+
+    private final SqlCharStringLiteral jobId;
+
+    public SqlDescribeJob(SqlParserPos pos, SqlCharStringLiteral jobId) {
+        super(pos);
+        this.jobId = jobId;
+    }
+
+    public String getJobId() {
+        return jobId.getValueAs(NlsString.class).getValue();
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(jobId);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("DESCRIBE JOB");
+        jobId.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 49093ad..429ac1d 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2956,6 +2956,12 @@
     }
 
     @Test
+    void testDescribeJob() {
+        sql("DESCRIBE JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
+        sql("DESC JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
+    }
+
+    @Test
     void testTruncateTable() {
         sql("truncate table t1").ok("TRUNCATE TABLE `T1`");
     }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
new file mode 100644
index 0000000..a521402
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.command;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
+import org.apache.flink.table.operations.Operation;
+
+/** Operation that represents a DESCRIBE JOB statement. */
+@Internal
+public class DescribeJobOperation implements Operation, ExecutableOperation {
+
+    private final String jobId;
+
+    public DescribeJobOperation(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format("DESCRIBE JOB '%s'", jobId);
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // TODO: We may need to migrate the execution of job operations such as this one
+        //  and ShowJobsOperation from the SQL Gateway's OperationExecutor to here.
+        throw new UnsupportedOperationException(
+                "DescribeJobOperation does not support ExecutableOperation yet.");
+    }
+}
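
Note: since execute(Context) above is not implemented yet, the statement only
runs through the SQL Gateway's OperationExecutor; a plain TableEnvironment
accepts the syntax but fails at execution. An illustrative sketch of the
current behavior (assuming a streaming TableEnvironment):

    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Parses and converts fine, but throws UnsupportedOperationException
    // from DescribeJobOperation#execute.
    tEnv.executeSql("DESCRIBE JOB '<job_id>'");
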
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
new file mode 100644
index 0000000..9b62570
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlDescribeJob;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
+
+/** A converter for {@link SqlDescribeJob}. */
+public class SqlDescribeJobConverter implements SqlNodeConverter<SqlDescribeJob> {
+
+    @Override
+    public Operation convertSqlNode(SqlDescribeJob node, ConvertContext context) {
+        return new DescribeJobOperation(node.getJobId());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index ab9da76..caaafc9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -55,6 +55,7 @@
         register(new SqlShowDatabasesConverter());
         register(new SqlShowCreateCatalogConverter());
         register(new SqlDescribeCatalogConverter());
+        register(new SqlDescribeJobConverter());
     }
 
     /**
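
Note on the registration above: SqlNodeConverters resolves a converter from the
concrete SqlNode subclass declared in the converter's generic signature, so the
single register(...) call is all the wiring SqlDescribeJob needs. A simplified
sketch of the dispatch idea (not the actual Flink internals):

    // Hypothetical simplification: key converters by node class, dispatch by lookup.
    Map<Class<?>, SqlNodeConverter<? extends SqlNode>> converters = new HashMap<>();
    converters.put(SqlDescribeJob.class, new SqlDescribeJobConverter());

    @SuppressWarnings("unchecked")
    SqlNodeConverter<SqlNode> converter =
            (SqlNodeConverter<SqlNode>) converters.get(validatedNode.getClass());
    Operation operation = converter.convertSqlNode(validatedNode, convertContext);
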
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index cda97a4..7ad2492 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,7 +19,7 @@
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
 import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet, SqlUseModules}
-import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlCompileAndExecutePlan, SqlEndStatementSet, SqlExecute, SqlExecutePlan, SqlStatementSet, SqlTruncateTable}
+import org.apache.flink.sql.parser.dml._
 import org.apache.flink.sql.parser.dql._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.hint.FlinkHints
@@ -146,6 +146,7 @@
         || sqlNode.isInstanceOf[SqlShowPartitions]
         || sqlNode.isInstanceOf[SqlShowProcedures]
         || sqlNode.isInstanceOf[SqlShowJobs]
+        || sqlNode.isInstanceOf[SqlDescribeJob]
         || sqlNode.isInstanceOf[SqlRichDescribeTable]
         || sqlNode.isInstanceOf[SqlUnloadModule]
         || sqlNode.isInstanceOf[SqlUseModules]