[Improve] [Seatunnel-Web] Add support for Seatunnel 2.3.6 in Seatunnel-Web. (#170)
diff --git a/pom.xml b/pom.xml
index c0e49c7..b0fbead 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
<guava.version>19.0</guava.version>
<checker.qual.version>3.10.0</checker.qual.version>
<awaitility.version>4.2.0</awaitility.version>
- <seatunnel-framework.version>2.3.3</seatunnel-framework.version>
+ <seatunnel-framework.version>2.3.6</seatunnel-framework.version>
<oracle-jdbc.version>21.5.0.0</oracle-jdbc.version>
<postgresql.version>42.4.3</postgresql.version>
<sqlserver.version>9.2.1.jre8</sqlserver.version>
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
index 061453f..86fa0c6 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
@@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.app.bean.engine;
-import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -29,7 +28,6 @@
import java.util.Arrays;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -86,21 +84,19 @@
implements DataTypeConvertor<SeaTunnelDataType<?>> {
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
- return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
+ return DATA_TYPE_MAP.get(connectorDataType).getRawType();
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
- throws DataTypeConvertException {
+ String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}
@Override
public SeaTunnelDataType<?> toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
- throws DataTypeConvertException {
+ String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
index 48da062..cba0a92 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
@@ -36,7 +36,7 @@
Lists.newArrayList(
// new Engine("Spark", "2.4.0"),
// new Engine("Flink", "1.13.6"),
- new Engine("SeaTunnel", "2.3.1")));
+ new Engine("SeaTunnel", "2.3.6")));
@Override
public List<Engine> listSupportEngines() {
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index 947519f..90d57d2 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -31,10 +31,12 @@
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.springframework.stereotype.Service;
@@ -104,8 +106,9 @@
jobConfig.setName(jobInstanceId + "_job");
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
try {
- JobExecutionEnvironment jobExecutionEnv =
- seaTunnelClient.createExecutionContext(filePath, jobConfig);
+ SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 92726a6..bf02de6 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -363,7 +363,7 @@
} else if (statusList.contains("CANCELED")) {
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELLING")) {
- jobStatus = JobStatus.CANCELLING.name();
+ jobStatus = JobStatus.CANCELING.name();
} else {
jobStatus = JobStatus.RUNNING.name();
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java
index 03cacb2..c488cbd 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java
@@ -26,8 +26,8 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -103,8 +103,8 @@
tableSchema.getTableName());
Map<String, Object> config = new HashMap<>();
config.put(SQLTransform.KEY_QUERY.key(), sql.getQuery());
- TableFactoryContext context =
- new TableFactoryContext(
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
Collections.singletonList(table),
ReadonlyConfig.fromMap(config),
Thread.currentThread().getContextClassLoader());
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
index 52c8c4e..6035379 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
@@ -47,7 +47,6 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,11 +71,17 @@
public TableSchemaServiceImpl() throws IOException {
Common.setStarter(true);
- Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
- if (path.toFile().exists()) {
- List<URL> files = FileUtils.searchJarFiles(path);
- files.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
- factory = new DataTypeConvertorFactory(new URLClassLoader(files.toArray(new URL[0])));
+ Set<PluginIdentifier> pluginIdentifiers =
+ SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
+ ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
+ pluginIdentifiersList.addAll(pluginIdentifiers);
+ List<URL> pluginJarPaths =
+ new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
+ if (!pluginJarPaths.isEmpty()) {
+ pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
+ factory =
+ new DataTypeConvertorFactory(
+ new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
factory = new DataTypeConvertorFactory();
}
@@ -105,7 +110,8 @@
}
for (TableField field : tableSchemaReq.getFields()) {
- SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
+ SeaTunnelDataType<?> dataType =
+ convertor.toSeaTunnelType(field.getName(), field.getType());
field.setType(dataType.toString());
}
TableSchemaRes res = new TableSchemaRes();
@@ -135,7 +141,8 @@
}
for (TableField field : tableFields) {
try {
- SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
+ SeaTunnelDataType<?> dataType =
+ convertor.toSeaTunnelType(field.getName(), field.getType());
field.setUnSupport(false);
field.setOutputDataType(dataType.toString());
} catch (Exception exception) {
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java
index 20d39e8..390ac62 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.app.thirdparty.engine;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
-import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import com.hazelcast.client.config.ClientConfig;
@@ -31,16 +32,21 @@
@Slf4j
public class SeaTunnelEngineProxy {
- ClientConfig clientConfig = null;
- private SeaTunnelEngineProxy() {
- clientConfig = ConfigProvider.locateAndGetClientConfig();
+ private ClientConfig clientConfig = null;
+
+ private static class SeaTunnelEngineProxyHolder {
+ private static final SeaTunnelEngineProxy INSTANCE = new SeaTunnelEngineProxy();
}
public static SeaTunnelEngineProxy getInstance() {
return SeaTunnelEngineProxyHolder.INSTANCE;
}
+ private SeaTunnelEngineProxy() {
+ clientConfig = ConfigProvider.locateAndGetClientConfig();
+ }
+
public String getMetricsContent(@NonNull String jobEngineId) {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
try {
@@ -89,10 +95,6 @@
}
}
- private static class SeaTunnelEngineProxyHolder {
- private static final SeaTunnelEngineProxy INSTANCE = new SeaTunnelEngineProxy();
- }
-
public String getAllRunningJobMetricsContent() {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
@@ -103,22 +105,24 @@
}
}
- public void pauseJob(String jobEngineId) {
- SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
- JobClient jobClient = seaTunnelClient.getJobClient();
- jobClient.savePointJob(Long.valueOf(jobEngineId));
+ public void pauseJob(@NonNull String jobEngineId) {
+ try (SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig)) {
+ seaTunnelClient.getJobClient().savePointJob(Long.valueOf(jobEngineId));
+ } catch (Exception e) {
+ log.warn("Can not pause job from engine.", e);
+ }
}
public void restoreJob(
@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
- SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
- try {
- seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
+ SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
+ try (SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig)) {
+ seaTunnelClient
+ .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
+ .execute();
+ } catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/framework/PluginDiscoveryUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/framework/PluginDiscoveryUtil.java
index f5fae52..4641574 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/framework/PluginDiscoveryUtil.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/framework/PluginDiscoveryUtil.java
@@ -26,7 +26,6 @@
import org.apache.seatunnel.app.dynamicforms.FormStructure;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -36,7 +35,6 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,17 +55,23 @@
}
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
- PluginType pluginType) throws IOException {
+ PluginType pluginType) {
Common.setStarter(true);
if (!pluginType.equals(PluginType.SOURCE)) {
throw new UnsupportedOperationException("ONLY support plugin type source");
}
- Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
+
+ ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
+ pluginIdentifiers.addAll(
+ SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
+ List<URL> pluginJarPaths =
+ new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);
+
List<Factory> factories;
- if (path.toFile().exists()) {
- List<URL> files = FileUtils.searchJarFiles(path);
+ if (!pluginJarPaths.isEmpty()) {
factories =
- FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
+ FactoryUtil.discoverFactories(
+ new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
factories =
FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
diff --git a/seatunnel-ui/src/layouts/dashboard/index.tsx b/seatunnel-ui/src/layouts/dashboard/index.tsx
index ffa80ea..d504cca 100644
--- a/seatunnel-ui/src/layouts/dashboard/index.tsx
+++ b/seatunnel-ui/src/layouts/dashboard/index.tsx
@@ -72,7 +72,7 @@
style={'height: 100%'}
size='small'
>
- <router-view key={this.$route.fullPath} class={!this.showSide && 'px-32 py-12'} />
+ <router-view key={this['$route'].fullPath} class={!this.showSide && 'px-32 py-12'} />
</NSpace>
</NLayoutContent>
</NLayout>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 1387d78..585cea8 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -10,7 +10,8 @@
guava-19.0.jar
hibernate-validator-6.2.2.Final.jar
jackson-annotations-2.12.6.jar
-jackson-core-2.12.6.jar
+jackson-core-2.13.3.jar
+jackson-annotations-2.13.3.jar
jackson-databind-2.12.6.jar
jackson-datatype-jdk8-2.13.3.jar
jackson-datatype-jsr310-2.13.3.jar
@@ -35,7 +36,7 @@
jjwt-impl-0.10.7.jar
jjwt-jackson-0.10.7.jar
mapstruct-1.0.0.Final.jar
-jsqlparser-4.4.jar
+jsqlparser-4.5.jar
mybatis-3.5.10.jar
mybatis-spring-2.0.7.jar
mybatis-plus-3.5.3.1.jar
@@ -43,7 +44,7 @@
mybatis-plus-boot-starter-3.5.3.1.jar
mybatis-plus-core-3.5.3.1.jar
mybatis-plus-extension-3.5.3.1.jar
-scala-library-2.11.12.jar
+scala-library-2.12.15.jar
@@ -86,14 +87,13 @@
h2-2.1.214.jar
auto-service-annotations-1.0.1.jar
jsr305-3.0.0.jar
-checkpoint-storage-api-2.3.3.jar
-checkpoint-storage-local-file-2.3.3.jar
+checkpoint-storage-api-2.3.6.jar
+checkpoint-storage-local-file-2.3.6.jar
clickhouse-cli-client-0.3.2-patch11-shaded.jar
clickhouse-grpc-client-0.3.2-patch11-shaded.jar
clickhouse-http-client-0.3.2-patch11-shaded.jar
clickhouse-jdbc-0.3.2-patch11.jar
commons-lang3-3.4.jar
-hazelcast-5.1.jar
httpcore-4.4.13.jar
httpmime-4.5.13.jar
jcl-over-slf4j-1.7.25.jar
@@ -107,20 +107,22 @@
protostuff-collectionschema-1.8.0.jar
protostuff-core-1.8.0.jar
protostuff-runtime-1.8.0.jar
-seatunnel-api-2.3.3.jar
-seatunnel-common-2.3.3.jar
-seatunnel-config-base-2.3.3.jar
-seatunnel-config-shade-2.3.3.jar
-seatunnel-core-starter-2.3.3.jar
-seatunnel-engine-client-2.3.3.jar
-seatunnel-engine-common-2.3.3.jar
-seatunnel-engine-core-2.3.3.jar
-seatunnel-guava-2.3.3-optional.jar
-seatunnel-jackson-2.3.3-optional.jar
-seatunnel-plugin-discovery-2.3.3.jar
-seatunnel-transforms-v2-2.3.3.jar
-serializer-api-2.3.3.jar
-serializer-protobuf-2.3.3.jar
+seatunnel-api-2.3.6.jar
+seatunnel-common-2.3.6.jar
+seatunnel-config-base-2.3.6.jar
+seatunnel-config-shade-2.3.6.jar
+seatunnel-core-starter-2.3.6.jar
+seatunnel-engine-client-2.3.6.jar
+seatunnel-engine-common-2.3.6.jar
+seatunnel-engine-core-2.3.6.jar
+seatunnel-guava-2.3.6-optional.jar
+seatunnel-jackson-2.3.6-optional.jar
+seatunnel-plugin-discovery-2.3.6.jar
+seatunnel-transforms-v2-2.3.6.jar
+seatunnel-config-sql-2.3.6.jar
+seatunnel-hazelcast-shade-2.3.6-optional.jar
+serializer-api-2.3.6.jar
+serializer-protobuf-2.3.6.jar
swagger-annotations-2.2.14.jar
commons-codec-1.11.jar
commons-compress-1.20.jar
@@ -133,6 +135,7 @@
security-206.jar
snappy-java-1.1.8.4.jar
zstd-jni-1.5.2-1.jar
+commons-csv-1.10.0.jar