[FLINK-35194][table] Support describe job statement for SqlGateway
This closes #24728
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 9452650..c50ba8c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -83,6 +83,7 @@
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
@@ -481,6 +482,8 @@
return callStopJobOperation(tableEnv, handle, (StopJobOperation) op);
} else if (op instanceof ShowJobsOperation) {
return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) op);
+ } else if (op instanceof DescribeJobOperation) {
+ return callDescribeJobOperation(tableEnv, handle, (DescribeJobOperation) op);
} else if (op instanceof RemoveJarOperation) {
return callRemoveJar(handle, ((RemoveJarOperation) op).getPath());
} else if (op instanceof AddJarOperation
@@ -774,6 +777,56 @@
resultRows);
}
+ public ResultFetcher callDescribeJobOperation(
+ TableEnvironmentInternal tableEnv,
+ OperationHandle operationHandle,
+ DescribeJobOperation describeJobOperation)
+ throws SqlExecutionException {
+ Configuration configuration = tableEnv.getConfig().getConfiguration();
+ Duration clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
+ String jobId = describeJobOperation.getJobId();
+ Optional<JobStatusMessage> jobStatusOp =
+ runClusterAction(
+ configuration,
+ operationHandle,
+ clusterClient -> {
+ try {
+ JobID expectedJobId = JobID.fromHexString(jobId);
+ return clusterClient.listJobs()
+ .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ .stream()
+ .filter(job -> expectedJobId.equals(job.getJobId()))
+ .findFirst();
+ } catch (Exception e) {
+ throw new SqlExecutionException(
+ String.format(
+ "Failed to get job %s in the cluster.", jobId),
+ e);
+ }
+ });
+
+ if (!jobStatusOp.isPresent()) {
+ throw new SqlExecutionException(
+ String.format("Described job %s does not exist in the cluster.", jobId));
+ }
+ JobStatusMessage job = jobStatusOp.get();
+
+ RowData resultRow =
+ GenericRowData.of(
+ StringData.fromString(jobId),
+ StringData.fromString(job.getJobName()),
+ StringData.fromString(job.getJobState().toString()),
+ DateTimeUtils.toTimestampData(job.getStartTime(), 3));
+ return ResultFetcher.fromResults(
+ operationHandle,
+ ResolvedSchema.of(
+ Column.physical(JOB_ID, DataTypes.STRING()),
+ Column.physical(JOB_NAME, DataTypes.STRING()),
+ Column.physical(STATUS, DataTypes.STRING()),
+ Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ Collections.singletonList(resultRow));
+ }
+
/**
* Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
* against it.
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 55aa16c..012c4ae 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -511,6 +511,57 @@
.isBetween(timeOpStart, timeOpSucceed);
}
+ @Test
+ void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
+ throws Exception {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+ Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+
+ String pipelineName = "test-describe-job";
+ configuration.set(PipelineOptions.NAME, pipelineName);
+
+ // running jobs
+ String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+ String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+ String insertSql = "INSERT INTO sink SELECT * FROM source;";
+
+ service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+ service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+ long timeOpStart = System.currentTimeMillis();
+ OperationHandle insertsOperationHandle =
+ service.executeStatement(sessionHandle, insertSql, -1, configuration);
+ String jobId =
+ fetchAllResults(sessionHandle, insertsOperationHandle)
+ .get(0)
+ .getString(0)
+ .toString();
+
+ TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
+ long timeOpSucceed = System.currentTimeMillis();
+
+ OperationHandle describeJobOperationHandle =
+ service.executeStatement(
+ sessionHandle,
+ String.format("DESCRIBE JOB '%s'", jobId),
+ -1,
+ configuration);
+
+ List<RowData> result = fetchAllResults(sessionHandle, describeJobOperationHandle);
+ RowData jobRow =
+ result.stream()
+ .filter(row -> jobId.equals(row.getString(0).toString()))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Test job " + jobId + " not found."));
+ assertThat(jobRow.getString(1)).hasToString(pipelineName);
+ assertThat(jobRow.getString(2)).hasToString("RUNNING");
+ assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
+ .isBetween(timeOpStart, timeOpSucceed);
+ }
+
// --------------------------------------------------------------------------------------------
// Catalog API tests
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 100e9ed..0984496 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -128,6 +128,7 @@
"org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
"org.apache.flink.sql.parser.ddl.SqlStopJob"
"org.apache.flink.sql.parser.dql.SqlShowJobs"
+ "org.apache.flink.sql.parser.dql.SqlDescribeJob"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -572,6 +573,7 @@
# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
+ # Note: move SqlRichDescribeTable at last, otherwise all DESCRIBE syntax will fall into this method
statementParserMethods: [
"RichSqlInsert()"
"SqlBeginStatementSet()"
@@ -591,7 +593,6 @@
"SqlShowColumns()"
"SqlShowCreate()"
"SqlReplaceTable()"
- "SqlRichDescribeTable()"
"SqlAlterMaterializedTable()"
"SqlAlterTable()"
"SqlAlterView()"
@@ -615,6 +616,8 @@
"SqlStopJob()"
"SqlShowJobs()"
"SqlTruncateTable()"
+ "SqlDescribeJob()"
+ "SqlRichDescribeTable()"
]
# List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 95509e7..a9e299e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2961,6 +2961,24 @@
}
/**
+* Parse a "DESCRIBE JOB" statement:
+* DESCRIBE | DESC JOB <JOB_ID>
+*/
+SqlDescribeJob SqlDescribeJob() :
+{
+ SqlCharStringLiteral jobId;
+ SqlParserPos pos;
+}
+{
+ ( <DESCRIBE> | <DESC> ) <JOB> <QUOTED_STRING>
+ {
+ String id = SqlParserUtil.parseString(token.image);
+ jobId = SqlLiteral.createCharString(id, getPos());
+ return new SqlDescribeJob(getPos(), jobId);
+ }
+}
+
+/**
* Parses a STOP JOB statement:
* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
*/
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
new file mode 100644
index 0000000..af316fc
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+
+/** DESCRIBE | DESC <JOB_ID> sql call. */
+public class SqlDescribeJob extends SqlCall {
+
+ public static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("DESCRIBE JOB", SqlKind.OTHER);
+
+ private final SqlCharStringLiteral jobId;
+
+ public SqlDescribeJob(SqlParserPos pos, SqlCharStringLiteral jobId) {
+ super(pos);
+ this.jobId = jobId;
+ }
+
+ public String getJobId() {
+ return jobId.getValueAs(NlsString.class).getValue();
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return Collections.singletonList(jobId);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("DESCRIBE JOB");
+ jobId.unparse(writer, leftPrec, rightPrec);
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 49093ad..429ac1d 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2956,6 +2956,12 @@
}
@Test
+ void testDescribeJob() {
+ sql("DESCRIBE JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
+ sql("DESC JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
+ }
+
+ @Test
void testTruncateTable() {
sql("truncate table t1").ok("TRUNCATE TABLE `T1`");
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
new file mode 100644
index 0000000..a521402
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.command;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
+import org.apache.flink.table.operations.Operation;
+
+/** Operation to describe a DESCRIBE JOB statement. */
+@Internal
+public class DescribeJobOperation implements Operation, ExecutableOperation {
+
+ private final String jobId;
+
+ public DescribeJobOperation(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format("DESCRIBE JOB '%s'", jobId);
+ }
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+ // TODO: We may need to migrate the execution for ShowJobsOperation from SQL Gateway
+ // OperationExecutor to here.
+ throw new UnsupportedOperationException(
+ "DescribeJobOperation does not support ExecutableOperation yet.");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
new file mode 100644
index 0000000..9b62570
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlDescribeJob;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
+
+/** A converter for {@link SqlDescribeJob}. */
+public class SqlDescribeJobConverter implements SqlNodeConverter<SqlDescribeJob> {
+
+ @Override
+ public Operation convertSqlNode(SqlDescribeJob node, ConvertContext context) {
+ return new DescribeJobOperation(node.getJobId());
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index ab9da76..caaafc9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -55,6 +55,7 @@
register(new SqlShowDatabasesConverter());
register(new SqlShowCreateCatalogConverter());
register(new SqlDescribeCatalogConverter());
+ register(new SqlDescribeJobConverter());
}
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index cda97a4..7ad2492 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,7 +19,7 @@
import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet, SqlUseModules}
-import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlCompileAndExecutePlan, SqlEndStatementSet, SqlExecute, SqlExecutePlan, SqlStatementSet, SqlTruncateTable}
+import org.apache.flink.sql.parser.dml._
import org.apache.flink.sql.parser.dql._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.hint.FlinkHints
@@ -146,6 +146,7 @@
|| sqlNode.isInstanceOf[SqlShowPartitions]
|| sqlNode.isInstanceOf[SqlShowProcedures]
|| sqlNode.isInstanceOf[SqlShowJobs]
+ || sqlNode.isInstanceOf[SqlDescribeJob]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]
|| sqlNode.isInstanceOf[SqlUnloadModule]
|| sqlNode.isInstanceOf[SqlUseModules]