[FLINK-21822][table] Introduce CatalogFactoryHelper
THis closes #15245.
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
index 3b361ad..2fb74f2 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
@@ -19,11 +19,10 @@
package org.apache.flink.connector.jdbc.catalog.factory;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,40 +65,15 @@
@Override
public Catalog createCatalog(Context context) {
- final Configuration configuration = Configuration.fromMap(context.getOptions());
- validateConfiguration(configuration);
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtil.createCatalogFactoryHelper(this, context);
+ helper.validate();
return new JdbcCatalog(
context.getName(),
- configuration.getString(DEFAULT_DATABASE),
- configuration.getString(USERNAME),
- configuration.getString(PASSWORD),
- configuration.getString(BASE_URL));
- }
-
- private void validateConfiguration(Configuration configuration) {
- final String defaultDatabase = configuration.getString(DEFAULT_DATABASE);
- if (defaultDatabase == null || defaultDatabase.isEmpty()) {
- throw new ValidationException(
- String.format("Missing or empty value for '%s'", DEFAULT_DATABASE.key()));
- }
-
- final String username = configuration.getString(USERNAME);
- if (username == null || username.isEmpty()) {
- throw new ValidationException(
- String.format("Missing or empty value for '%s'", USERNAME.key()));
- }
-
- final String password = configuration.getString(PASSWORD);
- if (password == null || password.isEmpty()) {
- throw new ValidationException(
- String.format("Missing or empty value for '%s'", PASSWORD.key()));
- }
-
- final String baseUrl = configuration.getString(BASE_URL);
- if (baseUrl == null || baseUrl.isEmpty()) {
- throw new ValidationException(
- String.format("Missing or empty value for '%s'", BASE_URL.key()));
- }
+ helper.getOptions().get(DEFAULT_DATABASE),
+ helper.getOptions().get(USERNAME),
+ helper.getOptions().get(PASSWORD),
+ helper.getOptions().get(BASE_URL));
}
}