[FLINK-38087][sql-gateway] Improve OperationExecutor error messages and Java stream usage
The configureSession() error message does not include SET/RESET.
There are also some usages of Java stream which can be improved.
This closes #26781
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 9612b57..acf5ac7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -82,7 +82,9 @@
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseOperation;
+import org.apache.flink.table.operations.UseCatalogOperation;
+import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
@@ -92,11 +94,32 @@
import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.command.ShowJobsOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
-import org.apache.flink.table.operations.ddl.AlterOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogCommentOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation;
+import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterModelChangeOperation;
+import org.apache.flink.table.operations.ddl.AlterModelRenameOperation;
+import org.apache.flink.table.operations.ddl.AlterTableOperation;
+import org.apache.flink.table.operations.ddl.AlterViewOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
+import org.apache.flink.table.operations.ddl.CreateModelOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.table.operations.ddl.CreateViewOperation;
+import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropCatalogOperation;
+import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropModelOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
+import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.DateTimeUtils;
@@ -114,8 +137,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -142,6 +167,9 @@
private static final Logger LOG = LoggerFactory.getLogger(OperationExecutor.class);
+ private static final Map<Class<?>, String> SUPPORTED_CONFIG_SESSION_OPERATIONS =
+ createSupportedInitializationOperations();
+
protected final SessionContext sessionContext;
private final Configuration executionConfig;
@@ -180,28 +208,14 @@
}
Operation op = parsedOperations.get(0);
- if (!(op instanceof SetOperation)
- && !(op instanceof ResetOperation)
- && !(op instanceof CreateOperation)
- && !(op instanceof DropOperation)
- && !(op instanceof UseOperation)
- && !(op instanceof AlterOperation)
- && !(op instanceof LoadModuleOperation)
- && !(op instanceof UnloadModuleOperation)
- && !(op instanceof AddJarOperation)) {
+ if (SUPPORTED_CONFIG_SESSION_OPERATIONS.keySet().stream()
+ .noneMatch(c -> c.isInstance(op))) {
throw new UnsupportedOperationException(
String.format(
- "Unsupported statement for configuring session:%s\n"
- + "The configureSession API only supports to execute statement of type "
- + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
- + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
- + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
- + "CREATE CATALOG, DROP CATALOG, "
- + "USE CATALOG, USE [CATALOG.]DATABASE, "
- + "CREATE VIEW, DROP VIEW, "
- + "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
- + "ADD JAR.",
- statement));
+ "Unsupported statement for configuring session: %s\n"
+ + "The configureSession API only supports executing statements of type %s.",
+ statement,
+ String.join(", ", SUPPORTED_CONFIG_SESSION_OPERATIONS.values())));
}
if (op instanceof SetOperation) {
@@ -290,8 +304,8 @@
public Set<TableInfo> listTables(
String catalogName, String databaseName, Set<TableKind> tableKinds) {
checkArgument(
- Arrays.asList(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds),
- "Currently only support to list TABLE, VIEW or TABLE AND VIEW.");
+ EnumSet.of(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds),
+ "Currently only supports listing TABLE, VIEW or TABLE AND VIEW.");
if (tableKinds.contains(TableKind.TABLE) && tableKinds.contains(TableKind.VIEW)) {
return listTables(catalogName, databaseName, true);
} else if (tableKinds.contains(TableKind.TABLE)) {
@@ -608,7 +622,7 @@
// set a property
sessionContext.set(setOp.getKey().get().trim(), setOp.getValue().get().trim());
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
- } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+ } else if (setOp.getKey().isEmpty() && setOp.getValue().isEmpty()) {
// show all properties
Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
return ResultFetcher.fromResults(
@@ -743,23 +757,21 @@
ObjectIdentifier.of(
catalogName, databaseName, name),
TableKind.TABLE)));
- return Collections.unmodifiableSet(new HashSet<>(ans.values()));
+ return ans.values().stream().collect(Collectors.toUnmodifiableSet());
}
private Set<TableInfo> listViews(String catalogName, String databaseName) {
- return Collections.unmodifiableSet(
- sessionContext
- .getSessionState()
- .catalogManager
- .listViews(catalogName, databaseName)
- .stream()
- .map(
- name ->
- new TableInfo(
- ObjectIdentifier.of(
- catalogName, databaseName, name),
- TableKind.VIEW))
- .collect(Collectors.toSet()));
+ return sessionContext
+ .getSessionState()
+ .catalogManager
+ .listViews(catalogName, databaseName)
+ .stream()
+ .map(
+ name ->
+ new TableInfo(
+ ObjectIdentifier.of(catalogName, databaseName, name),
+ TableKind.VIEW))
+ .collect(Collectors.toUnmodifiableSet());
}
public ResultFetcher callStopJobOperation(
@@ -890,7 +902,7 @@
}
});
- if (!jobStatusOp.isPresent()) {
+ if (jobStatusOp.isEmpty()) {
throw new SqlExecutionException(
String.format("Described job %s does not exist in the cluster.", jobId));
}
@@ -945,6 +957,53 @@
}
}
+ private static Map<Class<?>, String> createSupportedInitializationOperations() {
+ // Use LinkedHashMap to preserve insertion order
+ Map<Class<?>, String> ops = new LinkedHashMap<>();
+ // Configuration operations
+ ops.put(SetOperation.class, "SET");
+ ops.put(ResetOperation.class, "RESET");
+ // CREATE operations
+ ops.put(CreateTableOperation.class, "CREATE TABLE");
+ ops.put(CreateViewOperation.class, "CREATE VIEW");
+ ops.put(CreateDatabaseOperation.class, "CREATE DATABASE");
+ ops.put(CreateCatalogFunctionOperation.class, "CREATE FUNCTION");
+ ops.put(CreateTempSystemFunctionOperation.class, "CREATE TEMPORARY SYSTEM FUNCTION");
+ ops.put(CreateCatalogOperation.class, "CREATE CATALOG");
+ ops.put(CreateModelOperation.class, "CREATE MODEL");
+ ops.put(CreateMaterializedTableOperation.class, "CREATE MATERIALIZED TABLE");
+ // DROP operations
+ ops.put(DropTableOperation.class, "DROP TABLE");
+ ops.put(DropViewOperation.class, "DROP VIEW");
+ ops.put(DropDatabaseOperation.class, "DROP DATABASE");
+ ops.put(DropCatalogFunctionOperation.class, "DROP FUNCTION");
+ ops.put(DropTempSystemFunctionOperation.class, "DROP TEMPORARY SYSTEM FUNCTION");
+ ops.put(DropCatalogOperation.class, "DROP CATALOG");
+ ops.put(DropModelOperation.class, "DROP MODEL");
+ ops.put(DropMaterializedTableOperation.class, "DROP MATERIALIZED TABLE");
+ // ALTER operations
+ ops.put(AlterTableOperation.class, "ALTER TABLE");
+ ops.put(AlterViewOperation.class, "ALTER VIEW");
+ ops.put(AlterDatabaseOperation.class, "ALTER DATABASE");
+ ops.put(AlterCatalogFunctionOperation.class, "ALTER FUNCTION");
+ ops.put(AlterCatalogOptionsOperation.class, "ALTER CATALOG SET");
+ ops.put(AlterCatalogResetOperation.class, "ALTER CATALOG RESET");
+ ops.put(AlterCatalogCommentOperation.class, "ALTER CATALOG COMMENT");
+ ops.put(AlterModelChangeOperation.class, "ALTER MODEL");
+ ops.put(AlterModelRenameOperation.class, "ALTER MODEL");
+ ops.put(AlterMaterializedTableOperation.class, "ALTER MATERIALIZED TABLE");
+ // USE operations
+ ops.put(UseCatalogOperation.class, "USE CATALOG");
+ ops.put(UseDatabaseOperation.class, "USE [CATALOG.]DATABASE");
+ ops.put(UseModulesOperation.class, "USE MODULES");
+ // Module operations
+ ops.put(LoadModuleOperation.class, "LOAD MODULE");
+ ops.put(UnloadModuleOperation.class, "UNLOAD MODULE");
+ // AddJar operation
+ ops.put(AddJarOperation.class, "ADD JAR");
+ return Collections.unmodifiableMap(ops);
+ }
+
/**
* Internal interface to encapsulate cluster actions which are executed via the {@link
* ClusterClient}.
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index e2536b0..d64ed8f 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -981,13 +981,21 @@
.satisfies(
FlinkAssertions.anyCauseMatches(
UnsupportedOperationException.class,
- "Unsupported statement for configuring session:SELECT 1;\n"
- + "The configureSession API only supports to execute statement of type "
- + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
- + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
- + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
- + "CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, "
- + "CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR."));
+ "Unsupported statement for configuring session: SELECT 1;\n"
+ + "The configureSession API only supports executing statements of type "
+ + "SET, RESET, "
+ + "CREATE TABLE, CREATE VIEW, CREATE DATABASE, "
+ + "CREATE FUNCTION, CREATE TEMPORARY SYSTEM FUNCTION, "
+ + "CREATE CATALOG, CREATE MODEL, CREATE MATERIALIZED TABLE, "
+ + "DROP TABLE, DROP VIEW, DROP DATABASE, DROP FUNCTION, "
+ + "DROP TEMPORARY SYSTEM FUNCTION, DROP CATALOG, "
+ + "DROP MODEL, DROP MATERIALIZED TABLE, "
+ + "ALTER TABLE, ALTER VIEW, ALTER DATABASE, ALTER FUNCTION, "
+ + "ALTER CATALOG SET, ALTER CATALOG RESET, ALTER CATALOG COMMENT, "
+ + "ALTER MODEL, ALTER MODEL, ALTER MATERIALIZED TABLE, "
+ + "USE CATALOG, USE [CATALOG.]DATABASE, USE MODULES, "
+ + "LOAD MODULE, UNLOAD MODULE, "
+ + "ADD JAR"));
}
@Test