| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.seatunnel.app.service.impl; |
| |
| import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; |
| |
| import org.apache.seatunnel.app.config.ConnectorDataSourceMapperConfig; |
| import org.apache.seatunnel.app.dal.dao.IJobLineDao; |
| import org.apache.seatunnel.app.dal.dao.IJobTaskDao; |
| import org.apache.seatunnel.app.dal.dao.IJobVersionDao; |
| import org.apache.seatunnel.app.dal.entity.JobLine; |
| import org.apache.seatunnel.app.dal.entity.JobTask; |
| import org.apache.seatunnel.app.dal.entity.JobVersion; |
| import org.apache.seatunnel.app.domain.request.connector.SceneMode; |
| import org.apache.seatunnel.app.domain.request.job.DataSourceOption; |
| import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq; |
| import org.apache.seatunnel.app.domain.request.job.Edge; |
| import org.apache.seatunnel.app.domain.request.job.JobDAG; |
| import org.apache.seatunnel.app.domain.request.job.JobTaskInfo; |
| import org.apache.seatunnel.app.domain.request.job.PluginConfig; |
| import org.apache.seatunnel.app.domain.request.job.SelectTableFields; |
| import org.apache.seatunnel.app.domain.request.job.transform.Copy; |
| import org.apache.seatunnel.app.domain.request.job.transform.CopyTransformOptions; |
| import org.apache.seatunnel.app.domain.request.job.transform.FieldMapperTransformOptions; |
| import org.apache.seatunnel.app.domain.request.job.transform.RenameField; |
| import org.apache.seatunnel.app.domain.request.job.transform.SQLTransformOptions; |
| import org.apache.seatunnel.app.domain.request.job.transform.SplitTransformOptions; |
| import org.apache.seatunnel.app.domain.request.job.transform.Transform; |
| import org.apache.seatunnel.app.domain.request.job.transform.TransformOption; |
| import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions; |
| import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; |
| import org.apache.seatunnel.app.domain.response.job.SchemaError; |
| import org.apache.seatunnel.app.domain.response.job.SchemaErrorType; |
| import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant; |
| import org.apache.seatunnel.app.service.IDatasourceService; |
| import org.apache.seatunnel.app.service.IJobInstanceService; |
| import org.apache.seatunnel.app.service.IJobTaskService; |
| import org.apache.seatunnel.common.constants.PluginType; |
| import org.apache.seatunnel.common.utils.ExceptionUtils; |
| import org.apache.seatunnel.common.utils.SeaTunnelException; |
| import org.apache.seatunnel.datasource.plugin.api.model.TableField; |
| import org.apache.seatunnel.server.common.CodeGenerateUtils; |
| import org.apache.seatunnel.server.common.SeatunnelErrorEnum; |
| import org.apache.seatunnel.server.common.SeatunnelException; |
| |
| import org.apache.commons.lang3.StringUtils; |
| |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import javax.annotation.Resource; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.apache.seatunnel.app.utils.TaskOptionUtils.getTransformOption; |
| |
| @Service |
| @Slf4j |
| public class JobTaskServiceImpl extends SeatunnelBaseServiceImpl implements IJobTaskService { |
| |
| @Resource(name = "jobLineDaoImpl") |
| private IJobLineDao jobLineDao; |
| |
| @Resource(name = "jobTaskDaoImpl") |
| private IJobTaskDao jobTaskDao; |
| |
| @Resource(name = "jobVersionDaoImpl") |
| private IJobVersionDao jobVersionDao; |
| |
| @Resource private IDatasourceService datasourceService; |
| |
| @Resource private IJobInstanceService jobInstanceService; |
| |
| @Resource private ConnectorDataSourceMapperConfig connectorDataSourceMapperConfig; |
| |
| private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); |
| |
| private void checkConfigIntegrity(JobVersion version, JobTaskInfo jobTaskInfo) { |
| if (StringUtils.isEmpty(version.getEnv())) { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.ERROR_CONFIG, |
| "job env can't be empty, please change config"); |
| } |
| Map<String, PluginConfig> pluginConfigMap = |
| jobTaskInfo.getPlugins().stream() |
| .collect(Collectors.toMap(PluginConfig::getPluginId, Function.identity())); |
| |
| List<String> allPluginIdsFromEdge = |
| Stream.concat( |
| jobTaskInfo.getEdges().stream().map(Edge::getInputPluginId), |
| jobTaskInfo.getEdges().stream().map(Edge::getTargetPluginId)) |
| .collect(Collectors.toList()); |
| |
| jobTaskInfo.getPlugins().stream() |
| .filter(p -> !allPluginIdsFromEdge.contains(p.getPluginId())) |
| .findAny() |
| .ifPresent( |
| p -> { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.ERROR_CONFIG, |
| "plugin '" + p.getName() + "' is not used in any edge"); |
| }); |
| |
| jobTaskInfo |
| .getEdges() |
| .forEach( |
| e -> { |
| jobTaskInfo.getPlugins().stream() |
| .filter( |
| p -> |
| Objects.equals( |
| p.getPluginId(), e.getInputPluginId())) |
| .findFirst() |
| .orElseThrow( |
| () -> |
| new SeatunnelException( |
| SeatunnelErrorEnum.ERROR_CONFIG, |
| "input plugin not found")); |
| jobTaskInfo.getPlugins().stream() |
| .filter( |
| p -> |
| Objects.equals( |
| p.getPluginId(), e.getTargetPluginId())) |
| .findFirst() |
| .orElseThrow( |
| () -> |
| new SeatunnelException( |
| SeatunnelErrorEnum.ERROR_CONFIG, |
| "target plugin not found")); |
| }); |
| |
| List<String> inputTransformId = |
| jobTaskInfo.getEdges().stream() |
| .map(Edge::getInputPluginId) |
| .filter(e -> pluginConfigMap.get(e).getType().equals(PluginType.TRANSFORM)) |
| .collect(Collectors.toList()); |
| List<String> targetTransformId = |
| jobTaskInfo.getEdges().stream() |
| .map(Edge::getTargetPluginId) |
| .filter(e -> pluginConfigMap.get(e).getType().equals(PluginType.TRANSFORM)) |
| .collect(Collectors.toList()); |
| |
| if (!new HashSet<>(inputTransformId).containsAll(targetTransformId) |
| || !new HashSet<>(targetTransformId).containsAll(inputTransformId)) { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.ERROR_CONFIG, "transform plugin must be connected"); |
| } |
| |
| for (Edge edge : jobTaskInfo.getEdges()) { |
| if (!pluginTypeMatch( |
| pluginConfigMap.get(edge.getInputPluginId()).getType(), |
| pluginConfigMap.get(edge.getTargetPluginId()).getType())) { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.ERROR_CONFIG, |
| "plugin line not match, please check plugin line"); |
| } |
| } |
| } |
| |
| private boolean pluginTypeMatch(PluginType inputType, PluginType outputType) { |
| if (inputType == PluginType.SOURCE) { |
| return outputType != PluginType.SOURCE; |
| } |
| if (inputType == PluginType.SINK) { |
| return false; |
| } |
| if (inputType == PluginType.TRANSFORM) { |
| return outputType != PluginType.SOURCE; |
| } |
| return false; |
| } |
| |
| private static void checkConfigFormat(String config) { |
| if (StringUtils.isNotEmpty(config)) { |
| ConfigFactory.parseString(config); |
| } |
| } |
| |
| @Override |
| public JobTaskInfo getTaskConfig(long jobVersionId) { |
| funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_TASK_DETAIL, 0); |
| List<JobTask> tasks = jobTaskDao.getTasksByVersionId(jobVersionId); |
| if (tasks.isEmpty()) { |
| return null; |
| } |
| List<JobLine> lines = jobLineDao.getLinesByVersionId(jobVersionId); |
| return new JobTaskInfo( |
| lines.stream() |
| .map(l -> new Edge(l.getInputPluginId(), l.getTargetPluginId())) |
| .collect(Collectors.toList()), |
| tasks.stream() |
| .map( |
| t -> { |
| try { |
| return getPluginConfigFromJobTask(t); |
| } catch (Exception e) { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.UNKNOWN, e.getMessage()); |
| } |
| }) |
| .collect(Collectors.toList())); |
| } |
| |
| @Override |
| @Transactional |
| public JobTaskCheckRes saveJobDAG(long jobVersionId, JobDAG jobDAG) { |
| funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_TASK_DAG_CREATE, 0); |
| JobVersion version = jobVersionDao.getVersionById(jobVersionId); |
| List<JobTask> tasks = jobTaskDao.getTasksByVersionId(jobVersionId); |
| List<PluginConfig> pluginConfigs = |
| tasks.stream() |
| .map(JobTaskServiceImpl::getPluginConfigFromJobTask) |
| .collect(Collectors.toList()); |
| JobTaskInfo taskInfo = new JobTaskInfo(jobDAG.getEdges(), pluginConfigs); |
| checkConfigIntegrity(version, taskInfo); |
| List<JobLine> lines = |
| jobDAG.getEdges().stream() |
| .map( |
| e -> { |
| try { |
| return JobLine.builder() |
| .id(CodeGenerateUtils.getInstance().genCode()) |
| .inputPluginId(e.getInputPluginId()) |
| .targetPluginId(e.getTargetPluginId()) |
| .versionId(jobVersionId) |
| .build(); |
| } catch (CodeGenerateUtils.CodeGenerateException ex) { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.ILLEGAL_STATE, ex.getMessage()); |
| } |
| }) |
| .collect(Collectors.toList()); |
| |
| try { |
| JobTaskCheckRes jobTaskCheckRes = checkPluginSchemaIntegrity(taskInfo); |
| if (jobTaskCheckRes != null) { |
| return jobTaskCheckRes; |
| } |
| // check the config can be generated |
| jobInstanceService.generateJobConfig( |
| version.getJobId(), tasks, lines, version.getEnv()); |
| // TODO check schema output and input matched |
| } catch (SeaTunnelException e) { |
| log.error(ExceptionUtils.getMessage(e)); |
| throw e; |
| } catch (Exception e) { |
| throw new SeatunnelException(SeatunnelErrorEnum.ERROR_CONFIG, e.getMessage()); |
| } |
| |
| jobLineDao.deleteLinesByVersionId(jobVersionId); |
| jobLineDao.insertLines(lines); |
| return null; |
| } |
| |
| private JobTaskCheckRes checkPluginSchemaIntegrity(JobTaskInfo taskInfo) throws IOException { |
| |
| List<PluginConfig> source = |
| taskInfo.getPlugins().stream() |
| .filter(p -> p.getType().equals(PluginType.SOURCE)) |
| .collect(Collectors.toList()); |
| |
| Map<String, PluginConfig> pluginMap = |
| taskInfo.getPlugins().stream() |
| .collect(Collectors.toMap(PluginConfig::getPluginId, Function.identity())); |
| Map<String, String> edgeMap = |
| taskInfo.getEdges().stream() |
| .collect(Collectors.toMap(Edge::getInputPluginId, Edge::getTargetPluginId)); |
| |
| for (PluginConfig config : source) { |
| PluginConfig nextConfig = pluginMap.get(edgeMap.get(config.getPluginId())); |
| JobTaskCheckRes res = checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap); |
| if (res != null) { |
| return res; |
| } |
| } |
| return null; |
| } |
| |
| private JobTaskCheckRes checkNextTaskSchema( |
| PluginConfig config, |
| PluginConfig nextConfig, |
| Map<String, PluginConfig> pluginMap, |
| Map<String, String> edgeMap) |
| throws IOException { |
| Map<String, Object> options = nextConfig.getTransformOptions(); |
| if (options != null && !options.isEmpty()) { |
| Transform transform = Transform.valueOf(nextConfig.getConnectorType().toUpperCase()); |
| String transformOptionsStr = OBJECT_MAPPER.writeValueAsString(options); |
| |
| List<TransformOption> transformOptions = new ArrayList<>(); |
| |
| switch (transform) { |
| case FIELDMAPPER: |
| FieldMapperTransformOptions fieldMapperTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (fieldMapperTransformOptions != null) { |
| fillTransformOptions( |
| transformOptions, fieldMapperTransformOptions.getRenameFields()); |
| fillTransformOptions( |
| transformOptions, fieldMapperTransformOptions.getChangeOrders()); |
| } |
| break; |
| case MULTIFIELDSPLIT: |
| SplitTransformOptions splitTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (splitTransformOptions != null) { |
| fillTransformOptions(transformOptions, splitTransformOptions.getSplits()); |
| } |
| break; |
| case COPY: |
| CopyTransformOptions copyTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (copyTransformOptions != null) { |
| fillTransformOptions(transformOptions, copyTransformOptions.getCopyList()); |
| } |
| break; |
| case SQL: |
| SQLTransformOptions sqlTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (sqlTransformOptions != null) { |
| fillTransformOptions( |
| transformOptions, |
| Collections.singletonList(sqlTransformOptions.getSql())); |
| } |
| break; |
| case FILTERROWKIND: |
| case REPLACE: |
| default: |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.UNSUPPORTED_CONNECTOR_TYPE, |
| "Unsupported Transform Option " + transform); |
| } |
| |
| if (!transformOptions.isEmpty()) { |
| DatabaseTableSchemaReq databaseTableSchemaReq = config.getOutputSchema().get(0); |
| List<String> fields = |
| databaseTableSchemaReq.getFields().stream() |
| .map(TableField::getName) |
| .collect(Collectors.toList()); |
| Optional<TransformOption> transformOption = |
| transformOptions.stream() |
| .filter(option -> !fields.contains(option.getSourceFieldName())) |
| .findFirst(); |
| if (transformOption.isPresent()) { |
| String sourceFiledName = transformOption.get().getSourceFieldName(); |
| return new JobTaskCheckRes( |
| false, |
| nextConfig.getPluginId(), |
| new SchemaError( |
| databaseTableSchemaReq.getDatabase(), |
| databaseTableSchemaReq.getTableName(), |
| sourceFiledName, |
| SchemaErrorType.MISS_FIELD), |
| null); |
| } |
| } |
| } |
| if (edgeMap.containsKey(nextConfig.getPluginId())) { |
| return checkNextTaskSchema( |
| nextConfig, |
| pluginMap.get(edgeMap.get(nextConfig.getPluginId())), |
| pluginMap, |
| edgeMap); |
| } |
| return null; |
| } |
| |
| private static void fillTransformOptions( |
| List<TransformOption> transformOptions, List<? extends TransformOption> options) { |
| if (options != null) { |
| transformOptions.addAll(options); |
| } |
| } |
| |
| @Override |
| public void saveSingleTask(long jobVersionId, PluginConfig pluginConfig) { |
| funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_CREATE, 0); |
| JobTask jobTask; |
| JobTask old = jobTaskDao.getTask(jobVersionId, pluginConfig.getPluginId()); |
| try { |
| checkConfigFormat(pluginConfig.getConfig()); |
| long id; |
| if (old != null) { |
| id = old.getId(); |
| } else { |
| id = CodeGenerateUtils.getInstance().genCode(); |
| } |
| String connectorType; |
| String transformOptionsStr = null; |
| if (pluginConfig.getType().equals(PluginType.TRANSFORM)) { |
| connectorType = pluginConfig.getConnectorType(); |
| if (pluginConfig.getTransformOptions() != null) { |
| transformOptionsStr = |
| OBJECT_MAPPER.writeValueAsString(pluginConfig.getTransformOptions()); |
| } |
| transformOptionCheck(connectorType, transformOptionsStr); |
| } else { |
| connectorType = getConnectorTypeFromDataSource(pluginConfig.getDataSourceId()); |
| } |
| jobTask = |
| JobTask.builder() |
| .id(id) |
| .pluginId(pluginConfig.getPluginId()) |
| .name(pluginConfig.getName()) |
| .type(pluginConfig.getType().name().toUpperCase()) |
| .dataSourceId(pluginConfig.getDataSourceId()) |
| .config(pluginConfig.getConfig()) |
| .sceneMode( |
| pluginConfig.getSceneMode() == null |
| ? null |
| : pluginConfig.getSceneMode().name()) |
| .versionId(jobVersionId) |
| .connectorType(connectorType) |
| .dataSourceOption( |
| pluginConfig.getTableOption() == null |
| ? null |
| : OBJECT_MAPPER.writeValueAsString( |
| pluginConfig.getTableOption())) |
| .selectTableFields( |
| pluginConfig.getSelectTableFields() == null |
| ? null |
| : OBJECT_MAPPER.writeValueAsString( |
| pluginConfig.getSelectTableFields())) |
| .outputSchema( |
| pluginConfig.getOutputSchema() == null |
| ? null |
| : OBJECT_MAPPER.writeValueAsString( |
| pluginConfig.getOutputSchema())) |
| .transformOptions(transformOptionsStr) |
| .build(); |
| } catch (Exception e) { |
| throw new SeatunnelException(SeatunnelErrorEnum.ILLEGAL_STATE, e.getMessage()); |
| } |
| if (old != null) { |
| jobTaskDao.updateTask(jobTask); |
| } else { |
| jobTaskDao.insertTask(jobTask); |
| } |
| } |
| |
| private void transformOptionCheck(String connectorType, String transformOptionsStr) |
| throws IOException { |
| Transform transform = Transform.valueOf(connectorType.toUpperCase()); |
| switch (transform) { |
| case FIELDMAPPER: |
| FieldMapperTransformOptions fieldMapperTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (fieldMapperTransformOptions != null) { |
| List<RenameField> renameFields = fieldMapperTransformOptions.getRenameFields(); |
| checkTransformTargetFieldRepeat( |
| renameFields.stream() |
| .map(RenameField::getTargetName) |
| .collect(Collectors.toList())); |
| } |
| break; |
| case MULTIFIELDSPLIT: |
| SplitTransformOptions splitTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (splitTransformOptions != null) { |
| List<String> fields = |
| splitTransformOptions.getSplits().stream() |
| .flatMap(split -> split.getOutputFields().stream()) |
| .collect(Collectors.toList()); |
| checkTransformTargetFieldRepeat(fields); |
| } |
| break; |
| case COPY: |
| CopyTransformOptions copyTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (copyTransformOptions != null) { |
| List<String> fields = |
| copyTransformOptions.getCopyList().stream() |
| .map(Copy::getTargetFieldName) |
| .collect(Collectors.toList()); |
| checkTransformTargetFieldRepeat(fields); |
| } |
| break; |
| case SQL: |
| SQLTransformOptions sqlTransformOptions = |
| getTransformOption(transform, transformOptionsStr); |
| if (sqlTransformOptions != null) { |
| // TODO 调用接口返回目标字段 |
| List<String> fields = new ArrayList<>(); |
| checkTransformTargetFieldRepeat(fields); |
| } |
| break; |
| case FILTERROWKIND: |
| case REPLACE: |
| default: |
| } |
| } |
| |
| private void checkTransformTargetFieldRepeat(List<String> fields) { |
| Set<String> duplicates = |
| fields.stream() |
| .filter(i -> Collections.frequency(fields, i) > 1) |
| .collect(Collectors.toSet()); |
| if (!duplicates.isEmpty()) { |
| throw new SeatunnelException( |
| SeatunnelErrorEnum.ILLEGAL_STATE, |
| "Can't convert same target name: " + new ArrayList<>(duplicates)); |
| } |
| } |
| |
| @Override |
| public PluginConfig getSingleTask(long jobVersionId, String pluginId) { |
| funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_DETAIL, 0); |
| return getPluginConfigFromJobTask(jobTaskDao.getTask(jobVersionId, pluginId)); |
| } |
| |
| @Override |
| public <T extends TransformOptions> T getTransformOptions(long jobVersionId, String pluginId) { |
| JobTask jobTask = jobTaskDao.getTask(jobVersionId, pluginId); |
| try { |
| Transform transform = Transform.valueOf(jobTask.getConnectorType().toUpperCase()); |
| return getTransformOption(transform, jobTask.getTransformOptions()); |
| } catch (IOException e) { |
| throw new SeatunnelException(SeatunnelErrorEnum.ILLEGAL_STATE, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public void deleteSingleTask(long jobVersionId, String pluginId) { |
| funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_DELETE, 0); |
| jobTaskDao.deleteTask(jobVersionId, pluginId); |
| } |
| |
| @Override |
| public void deleteTaskByVersionId(long id) { |
| funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_DELETE, 0); |
| jobTaskDao.deleteTaskByVersionId(id); |
| } |
| |
| private String getConnectorTypeFromDataSource(long datasourceId) { |
| String pluginName = |
| datasourceService |
| .queryDatasourceDetailById(String.valueOf(datasourceId)) |
| .getPluginName(); |
| return connectorDataSourceMapperConfig |
| .findConnectorForDatasourceName(pluginName) |
| .orElseThrow( |
| () -> |
| new SeatunnelException( |
| SeatunnelErrorEnum.ILLEGAL_STATE, |
| "can not find connector for datasourceName: " |
| + pluginName)); |
| } |
| |
| private static PluginConfig getPluginConfigFromJobTask(JobTask jobTask) { |
| try { |
| return PluginConfig.builder() |
| .pluginId(jobTask.getPluginId()) |
| .name(jobTask.getName()) |
| .type(PluginType.valueOf(jobTask.getType().toUpperCase())) |
| .dataSourceId(jobTask.getDataSourceId()) |
| .config(jobTask.getConfig()) |
| .connectorType(jobTask.getConnectorType()) |
| .sceneMode( |
| StringUtils.isEmpty(jobTask.getSceneMode()) |
| ? null |
| : SceneMode.valueOf(jobTask.getSceneMode())) |
| .tableOption( |
| StringUtils.isEmpty(jobTask.getDataSourceOption()) |
| ? null |
| : OBJECT_MAPPER.readValue( |
| jobTask.getDataSourceOption(), DataSourceOption.class)) |
| .selectTableFields( |
| StringUtils.isEmpty(jobTask.getSelectTableFields()) |
| ? null |
| : OBJECT_MAPPER.readValue( |
| jobTask.getSelectTableFields(), |
| SelectTableFields.class)) |
| .outputSchema( |
| StringUtils.isEmpty(jobTask.getOutputSchema()) |
| ? null |
| : OBJECT_MAPPER.readValue( |
| jobTask.getOutputSchema(), |
| new TypeReference<List<DatabaseTableSchemaReq>>() {})) |
| .transformOptions( |
| StringUtils.isEmpty(jobTask.getTransformOptions()) |
| ? null |
| : OBJECT_MAPPER.readValue( |
| jobTask.getTransformOptions(), |
| new TypeReference<Map<String, Object>>() {})) |
| .config(jobTask.getConfig()) |
| .build(); |
| } catch (Exception e) { |
| throw new SeatunnelException(SeatunnelErrorEnum.UNKNOWN, e.getMessage()); |
| } |
| } |
| } |