blob: 4e290fc0a143941d504e8e138ab7dd0b25339f54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.service.impl;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.app.config.ConnectorDataSourceMapperConfig;
import org.apache.seatunnel.app.dal.dao.IDatasourceDao;
import org.apache.seatunnel.app.dal.dao.IJobTaskDao;
import org.apache.seatunnel.app.dal.dao.IVirtualTableDao;
import org.apache.seatunnel.app.dal.entity.Datasource;
import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.dal.entity.VirtualTable;
import org.apache.seatunnel.app.domain.response.PageInfo;
import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes;
import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes;
import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
import org.apache.seatunnel.app.dynamicforms.FormStructure;
import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.permission.enums.SeatunnelResourcePermissionModuleEnum;
import org.apache.seatunnel.app.service.IDatasourceService;
import org.apache.seatunnel.app.service.IJobDefinitionService;
import org.apache.seatunnel.app.service.ITableSchemaService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory;
import org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl
implements IDatasourceService, ApplicationContextAware {
/** Database name used in this class for datasources whose tables are virtual tables. */
private static final String VIRTUAL_TABLE_DATABASE_NAME = "default";
/** DAO for persisted datasource definitions. */
@Autowired
@Qualifier("datasourceDaoImpl") private IDatasourceDao datasourceDao;
/** Spring context, injected through {@link #setApplicationContext}; used for by-name bean lookup. */
private ApplicationContext applicationContext;
/** Used by the delete path to find job versions that reference a datasource. */
@Resource private IJobDefinitionService jobDefinitionService;
/** Used by the delete path to find job tasks that reference a datasource. */
@Resource(name = "jobTaskDaoImpl")
private IJobTaskDao jobTaskDao;
/** DAO for virtual tables attached to a datasource. */
@Autowired
@Qualifier("virtualTableDaoImpl") private IVirtualTableDao virtualTableDao;
/** Maps a datasource plugin name to its connector name (see {@code getDynamicForm}). */
@Autowired private ConnectorDataSourceMapperConfig dataSourceMapperConfig;
/** Plugin version passed to the DAO when datasources are looked up by plugin name. */
protected static final String DEFAULT_DATASOURCE_PLUGIN_VERSION = "1.0.0";
/**
 * Creates and persists a new datasource definition.
 *
 * <p>The name must be unique and the configuration must not be empty. The configuration map is
 * stored as a JSON string.
 *
 * @param userId id of the user creating the datasource; recorded as creator and updater
 * @param datasourceName name of the new datasource
 * @param pluginName name of the datasource plugin
 * @param pluginVersion version of the datasource plugin
 * @param description free-text description
 * @param datasourceConfig plugin configuration; must not be null or empty
 * @return the generated datasource id as a decimal string
 * @throws CodeGenerateUtils.CodeGenerateException if the id generator fails
 * @throws SeatunnelException if the name already exists, the configuration is empty, or the
 *     insert fails
 */
@Override
public String createDatasource(
        Integer userId,
        String datasourceName,
        String pluginName,
        String pluginVersion,
        String description,
        Map<String, String> datasourceConfig)
        throws CodeGenerateUtils.CodeGenerateException {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_CREATE, userId);
    // 0L is passed as the datasource id because no row exists yet for this datasource.
    boolean unique = datasourceDao.checkDatasourceNameUnique(datasourceName, 0L);
    if (!unique) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.DATASOURCE_NAME_ALREADY_EXISTS, datasourceName);
    }
    if (MapUtils.isEmpty(datasourceConfig)) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.DATASOURCE_PRAM_NOT_ALLOWED_NULL, "datasourceConfig");
    }
    // Generate the id only once the request is known to be valid.
    long uuid = CodeGenerateUtils.getInstance().genCode();
    String datasourceConfigStr = JsonUtils.toJsonString(datasourceConfig);
    // One timestamp for both columns so a fresh row has createTime == updateTime.
    Date now = new Date();
    Datasource datasource =
            Datasource.builder()
                    .id(uuid)
                    .createUserId(userId)
                    .updateUserId(userId)
                    .datasourceName(datasourceName)
                    .pluginName(pluginName)
                    .pluginVersion(pluginVersion)
                    .description(description)
                    .datasourceConfig(datasourceConfigStr)
                    .createTime(now)
                    .updateTime(now)
                    .build();
    boolean success = datasourceDao.insertDatasource(datasource);
    if (!success) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_CREATE_FAILED);
    }
    resourcePostHandle(
            SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
            Collections.singletonList(datasource.getId()),
            userId);
    return String.valueOf(uuid);
}
/**
 * Updates an existing datasource.
 *
 * <p>The name is changed only when {@code datasourceName} is not blank, and the configuration
 * only when {@code datasourceConfig} is not empty. The description is always overwritten with
 * the supplied value, including {@code null}.
 *
 * @param userId id of the user performing the update; recorded as updater
 * @param datasourceId id of the datasource to update; must not be null
 * @param datasourceName new name, or blank to keep the current one
 * @param description new description
 * @param datasourceConfig new configuration, or null/empty to keep the current one
 * @return the result reported by the DAO update
 * @throws SeatunnelException if the id is null, the datasource does not exist, or the new name
 *     is already used by another datasource
 */
@Override
public boolean updateDatasource(
        Integer userId,
        Long datasourceId,
        String datasourceName,
        String description,
        Map<String, String> datasourceConfig) {
    // Reject a missing id before it is handed to the permission check.
    if (datasourceId == null) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.DATASOURCE_PRAM_NOT_ALLOWED_NULL, "datasourceId");
    }
    funcAndResourcePermissionCheck(
            SeatunnelFuncPermissionKeyConstant.DATASOURCE_UPDATE,
            SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
            Collections.singletonList(datasourceId),
            userId);
    Datasource datasource = datasourceDao.selectDatasourceById(datasourceId);
    if (datasource == null) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId.toString());
    }
    if (StringUtils.isNotBlank(datasourceName)) {
        // Validate first, then mutate the entity.
        boolean unique = datasourceDao.checkDatasourceNameUnique(datasourceName, datasourceId);
        if (!unique) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.DATASOURCE_NAME_ALREADY_EXISTS, datasourceName);
        }
        datasource.setDatasourceName(datasourceName);
    }
    datasource.setUpdateUserId(userId);
    datasource.setUpdateTime(new Date());
    datasource.setDescription(description);
    if (MapUtils.isNotEmpty(datasourceConfig)) {
        datasource.setDatasourceConfig(JsonUtils.toJsonString(datasourceConfig));
    }
    return datasourceDao.updateDatasourceById(datasource);
}
/**
 * Deletes a datasource, provided nothing references it any more.
 *
 * @param userId id of the user requesting the deletion
 * @param datasourceId id of the datasource to delete
 * @return the result reported by the DAO delete
 * @throws SeatunnelException if a job task, a virtual table or a job version still uses the
 *     datasource
 */
@Override
public boolean deleteDatasource(Integer userId, Long datasourceId) {
    funcAndResourcePermissionCheck(
            SeatunnelFuncPermissionKeyConstant.DATASOURCE_DELETE,
            SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
            Collections.singletonList(datasourceId),
            userId);

    // 1. Job tasks referencing the datasource block the deletion.
    List<JobTask> referencingTasks = jobTaskDao.getJobTaskByDataSourceId(datasourceId);
    if (CollectionUtils.isNotEmpty(referencingTasks)) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATA_SOURCE_HAD_USED);
    }

    // 2. So do virtual tables defined on it.
    List<String> virtualDatabases = virtualTableDao.getVirtualDatabaseNames(datasourceId);
    if (CollectionUtils.isNotEmpty(virtualDatabases)) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_CAN_NOT_DELETE);
    }

    // 3. And job versions that still point at it.
    boolean usedByJobVersion =
            !jobDefinitionService.getJobVersionByDataSourceId(datasourceId).isEmpty();
    if (usedByJobVersion) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_CAN_NOT_DELETE);
    }

    return datasourceDao.deleteDatasourceById(datasourceId);
}
/**
 * Tests connectivity for a configuration that has not been saved.
 *
 * @param userId id of the user running the test
 * @param pluginName name of the datasource plugin to test with
 * @param pluginVersion plugin version; not used by this implementation
 * @param datasourceConfig connection configuration to test
 * @return the connectivity result reported by the datasource client
 */
@Override
public boolean testDatasourceConnectionAble(
        Integer userId,
        String pluginName,
        String pluginVersion,
        Map<String, String> datasourceConfig) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_TEST_CONNECT, userId);
    boolean reachable =
            DataSourceClientFactory.getDataSourceClient()
                    .checkDataSourceConnectivity(pluginName, datasourceConfig);
    return reachable;
}
/**
 * Tests connectivity of a saved datasource using its stored plugin name and configuration.
 *
 * @param userId id of the user running the test
 * @param datasourceId id of the saved datasource
 * @return the connectivity result reported by the datasource client
 * @throws SeatunnelException if no datasource with that id exists
 */
@Override
public boolean testDatasourceConnectionAble(Integer userId, Long datasourceId) {
    funcAndResourcePermissionCheck(
            SeatunnelFuncPermissionKeyConstant.DATASOURCE_TEST_CONNECT,
            SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
            Collections.singletonList(datasourceId),
            userId);

    Datasource stored = datasourceDao.selectDatasourceById(datasourceId);
    if (null == stored) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId.toString());
    }

    // The configuration is persisted as JSON; turn it back into a map for the client.
    Map<String, String> storedConfig =
            JsonUtils.toMap(stored.getDatasourceConfig(), String.class, String.class);
    return DataSourceClientFactory.getDataSourceClient()
            .checkDataSourceConnectivity(stored.getPluginName(), storedConfig);
}
/**
 * Looks up a datasource name by id.
 *
 * @param datasourceId datasource id as a decimal string
 * @return the name returned by the DAO
 * @throws NumberFormatException if {@code datasourceId} is not a parsable long
 */
@Override
public String queryDatasourceNameById(String datasourceId) {
    return datasourceDao.queryDatasourceNameById(Long.parseLong(datasourceId));
}
/**
 * Builds the dynamic form definition for a datasource plugin and returns it as JSON.
 *
 * @param pluginName name of the datasource plugin
 * @return the form structure serialised as a JSON string
 */
@Override
public String getDynamicForm(String pluginName) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_DYNAMIC, 0);

    final OptionRule pluginOptions =
            DataSourceClientFactory.getDataSourceClient()
                    .queryDataSourceFieldByName(pluginName);

    // Fall back to the plugin name itself when no connector is mapped to this plugin.
    final String connectorName =
            dataSourceMapperConfig
                    .findConnectorForDatasourceName(pluginName)
                    .orElse(pluginName);

    final FormStructure form = SeaTunnelOptionRuleWrapper.wrapper(pluginOptions, connectorName);
    return JsonUtils.toJsonString(form);
}
/**
 * Checks whether a datasource name is free to use.
 *
 * @param userId id of the calling user; not used by this implementation
 * @param datasourceName name to check
 * @param dataSourceId datasource id forwarded to the DAO uniqueness check
 * @return {@code false} for a blank name, otherwise the DAO's uniqueness result
 */
@Override
public boolean checkDatasourceNameUnique(
        Integer userId, String datasourceName, Long dataSourceId) {
    // A blank name is never considered unique.
    if (StringUtils.isBlank(datasourceName)) {
        return false;
    }
    return datasourceDao.checkDatasourceNameUnique(datasourceName, dataSourceId);
}
/**
 * Lists the databases of a datasource.
 *
 * <p>If the plugin does not support virtual tables, the list comes from the datasource client.
 * If it does, the result is the single virtual database name when at least one virtual table
 * exists for the datasource, and an empty list otherwise.
 *
 * @param datasourceName name of the datasource
 * @return the database names
 * @throws SeatunnelException if no datasource with that name exists
 */
@Override
public List<String> queryDatabaseByDatasourceName(String datasourceName) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_DATABASES, 0);

    Datasource found = datasourceDao.queryDatasourceByName(datasourceName);
    if (found == null) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
    }

    String plugin = found.getPluginName();
    if (!checkIsSupportVirtualTable(plugin)) {
        Map<String, String> connectionConfig =
                JsonUtils.toMap(found.getDatasourceConfig(), String.class, String.class);
        return DataSourceClientFactory.getDataSourceClient()
                .getDatabases(plugin, connectionConfig);
    }

    long id = found.getId();
    if (!virtualTableDao.checkHasVirtualTable(id)) {
        return new ArrayList<>();
    }
    return Collections.singletonList(VIRTUAL_TABLE_DATABASE_NAME);
}
/**
 * Tells whether the named plugin is registered and flagged as supporting virtual tables.
 *
 * @param pluginName name of the datasource plugin
 * @return {@code true} only if a plugin with that name exists and its virtual-table flag is
 *     {@code TRUE}; a missing (null) flag counts as unsupported
 */
private boolean checkIsSupportVirtualTable(String pluginName) {
    // Boolean.TRUE.equals avoids auto-unboxing a null flag, matching the check used in
    // queryAllDatasourcesGroupByType.
    return DataSourceClientFactory.getDataSourceClient().listAllDataSources().stream()
            .anyMatch(
                    d ->
                            d.getName().equals(pluginName)
                                    && Boolean.TRUE.equals(d.getSupportVirtualTables()));
}
/**
 * Lists table names of a datasource, passing a name filter and a size option to the plugin.
 *
 * <p>If the plugin supports virtual tables, the virtual table names of the datasource are
 * returned instead; {@code databaseName}, {@code filterName} and {@code size} are not used on
 * that path.
 *
 * @param datasourceName name of the datasource
 * @param databaseName database to list tables from
 * @param filterName forwarded to the plugin as the {@code filterName} option
 * @param size forwarded to the plugin as the {@code size} option; the option is omitted when
 *     {@code null}
 * @return the table names
 * @throws SeatunnelException if no datasource with that name exists
 */
@Override
public List<String> queryTableNames(
        String datasourceName, String databaseName, String filterName, Integer size) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_TABLE, 0);
    Datasource datasource = datasourceDao.queryDatasourceByName(datasourceName);
    if (null == datasource) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
    }
    String config = datasource.getDatasourceConfig();
    Map<String, String> datasourceConfig = JsonUtils.toMap(config, String.class, String.class);
    Map<String, String> options = new HashMap<>();
    // A missing size must not fail the request with a NullPointerException.
    if (size != null) {
        options.put("size", size.toString());
    }
    options.put("filterName", filterName);
    String pluginName = datasource.getPluginName();
    if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) {
        return DataSourceClientFactory.getDataSourceClient()
                .getTables(pluginName, databaseName, datasourceConfig, options);
    }
    long dataSourceId = datasource.getId();
    return virtualTableDao.getVirtualTableNames(VIRTUAL_TABLE_DATABASE_NAME, dataSourceId);
}
/**
 * Lists table names of a datasource without extra query options.
 *
 * <p>If the plugin supports virtual tables, the virtual table names of the datasource are
 * returned and {@code databaseName} is not used.
 *
 * @param datasourceName name of the datasource
 * @param databaseName database to list tables from
 * @return the table names
 * @throws SeatunnelException if no datasource with that name exists
 */
@Override
public List<String> queryTableNames(String datasourceName, String databaseName) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_TABLE, 0);

    Datasource found = datasourceDao.queryDatasourceByName(datasourceName);
    if (found == null) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
    }

    Map<String, String> connectionConfig =
            JsonUtils.toMap(found.getDatasourceConfig(), String.class, String.class);
    Map<String, String> noOptions = new HashMap<>();
    String plugin = found.getPluginName();

    if (checkIsSupportVirtualTable(plugin)) {
        long id = found.getId();
        return virtualTableDao.getVirtualTableNames(VIRTUAL_TABLE_DATABASE_NAME, id);
    }
    return DataSourceClientFactory.getDataSourceClient()
            .getTables(plugin, databaseName, connectionConfig, noOptions);
}
/**
 * Returns the fields of a table, from the plugin or from a stored virtual table definition.
 *
 * <p>In both cases the fields are handed to the table schema service's
 * {@code getAddSeaTunnelSchema} together with the plugin name before being returned.
 *
 * @param datasourceName name of the datasource
 * @param databaseName database containing the table (used on the plugin path only)
 * @param tableName name of the table or virtual table
 * @return the table fields
 * @throws SeatunnelException if the datasource, or the virtual table on the virtual-table path,
 *     does not exist
 */
@Override
public List<TableField> queryTableSchema(
        String datasourceName, String databaseName, String tableName) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_TABLE_SCHEMA, 0);

    Datasource found = datasourceDao.queryDatasourceByName(datasourceName);
    if (found == null) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
    }

    Map<String, String> connectionConfig =
            JsonUtils.toMap(found.getDatasourceConfig(), String.class, String.class);
    String plugin = found.getPluginName();
    // Resolved by bean name from the application context rather than injected as a field.
    ITableSchemaService schemaService =
            (ITableSchemaService) applicationContext.getBean("tableSchemaServiceImpl");

    List<TableField> result;
    if (checkIsSupportVirtualTable(plugin)) {
        VirtualTable stored = virtualTableDao.selectVirtualTableByTableName(tableName);
        if (stored == null) {
            throw new SeatunnelException(SeatunnelErrorEnum.VIRTUAL_TABLE_NOT_FOUND, tableName);
        }
        // The virtual table keeps its fields as JSON; convert them to TableField objects.
        result = convertTableSchema(stored.getTableFields());
    } else {
        result =
                DataSourceClientFactory.getDataSourceClient()
                        .getTableFields(plugin, connectionConfig, databaseName, tableName);
    }

    schemaService.getAddSeaTunnelSchema(result, plugin);
    return result;
}
/**
 * Converts the JSON field list of a virtual table into {@link TableField} objects.
 *
 * @param virtualTableFieldJson JSON array of virtual table fields
 * @return one {@link TableField} per parsed entry; an empty list when nothing was parsed
 */
private List<TableField> convertTableSchema(String virtualTableFieldJson) {
    List<TableField> converted = new ArrayList<>();
    List<VirtualTableFieldRes> parsed =
            JsonUtils.toList(virtualTableFieldJson, VirtualTableFieldRes.class);
    if (CollectionUtils.isNotEmpty(parsed)) {
        for (VirtualTableFieldRes source : parsed) {
            TableField target = new TableField();
            target.setPrimaryKey(source.getPrimaryKey());
            target.setName(source.getFieldName());
            target.setType(source.getFieldType());
            target.setComment(source.getFieldComment());
            target.setNullable(source.getNullable());
            target.setDefaultValue(source.getDefaultValue());
            converted.add(target);
        }
    }
    return converted;
}
/**
 * Returns one page of the datasources the user may access, with optional filters.
 *
 * @param userId id of the requesting user
 * @param searchVal search value forwarded to the DAO query
 * @param pluginName plugin name filter forwarded to the DAO query
 * @param pageNo page number to fetch
 * @param pageSize number of records per page
 * @return the requested page; when the user has no accessible datasources, a page info whose
 *     data is an empty list
 */
@Override
public PageInfo<DatasourceRes> queryDatasourceList(
        Integer userId, String searchVal, String pluginName, Integer pageNo, Integer pageSize) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_LIST, userId);
    Page<Datasource> page = new Page<>(pageNo, pageSize);
    PageInfo<DatasourceRes> pageInfo = new PageInfo<>();
    List<Long> ids =
            availableResourceRange(
                    SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(), userId);
    if (org.springframework.util.CollectionUtils.isEmpty(ids)) {
        // Return an empty list rather than unset data, as the empty-records branch does.
        pageInfo.setData(new ArrayList<>());
        return pageInfo;
    }
    IPage<Datasource> datasourcePage =
            datasourceDao.selectDatasourceByParam(page, ids, searchVal, pluginName);
    // getCurrent() is the current page number; getPages() would be the total page count.
    pageInfo.setPageNo((int) datasourcePage.getCurrent());
    pageInfo.setPageSize((int) datasourcePage.getSize());
    pageInfo.setTotalCount((int) datasourcePage.getTotal());
    if (CollectionUtils.isEmpty(datasourcePage.getRecords())) {
        pageInfo.setData(new ArrayList<>());
        return pageInfo;
    }
    List<DatasourceRes> datasourceResList =
            datasourcePage.getRecords().stream()
                    .map(this::toDatasourceRes)
                    .collect(Collectors.toList());
    pageInfo.setData(datasourceResList);
    return pageInfo;
}

/** Maps a persisted datasource to its list response, parsing the stored JSON configuration. */
private DatasourceRes toDatasourceRes(Datasource datasource) {
    DatasourceRes datasourceRes = new DatasourceRes();
    datasourceRes.setId(datasource.getId().toString());
    datasourceRes.setDatasourceName(datasource.getDatasourceName());
    datasourceRes.setPluginName(datasource.getPluginName());
    datasourceRes.setPluginVersion(datasource.getPluginVersion());
    datasourceRes.setDescription(datasource.getDescription());
    datasourceRes.setCreateTime(datasource.getCreateTime());
    datasourceRes.setUpdateTime(datasource.getUpdateTime());
    Map<String, String> datasourceConfig =
            JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class);
    datasourceRes.setDatasourceConfig(datasourceConfig);
    datasourceRes.setCreateUserId(datasource.getCreateUserId());
    datasourceRes.setUpdateUserId(datasource.getUpdateUserId());
    return datasourceRes;
}
/**
 * Lists every datasource plugin known to the datasource client.
 *
 * @return the plugin descriptors
 */
@Override
public List<DataSourcePluginInfo> queryAllDatasources() {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_QUERY_ALL, 0);
    List<DataSourcePluginInfo> allPlugins =
            DataSourceClientFactory.getDataSourceClient().listAllDataSources();
    return allPlugins;
}
/**
 * Lists the datasource plugins of one type.
 *
 * @param type plugin type code to match
 * @return the plugins whose type equals {@code type}; an empty list when {@code type} is null
 *     or nothing matches. The list never contains {@code null} entries.
 */
@Override
public List<DataSourcePluginInfo> queryAllDatasourcesByType(Integer type) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_QUERY_ALL, 0);
    if (type == null) {
        return new ArrayList<>();
    }
    // Filter, not map-to-null: non-matching plugins must be dropped from the result.
    return DataSourceClientFactory.getDataSourceClient().listAllDataSources().stream()
            .filter(dataSourcePluginInfo -> type.equals(dataSourcePluginInfo.getType()))
            .collect(Collectors.toList());
}
/**
 * Groups the datasource plugins by type code.
 *
 * <p>Every code of {@link DatasourcePluginTypeEnum} is present as a key, even if its list is
 * empty.
 *
 * @param onlyShowVirtualDatasource when {@code TRUE}, only plugins flagged as supporting
 *     virtual tables are included; otherwise (including {@code null}) all plugins are
 * @return plugins keyed by type code
 */
@Override
public Map<Integer, List<DataSourcePluginInfo>> queryAllDatasourcesGroupByType(
        Boolean onlyShowVirtualDatasource) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_QUERY_ALL, 0);

    // Seed every known type so callers always see all keys.
    Map<Integer, List<DataSourcePluginInfo>> grouped = new HashMap<>();
    for (DatasourcePluginTypeEnum pluginType : DatasourcePluginTypeEnum.values()) {
        grouped.put(pluginType.getCode(), new ArrayList<>());
    }

    final boolean includeAll = BooleanUtils.isNotTrue(onlyShowVirtualDatasource);
    List<DataSourcePluginInfo> allPlugins =
            DataSourceClientFactory.getDataSourceClient().listAllDataSources();
    for (DataSourcePluginInfo plugin : allPlugins) {
        if (includeAll || Boolean.TRUE.equals(plugin.getSupportVirtualTables())) {
            grouped.computeIfAbsent(plugin.getType(), key -> new ArrayList<>()).add(plugin);
        }
    }
    return grouped;
}
/**
 * Returns the stored configuration of a datasource as a map.
 *
 * @param datasourceId datasource id as a decimal string
 * @return the configuration parsed from its stored JSON form
 * @throws NumberFormatException if {@code datasourceId} is not a parsable long
 * @throws SeatunnelException if no datasource with that id exists
 */
@Override
public Map<String, String> queryDatasourceConfigById(String datasourceId) {
    Datasource found = datasourceDao.selectDatasourceById(Long.parseLong(datasourceId));
    if (found == null) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId);
    }
    return JsonUtils.toMap(found.getDatasourceConfig(), String.class, String.class);
}
/**
 * Returns the datasources of a plugin as an id-to-name map.
 *
 * <p>The lookup always uses {@code DEFAULT_DATASOURCE_PLUGIN_VERSION} as the plugin version.
 *
 * @param pluginName name of the datasource plugin
 * @return datasource names keyed by datasource id (as a string)
 */
@Override
public Map<String, String> queryDatasourceNameByPluginName(String pluginName) {
    Map<String, String> namesById = new HashMap<>();
    List<Datasource> matches =
            datasourceDao.selectDatasourceByPluginName(
                    pluginName, DEFAULT_DATASOURCE_PLUGIN_VERSION);
    for (Datasource match : matches) {
        namesById.put(match.getId().toString(), match.getDatasourceName());
    }
    return namesById;
}
/**
 * Returns the datasource option rule of a plugin, as reported by the datasource client.
 *
 * @param pluginName name of the datasource plugin
 * @return the plugin's option rule
 */
@Override
public OptionRule queryOptionRuleByPluginName(String pluginName) {
    OptionRule pluginOptions =
            DataSourceClientFactory.getDataSourceClient()
                    .queryDataSourceFieldByName(pluginName);
    return pluginOptions;
}
/**
 * Returns the metadata option rule of a plugin that supports virtual tables.
 *
 * @param pluginName name of the datasource plugin
 * @return the plugin's metadata option rule, or an empty rule when the plugin does not support
 *     virtual tables
 */
@Override
public OptionRule queryVirtualTableOptionRuleByPluginName(String pluginName) {
    if (!checkIsSupportVirtualTable(pluginName)) {
        // Nothing to configure for plugins without virtual table support.
        return OptionRule.builder().build();
    }
    return DataSourceClientFactory.getDataSourceClient().queryMetadataFieldByName(pluginName);
}
/**
 * Returns the details of several datasources.
 *
 * @param datasourceIds datasource ids as decimal strings
 * @return one detail entry per datasource found; an empty list when {@code datasourceIds} is
 *     null or empty
 * @throws NumberFormatException if any id is not a parsable long
 */
@Override
public List<DatasourceDetailRes> queryDatasourceDetailListByDatasourceIds(
        List<String> datasourceIds) {
    if (CollectionUtils.isEmpty(datasourceIds)) {
        return new ArrayList<>();
    }
    // Convert first so the permission check receives Long ids, as every other resource check
    // in this class does.
    List<Long> datasourceIdsLong =
            datasourceIds.stream().map(Long::parseLong).collect(Collectors.toList());
    funcAndResourcePermissionCheck(
            SeatunnelFuncPermissionKeyConstant.DATASOURCE_DETAIL_LIST,
            SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
            datasourceIdsLong,
            0);
    List<Datasource> datasourceList = datasourceDao.selectDatasourceByIds(datasourceIdsLong);
    return convertDatasourceDetailRes(datasourceList);
}
/**
 * Converts persisted datasources into detail responses.
 *
 * @param datasourceList entities to convert; may be null or empty
 * @return one response per entity, in the same order; an empty list for null or empty input
 */
private List<DatasourceDetailRes> convertDatasourceDetailRes(List<Datasource> datasourceList) {
    if (CollectionUtils.isEmpty(datasourceList)) {
        return new ArrayList<>();
    }
    List<DatasourceDetailRes> datasourceDetailResList = new ArrayList<>();
    for (Datasource datasource : datasourceList) {
        DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
        datasourceDetailRes.setId(datasource.getId().toString());
        datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
        datasourceDetailRes.setPluginName(datasource.getPluginName());
        datasourceDetailRes.setPluginVersion(datasource.getPluginVersion());
        datasourceDetailRes.setDescription(datasource.getDescription());
        datasourceDetailRes.setCreateTime(datasource.getCreateTime());
        datasourceDetailRes.setUpdateTime(datasource.getUpdateTime());
        // The configuration is stored as JSON; expose it as a map.
        Map<String, String> config = JsonUtils.toMap(datasource.getDatasourceConfig());
        datasourceDetailRes.setDatasourceConfig(config);
        datasourceDetailResList.add(datasourceDetailRes);
    }
    return datasourceDetailResList;
}
/**
 * Returns the details of every stored datasource.
 *
 * @return one detail entry per datasource returned by the DAO
 */
@Override
public List<DatasourceDetailRes> queryAllDatasourcesInstance() {
    return convertDatasourceDetailRes(datasourceDao.queryAll());
}
/**
 * Returns the details of the datasource with the given name.
 *
 * @param datasourceName name of the datasource
 * @return the datasource details, with the stored JSON configuration parsed into a map
 * @throws SeatunnelException if no datasource with that name exists
 */
@Override
public DatasourceDetailRes queryDatasourceDetailByDatasourceName(String datasourceName) {
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_DETAIL, 0);

    Datasource found = datasourceDao.queryDatasourceByName(datasourceName);
    if (found == null) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
    }

    DatasourceDetailRes detail = new DatasourceDetailRes();
    detail.setId(found.getId().toString());
    detail.setDatasourceName(found.getDatasourceName());
    detail.setDescription(found.getDescription());
    detail.setPluginName(found.getPluginName());
    detail.setPluginVersion(found.getPluginVersion());
    detail.setCreateTime(found.getCreateTime());
    detail.setUpdateTime(found.getUpdateTime());
    // The configuration is stored as JSON; expose it as a map.
    detail.setDatasourceConfig(
            JsonUtils.toMap(found.getDatasourceConfig(), String.class, String.class));
    return detail;
}
/**
 * Returns the details of the datasource with the given id.
 *
 * @param datasourceId datasource id as a decimal string
 * @return the datasource details, with the stored JSON configuration parsed into a map
 * @throws NumberFormatException if {@code datasourceId} is not a parsable long
 * @throws SeatunnelException if no datasource with that id exists
 */
@Override
public DatasourceDetailRes queryDatasourceDetailById(String datasourceId) {
    // Parse before the permission check so a malformed id fails first.
    final long parsedId = Long.parseLong(datasourceId);
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_DETAIL, 0);

    Datasource found = datasourceDao.selectDatasourceById(parsedId);
    if (found == null) {
        throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId);
    }

    DatasourceDetailRes detail = new DatasourceDetailRes();
    detail.setId(found.getId().toString());
    detail.setDatasourceName(found.getDatasourceName());
    detail.setDescription(found.getDescription());
    detail.setPluginName(found.getPluginName());
    detail.setPluginVersion(found.getPluginVersion());
    detail.setCreateTime(found.getCreateTime());
    detail.setUpdateTime(found.getUpdateTime());
    // The configuration is stored as JSON; expose it as a map.
    detail.setDatasourceConfig(
            JsonUtils.toMap(found.getDatasourceConfig(), String.class, String.class));
    return detail;
}
/**
 * Returns the details of a datasource after checking the user's permission on it.
 *
 * @param userId id of the requesting user
 * @param datasourceId datasource id as a decimal string
 * @return the datasource details
 * @throws NumberFormatException if {@code datasourceId} is not a parsable long
 */
@Override
public DatasourceDetailRes queryDatasourceDetailById(Integer userId, String datasourceId) {
    // Resource-level check for this user, then delegate to the id-only overload.
    funcAndResourcePermissionCheck(
            SeatunnelFuncPermissionKeyConstant.DATASOURCE_DETAIL,
            SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
            Collections.singletonList(Long.valueOf(datasourceId)),
            userId);
    return queryDatasourceDetailById(datasourceId);
}
/**
 * Stores the Spring application context passed to this {@link ApplicationContextAware} bean.
 *
 * <p>The stored context is used by {@code queryTableSchema} to look up a bean by name.
 *
 * @param applicationContext the application context to keep
 * @throws BeansException declared by the interface; this implementation only assigns a field
 */
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}