blob: fc429061305e68fb42549da0ccf0a72fd5f3eb48 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.dashboard.console.service.connection.impl;
import org.apache.eventmesh.dashboard.console.annotation.EmLog;
import org.apache.eventmesh.dashboard.console.entity.config.ConfigEntity;
import org.apache.eventmesh.dashboard.console.entity.connection.ConnectionEntity;
import org.apache.eventmesh.dashboard.console.entity.connector.ConnectorEntity;
import org.apache.eventmesh.dashboard.console.mapper.config.ConfigMapper;
import org.apache.eventmesh.dashboard.console.mapper.connection.ConnectionMapper;
import org.apache.eventmesh.dashboard.console.mapper.connector.ConnectorMapper;
import org.apache.eventmesh.dashboard.console.modle.dto.connection.AddConnectionDTO;
import org.apache.eventmesh.dashboard.console.modle.dto.connection.CreateConnectionDTO;
import org.apache.eventmesh.dashboard.console.modle.dto.connection.GetConnectionListDTO;
import org.apache.eventmesh.dashboard.console.modle.vo.connection.ConnectionListVO;
import org.apache.eventmesh.dashboard.console.service.connection.ConnectionDataService;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ConnectionDataServiceDatabaseImpl implements ConnectionDataService {
@Autowired
private ConnectionMapper connectionMapper;
@Autowired
private ConnectorMapper connectorMapper;
@Autowired
private ConfigMapper configMapper;
@Override
public ConnectorEntity getConnectorById(Long connectorId) {
ConnectorEntity connectorEntity = new ConnectorEntity();
connectorEntity.setId(connectorId);
return connectorMapper.selectById(connectorEntity);
}
@Override
public List<String> getConnectorBusinessType(String type) {
ConfigEntity config = new ConfigEntity();
config.setBusinessType(type);
return configMapper.selectConnectorBusinessType(config);
}
@Override
public List<ConnectionEntity> getAllConnectionsByClusterId(Long clusterId) {
ConnectionEntity connectionEntity = new ConnectionEntity();
connectionEntity.setClusterId(clusterId);
return connectionMapper.selectByClusterId(connectionEntity);
}
@Override
public Long insert(ConnectionEntity connectionEntity) {
return connectionMapper.insert(connectionEntity);
}
@EmLog(OprType = "add", OprTarget = "Connection")
@Override
public boolean createConnection(CreateConnectionDTO createConnectionDTO) {
ConnectorEntity sinkConnector = this.createSinkConnector(createConnectionDTO.getClusterId(), createConnectionDTO.getAddConnectionDTO());
ConnectorEntity sourceConnector = this.createSourceConnector(createConnectionDTO.getClusterId(), createConnectionDTO.getAddConnectionDTO());
ConnectionEntity connectionEntity = this.setConnection(createConnectionDTO);
connectionEntity.setSinkId(sinkConnector.getId());
connectionEntity.setSourceId(sourceConnector.getId());
connectionMapper.insert(connectionEntity);
this.addConnectorConfigs(createConnectionDTO.getAddConnectorConfigDTO().getSinkConnectorConfigs(), sinkConnector);
this.addConnectorConfigs(createConnectionDTO.getAddConnectorConfigDTO().getSourceConnectorConfigs(), sourceConnector);
return false;
}
private ConnectionEntity setConnection(CreateConnectionDTO createConnectionDTO) {
ConnectionEntity connectionEntity = new ConnectionEntity();
connectionEntity.setClusterId(createConnectionDTO.getClusterId());
connectionEntity.setSourceType("connector");
connectionEntity.setSinkType("connector");
connectionEntity.setRuntimeId(-1L);
connectionEntity.setGroupId(createConnectionDTO.getAddConnectionDTO().getGroupId());
connectionEntity.setStatus(1);
connectionEntity.setDescription(createConnectionDTO.getAddConnectionDTO().getConnectionDescription());
connectionEntity.setTopic(createConnectionDTO.getAddConnectionDTO().getTopicName());
return connectionEntity;
}
public void addConnectorConfigs(List<ConfigEntity> configEntityList, ConnectorEntity connectorEntity) {
configEntityList.forEach(n -> {
n.setInstanceId(connectorEntity.getId());
n.setIsDefault(0);
n.setClusterId(connectorEntity.getClusterId());
});
configMapper.batchInsert(configEntityList);
}
public ConnectorEntity createSinkConnector(Long clusterId, AddConnectionDTO addConnectionDTO) {
ConnectorEntity connectorEntity = new ConnectorEntity();
connectorEntity.setName(addConnectionDTO.getSinkName());
connectorEntity.setHost(addConnectionDTO.getSinkHost());
connectorEntity.setClusterId(clusterId);
connectorEntity.setClassName(addConnectionDTO.getSinkClass());
connectorEntity.setType("Connector");
connectorEntity.setStatus(1);
connectorEntity.setPodState(0);
connectorEntity.setPort(addConnectionDTO.getSinkPort());
connectorMapper.insert(connectorEntity);
return connectorEntity;
}
public ConnectorEntity createSourceConnector(Long clusterId, AddConnectionDTO addConnectionDTO) {
ConnectorEntity connectorEntity = new ConnectorEntity();
connectorEntity.setName(addConnectionDTO.getSourceName());
connectorEntity.setHost(addConnectionDTO.getSourceHost());
connectorEntity.setClusterId(clusterId);
connectorEntity.setClassName(addConnectionDTO.getSourceClass());
connectorEntity.setType("Connector");
connectorEntity.setStatus(1);
connectorEntity.setPodState(0);
connectorEntity.setPort(addConnectionDTO.getSourcePort());
connectorMapper.insert(connectorEntity);
return connectorEntity;
}
@Override
public List<ConnectionEntity> getAllConnections() {
return connectionMapper.selectAll();
}
public ConnectionEntity setSearchCriteria(GetConnectionListDTO getConnectionListDTO, ConnectionEntity connectionEntity) {
connectionEntity.setTopic(getConnectionListDTO.getTopicName());
return connectionEntity;
}
@Override
public List<ConnectionListVO> getConnectionToFrontByCluster(Long clusterId, GetConnectionListDTO getConnectionListDTO) {
ConnectionEntity connectionEntity = new ConnectionEntity();
connectionEntity.setClusterId(clusterId);
connectionEntity = this.setSearchCriteria(getConnectionListDTO, connectionEntity);
List<ConnectionEntity> allConnectionsByClusterId = connectionMapper.selectToFrontByClusterId(connectionEntity);
List<ConnectionListVO> connectionListVOs = new ArrayList<>();
allConnectionsByClusterId.forEach(n -> {
connectionListVOs.add(this.setConnectionListVO(n));
});
return connectionListVOs;
}
private ConnectionListVO setConnectionListVO(ConnectionEntity connectionEntity) {
ConnectionListVO connectionListVO = new ConnectionListVO();
ConnectorEntity connectorEntity = new ConnectorEntity();
connectorEntity.setId(connectionEntity.getSinkId());
ConnectorEntity sinkConnector = connectorMapper.selectById(connectorEntity);
connectorEntity.setId(connectionEntity.getSourceId());
ConnectorEntity sourceConnector = connectorMapper.selectById(connectorEntity);
connectionListVO.setSinkClass(sinkConnector.getClassName());
connectionListVO.setSourceClass(sourceConnector.getClassName());
connectionListVO.setSinkConnectorId(sinkConnector.getId());
connectionListVO.setSourceConnectorId(sourceConnector.getId());
connectionListVO.setSinkConnectorName(sinkConnector.getName());
connectionListVO.setSourceConnectorName(sourceConnector.getName());
connectionListVO.setTopicName(connectionEntity.getTopic());
connectionListVO.setStatus(connectionEntity.getStatus());
return connectionListVO;
}
@Override
public List<ConfigEntity> getConnectorConfigsByClassAndVersion(String classType, String version) {
ConfigEntity config = new ConfigEntity();
config.setBusinessType(classType);
List<ConfigEntity> configEntityList = configMapper.selectConnectorConfigsByBusinessType(config);
configEntityList.forEach(n -> {
if (!n.matchVersion(version)) {
configEntityList.remove(n);
}
});
return configEntityList;
}
}