| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.dolphinscheduler.api.service.impl; |
| |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DATASOURCE_DELETE; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DATASOURCE_UPDATE; |
| |
| import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; |
| import org.apache.dolphinscheduler.api.enums.Status; |
| import org.apache.dolphinscheduler.api.exceptions.ServiceException; |
| import org.apache.dolphinscheduler.api.service.DataSourceService; |
| import org.apache.dolphinscheduler.api.utils.PageInfo; |
| import org.apache.dolphinscheduler.common.constants.Constants; |
| import org.apache.dolphinscheduler.common.enums.AuthorizationType; |
| import org.apache.dolphinscheduler.common.enums.UserType; |
| import org.apache.dolphinscheduler.common.utils.JSONUtils; |
| import org.apache.dolphinscheduler.dao.entity.DataSource; |
| import org.apache.dolphinscheduler.dao.entity.User; |
| import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; |
| import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; |
| import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; |
| import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; |
| import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; |
| import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; |
| import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; |
| import org.apache.dolphinscheduler.spi.enums.DbType; |
| import org.apache.dolphinscheduler.spi.params.base.ParamsOptions; |
| |
| import org.apache.commons.collections4.CollectionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.dao.DuplicateKeyException; |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import com.baomidou.mybatisplus.core.metadata.IPage; |
| import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| |
| /** |
| * data source service impl |
| */ |
| @Service |
| @Slf4j |
| public class DataSourceServiceImpl extends BaseServiceImpl implements DataSourceService { |
| |
| @Autowired |
| private DataSourceMapper dataSourceMapper; |
| |
| @Autowired |
| private DataSourceUserMapper datasourceUserMapper; |
| |
| private static final String TABLE = "TABLE"; |
| private static final String VIEW = "VIEW"; |
| private static final String[] TABLE_TYPES = new String[]{TABLE, VIEW}; |
| private static final String TABLE_NAME = "TABLE_NAME"; |
| private static final String COLUMN_NAME = "COLUMN_NAME"; |
| |
| /** |
| * create data source |
| * |
| * @param loginUser login user |
| * @param datasourceParam datasource parameters |
| * @return create result code |
| */ |
| @Override |
| public DataSource createDataSource(User loginUser, BaseDataSourceParamDTO datasourceParam) { |
| DataSourceUtils.checkDatasourceParam(datasourceParam); |
| if (!canOperatorPermissions(loginUser, null, AuthorizationType.DATASOURCE, |
| ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) { |
| throw new ServiceException(Status.USER_NO_OPERATION_PERM); |
| } |
| // check name can use or not |
| if (checkName(datasourceParam.getName())) { |
| throw new ServiceException(Status.DATASOURCE_EXIST); |
| } |
| if (checkDescriptionLength(datasourceParam.getNote())) { |
| throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR); |
| } |
| ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(datasourceParam); |
| |
| // build datasource |
| DataSource dataSource = new DataSource(); |
| Date now = new Date(); |
| |
| dataSource.setName(datasourceParam.getName().trim()); |
| dataSource.setNote(datasourceParam.getNote()); |
| dataSource.setUserId(loginUser.getId()); |
| dataSource.setUserName(loginUser.getUserName()); |
| dataSource.setType(datasourceParam.getType()); |
| dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam)); |
| dataSource.setCreateTime(now); |
| dataSource.setUpdateTime(now); |
| try { |
| dataSourceMapper.insert(dataSource); |
| return dataSource; |
| } catch (DuplicateKeyException ex) { |
| throw new ServiceException(Status.DATASOURCE_EXIST); |
| } |
| } |
| |
| /** |
| * updateProcessInstance datasource |
| * |
| * @param loginUser login user |
| * @return update result code |
| */ |
| @Override |
| public DataSource updateDataSource(User loginUser, BaseDataSourceParamDTO dataSourceParam) { |
| DataSourceUtils.checkDatasourceParam(dataSourceParam); |
| // determine whether the data source exists |
| DataSource dataSource = dataSourceMapper.selectById(dataSourceParam.getId()); |
| if (dataSource == null) { |
| throw new ServiceException(Status.RESOURCE_NOT_EXIST); |
| } |
| |
| if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, |
| DATASOURCE_UPDATE)) { |
| throw new ServiceException(Status.USER_NO_OPERATION_PERM); |
| } |
| |
| // check name can use or not |
| if (!dataSourceParam.getName().trim().equals(dataSource.getName()) && checkName(dataSourceParam.getName())) { |
| throw new ServiceException(Status.DATASOURCE_EXIST); |
| } |
| if (checkDescriptionLength(dataSourceParam.getNote())) { |
| throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR); |
| } |
| // check password,if the password is not updated, set to the old password. |
| ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(dataSourceParam); |
| |
| String password = connectionParam.getPassword(); |
| |
| if (StringUtils.isBlank(password)) { |
| String oldConnectionParams = dataSource.getConnectionParams(); |
| ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams); |
| connectionParam.setPassword(oldParams.path(Constants.PASSWORD).asText()); |
| } |
| |
| Date now = new Date(); |
| |
| dataSource.setName(dataSourceParam.getName().trim()); |
| dataSource.setNote(dataSourceParam.getNote()); |
| dataSource.setUserName(loginUser.getUserName()); |
| dataSource.setType(dataSource.getType()); |
| dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam)); |
| dataSource.setUpdateTime(now); |
| try { |
| dataSourceMapper.updateById(dataSource); |
| return dataSource; |
| } catch (DuplicateKeyException ex) { |
| throw new ServiceException(Status.DATASOURCE_EXIST); |
| } |
| } |
| |
| private boolean checkName(String name) { |
| List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim()); |
| return queryDataSource != null && !queryDataSource.isEmpty(); |
| } |
| |
| /** |
| * updateProcessInstance datasource |
| * |
| * @param id datasource id |
| * @return data source detail |
| */ |
| @Override |
| public BaseDataSourceParamDTO queryDataSource(int id, User loginUser) { |
| DataSource dataSource = dataSourceMapper.selectById(id); |
| if (dataSource == null) { |
| log.error("Datasource does not exist, id:{}.", id); |
| throw new ServiceException(Status.RESOURCE_NOT_EXIST); |
| } |
| |
| if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, |
| ApiFuncIdentificationConstant.DATASOURCE)) { |
| throw new ServiceException(Status.USER_NO_OPERATION_PERM); |
| } |
| |
| // type |
| BaseDataSourceParamDTO baseDataSourceParamDTO = DataSourceUtils.buildDatasourceParamDTO( |
| dataSource.getType(), dataSource.getConnectionParams()); |
| baseDataSourceParamDTO.setId(dataSource.getId()); |
| baseDataSourceParamDTO.setName(dataSource.getName()); |
| baseDataSourceParamDTO.setNote(dataSource.getNote()); |
| baseDataSourceParamDTO.setPassword(getHiddenPassword()); |
| |
| return baseDataSourceParamDTO; |
| } |
| |
| /** |
| * query datasource list by keyword |
| * |
| * @param loginUser login user |
| * @param searchVal search value |
| * @param pageNo page number |
| * @param pageSize page size |
| * @return data source list page |
| */ |
| @Override |
| public PageInfo<DataSource> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, |
| Integer pageSize) { |
| IPage<DataSource> dataSourceList; |
| Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize); |
| PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize); |
| if (loginUser.getUserType().equals(UserType.ADMIN_USER)) { |
| dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal); |
| } else { |
| Set<Integer> ids = resourcePermissionCheckService |
| .userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), log); |
| if (ids.isEmpty()) { |
| return pageInfo; |
| } |
| dataSourceList = dataSourceMapper.selectPagingByIds(dataSourcePage, new ArrayList<>(ids), searchVal); |
| } |
| |
| List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>(); |
| handlePasswd(dataSources); |
| pageInfo.setTotal((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L)); |
| pageInfo.setTotalList(dataSources); |
| return pageInfo; |
| } |
| |
| /** |
| * handle datasource connection password for safety |
| */ |
| private void handlePasswd(List<DataSource> dataSourceList) { |
| for (DataSource dataSource : dataSourceList) { |
| String connectionParams = dataSource.getConnectionParams(); |
| ObjectNode object = JSONUtils.parseObject(connectionParams); |
| object.put(Constants.PASSWORD, getHiddenPassword()); |
| dataSource.setConnectionParams(object.toString()); |
| } |
| } |
| |
| /** |
| * get hidden password (resolve the security hotspot) |
| * |
| * @return hidden password |
| */ |
| private String getHiddenPassword() { |
| return Constants.XXXXXX; |
| } |
| |
| /** |
| * query data resource list |
| * |
| * @param loginUser login user |
| * @param type data source type |
| * @return data source list page |
| */ |
| @Override |
| public List<DataSource> queryDataSourceList(User loginUser, Integer type) { |
| |
| List<DataSource> datasourceList; |
| if (loginUser.getUserType().equals(UserType.ADMIN_USER)) { |
| datasourceList = dataSourceMapper.queryDataSourceByType(0, type); |
| } else { |
| Set<Integer> ids = resourcePermissionCheckService |
| .userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), log); |
| if (ids.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| datasourceList = dataSourceMapper.selectBatchIds(ids).stream() |
| .filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList()); |
| } |
| |
| return datasourceList; |
| } |
| |
| /** |
| * verify datasource exists |
| * |
| * @param name datasource name |
| * @return true if data datasource not exists, otherwise return false |
| */ |
| @Override |
| public void verifyDataSourceName(String name) { |
| List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name); |
| if (dataSourceList != null && !dataSourceList.isEmpty()) { |
| throw new ServiceException(Status.DATASOURCE_EXIST); |
| } |
| } |
| |
| /** |
| * check connection |
| * |
| * @param type data source type |
| * @param connectionParam connectionParam |
| * @return true if connect successfully, otherwise false |
| * @return true if connect successfully, otherwise false |
| */ |
| @Override |
| public void checkConnection(DbType type, ConnectionParam connectionParam) { |
| DataSourceProcessor sshDataSourceProcessor = DataSourceUtils.getDatasourceProcessor(type); |
| boolean connectivity = sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam); |
| if (connectivity) { |
| return; |
| } |
| throw new ServiceException(Status.CONNECTION_TEST_FAILURE); |
| } |
| |
| /** |
| * test connection |
| * |
| * @param id datasource id |
| * @return connect result code |
| */ |
| @Override |
| public void connectionTest(int id) { |
| DataSource dataSource = dataSourceMapper.selectById(id); |
| if (dataSource == null) { |
| throw new ServiceException(Status.RESOURCE_NOT_EXIST); |
| } |
| checkConnection(dataSource.getType(), |
| DataSourceUtils.buildConnectionParams(dataSource.getType(), dataSource.getConnectionParams())); |
| } |
| |
| /** |
| * delete datasource |
| * |
| * @param loginUser login user |
| * @param datasourceId data source id |
| * @return delete result code |
| */ |
| @Override |
| @Transactional |
| public void delete(User loginUser, int datasourceId) { |
| // query datasource by id |
| DataSource dataSource = dataSourceMapper.selectById(datasourceId); |
| if (dataSource == null) { |
| throw new ServiceException(Status.RESOURCE_NOT_EXIST); |
| } |
| if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, |
| DATASOURCE_DELETE)) { |
| throw new ServiceException(Status.USER_NO_OPERATION_PERM); |
| } |
| dataSourceMapper.deleteById(datasourceId); |
| datasourceUserMapper.deleteByDatasourceId(datasourceId); |
| } |
| |
| /** |
| * unauthorized datasource |
| * |
| * @param loginUser login user |
| * @param userId user id |
| * @return unauthed data source result code |
| */ |
| @Override |
| public List<DataSource> unAuthDatasource(User loginUser, Integer userId) { |
| List<DataSource> datasourceList; |
| if (canOperatorPermissions(loginUser, null, AuthorizationType.DATASOURCE, null)) { |
| // admin gets all data sources except userId |
| datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId); |
| } else { |
| // non-admins users get their own data sources |
| datasourceList = dataSourceMapper.selectByMap(Collections.singletonMap("user_id", loginUser.getId())); |
| } |
| List<DataSource> resultList = new ArrayList<>(); |
| Set<DataSource> datasourceSet; |
| if (datasourceList != null && !datasourceList.isEmpty()) { |
| datasourceSet = new HashSet<>(datasourceList); |
| |
| List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId); |
| |
| Set<DataSource> authedDataSourceSet; |
| if (authedDataSourceList != null && !authedDataSourceList.isEmpty()) { |
| authedDataSourceSet = new HashSet<>(authedDataSourceList); |
| datasourceSet.removeAll(authedDataSourceSet); |
| } |
| resultList = new ArrayList<>(datasourceSet); |
| } |
| return resultList; |
| } |
| |
| /** |
| * authorized datasource |
| * |
| * @param loginUser login user |
| * @param userId user id |
| * @return authorized result code |
| */ |
| @Override |
| public List<DataSource> authedDatasource(User loginUser, Integer userId) { |
| List<DataSource> authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId); |
| return authedDatasourceList; |
| } |
| |
| @Override |
| public List<ParamsOptions> getTables(Integer datasourceId, String database) { |
| DataSource dataSource = dataSourceMapper.selectById(datasourceId); |
| |
| List<String> tableList; |
| BaseConnectionParam connectionParam = |
| (BaseConnectionParam) DataSourceUtils.buildConnectionParams( |
| dataSource.getType(), |
| dataSource.getConnectionParams()); |
| |
| if (null == connectionParam) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| |
| Connection connection = |
| DataSourceUtils.getConnection(dataSource.getType(), connectionParam); |
| ResultSet tables = null; |
| |
| try { |
| |
| if (null == connection) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| |
| DatabaseMetaData metaData = connection.getMetaData(); |
| String schema = null; |
| try { |
| schema = metaData.getConnection().getSchema(); |
| } catch (SQLException e) { |
| log.error("Cant not get the schema, datasourceId:{}.", datasourceId, e); |
| throw new ServiceException(Status.GET_DATASOURCE_TABLES_ERROR); |
| } |
| |
| tables = metaData.getTables( |
| database, |
| getDbSchemaPattern(dataSource.getType(), schema, connectionParam), |
| "%", TABLE_TYPES); |
| if (null == tables) { |
| log.error("Get datasource tables error, datasourceId:{}.", datasourceId); |
| throw new ServiceException(Status.GET_DATASOURCE_TABLES_ERROR); |
| } |
| |
| tableList = new ArrayList<>(); |
| while (tables.next()) { |
| String name = tables.getString(TABLE_NAME); |
| tableList.add(name); |
| } |
| |
| } catch (Exception e) { |
| log.error("Get datasource tables error, datasourceId:{}.", datasourceId, e); |
| throw new ServiceException(Status.GET_DATASOURCE_TABLES_ERROR); |
| } finally { |
| closeResult(tables); |
| releaseConnection(connection); |
| } |
| |
| List<ParamsOptions> options = getParamsOptions(tableList); |
| return options; |
| } |
| |
| @Override |
| public List<ParamsOptions> getTableColumns(Integer datasourceId, String database, String tableName) { |
| DataSource dataSource = dataSourceMapper.selectById(datasourceId); |
| BaseConnectionParam connectionParam = |
| (BaseConnectionParam) DataSourceUtils.buildConnectionParams( |
| dataSource.getType(), |
| dataSource.getConnectionParams()); |
| |
| if (null == connectionParam) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| |
| Connection connection = |
| DataSourceUtils.getConnection(dataSource.getType(), connectionParam); |
| List<String> columnList = new ArrayList<>(); |
| ResultSet rs = null; |
| |
| try { |
| if (null == connection) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| |
| DatabaseMetaData metaData = connection.getMetaData(); |
| |
| if (dataSource.getType() == DbType.ORACLE) { |
| database = null; |
| } |
| rs = metaData.getColumns(database, null, tableName, "%"); |
| if (rs == null) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| while (rs.next()) { |
| columnList.add(rs.getString(COLUMN_NAME)); |
| } |
| } catch (Exception e) { |
| log.error("Get datasource table columns error, datasourceId:{}.", dataSource.getId(), e); |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } finally { |
| closeResult(rs); |
| releaseConnection(connection); |
| } |
| |
| List<ParamsOptions> options = getParamsOptions(columnList); |
| return options; |
| } |
| |
| @Override |
| public List<ParamsOptions> getDatabases(Integer datasourceId) { |
| |
| DataSource dataSource = dataSourceMapper.selectById(datasourceId); |
| |
| if (dataSource == null) { |
| throw new ServiceException(Status.QUERY_DATASOURCE_ERROR); |
| } |
| |
| List<String> tableList; |
| BaseConnectionParam connectionParam = |
| (BaseConnectionParam) DataSourceUtils.buildConnectionParams( |
| dataSource.getType(), |
| dataSource.getConnectionParams()); |
| |
| if (null == connectionParam) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| |
| Connection connection = |
| DataSourceUtils.getConnection(dataSource.getType(), connectionParam); |
| ResultSet rs = null; |
| |
| try { |
| if (null == connection) { |
| throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED); |
| } |
| if (dataSource.getType() == DbType.POSTGRESQL) { |
| rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY_PG); |
| } else { |
| rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY); |
| } |
| tableList = new ArrayList<>(); |
| while (rs.next()) { |
| String name = rs.getString(1); |
| tableList.add(name); |
| } |
| } catch (Exception e) { |
| log.error("Get databases error, datasourceId:{}.", datasourceId, e); |
| throw new ServiceException(Status.GET_DATASOURCE_TABLES_ERROR); |
| } finally { |
| closeResult(rs); |
| releaseConnection(connection); |
| } |
| |
| List<ParamsOptions> options = getParamsOptions(tableList); |
| return options; |
| } |
| |
| private List<ParamsOptions> getParamsOptions(List<String> columnList) { |
| List<ParamsOptions> options = null; |
| if (CollectionUtils.isNotEmpty(columnList)) { |
| options = new ArrayList<>(); |
| |
| for (String column : columnList) { |
| ParamsOptions childrenOption = |
| new ParamsOptions(column, column, false); |
| options.add(childrenOption); |
| } |
| } |
| return options; |
| } |
| |
| private String getDbSchemaPattern(DbType dbType, String schema, BaseConnectionParam connectionParam) { |
| if (dbType == null) { |
| return null; |
| } |
| String schemaPattern = null; |
| switch (dbType) { |
| case HIVE: |
| schemaPattern = connectionParam.getDatabase(); |
| break; |
| case ORACLE: |
| schemaPattern = connectionParam.getUser(); |
| if (null != schemaPattern) { |
| schemaPattern = schemaPattern.toUpperCase(); |
| } |
| break; |
| case SQLSERVER: |
| schemaPattern = "dbo"; |
| break; |
| case CLICKHOUSE: |
| case DATABEND: |
| case PRESTO: |
| if (!StringUtils.isEmpty(schema)) { |
| schemaPattern = schema; |
| } |
| break; |
| default: |
| break; |
| } |
| return schemaPattern; |
| } |
| |
| private static void releaseConnection(Connection connection) { |
| if (null != connection) { |
| try { |
| connection.close(); |
| } catch (Exception e) { |
| log.error("Connection release error", e); |
| } |
| } |
| } |
| |
| private static void closeResult(ResultSet rs) { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (Exception e) { |
| log.error("ResultSet close error", e); |
| } |
| } |
| } |
| |
| } |