blob: cf07c9fd73427e673b657e033bd9721b16a2b945 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ALERT_INSTANCE_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ALERT_PLUGIN_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ALERT_PLUGIN_UPDATE;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.AlertPluginInstanceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.vo.AlertPluginInstanceVO;
import org.apache.dolphinscheduler.common.enums.AlertPluginInstanceType;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.AlertGroup;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
import org.apache.dolphinscheduler.extract.alert.IAlertOperator;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
import org.apache.dolphinscheduler.extract.alert.request.AlertTestSendRequest;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
 * Service implementation for alert plugin instances: create, update, delete, query and test-send.
 *
 * <p>Instances of type {@link AlertPluginInstanceType#GLOBAL} are additionally attached to / detached from
 * the alert group whose id is {@code GLOBAL_ALERT_GROUP_ID}. That group stores its instance ids as a single
 * comma-separated string.
 */
@Service
@Lazy
@Slf4j
public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements AlertPluginInstanceService {

    /** Separator between instance ids inside an alert group's {@code alertInstanceIds} string. */
    private static final String INSTANCE_ID_SEPARATOR = ",";

    /** Id of the alert group that GLOBAL alert plugin instances are automatically added to. */
    private static final int GLOBAL_ALERT_GROUP_ID = 2;

    @Autowired
    private AlertPluginInstanceMapper alertPluginInstanceMapper;

    @Autowired
    private PluginDefineMapper pluginDefineMapper;

    @Autowired
    private AlertGroupMapper alertGroupMapper;

    @Autowired
    private RegistryClient registryClient;

    /**
     * Create an alert plugin instance.
     *
     * <p>Runs in a transaction so that the instance insert and the global alert group update
     * (for GLOBAL instances) succeed or fail together.
     *
     * @param loginUser            login user
     * @param pluginDefineId       plugin define id
     * @param instanceName         instance name, must not already exist
     * @param instanceType         instance type; GLOBAL instances are added to the global alert group
     * @param warningType          warning type stored on the instance
     * @param pluginInstanceParams plugin instance params (complete params, including ui)
     * @return the persisted alert plugin instance
     * @throws ServiceException if the user lacks permission, the name already exists, or saving fails
     */
    @Override
    @Transactional
    public AlertPluginInstance create(User loginUser,
                                      int pluginDefineId,
                                      String instanceName,
                                      AlertPluginInstanceType instanceType,
                                      WarningType warningType,
                                      String pluginInstanceParams) {
        if (!canOperatorPermissions(loginUser, null, AuthorizationType.ALERT_PLUGIN_INSTANCE, ALERT_INSTANCE_CREATE)) {
            throw new ServiceException(Status.USER_NO_OPERATION_PERM);
        }

        AlertPluginInstance alertPluginInstance = new AlertPluginInstance();
        alertPluginInstance.setPluginInstanceParams(parsePluginParamsMap(pluginInstanceParams));
        alertPluginInstance.setInstanceName(instanceName);
        alertPluginInstance.setPluginDefineId(pluginDefineId);
        alertPluginInstance.setInstanceType(instanceType);
        alertPluginInstance.setWarningType(warningType);

        // Boolean.TRUE.equals is null-safe and does not rely on reference identity of boxed values.
        if (Boolean.TRUE.equals(alertPluginInstanceMapper.existInstanceName(alertPluginInstance.getInstanceName()))) {
            throw new ServiceException(Status.PLUGIN_INSTANCE_ALREADY_EXISTS);
        }

        if (alertPluginInstanceMapper.insert(alertPluginInstance) <= 0) {
            throw new ServiceException(Status.SAVE_ERROR);
        }
        log.info("Create alert plugin instance complete, name:{}", alertPluginInstance.getInstanceName());

        // global instance will be added into global alert group automatically
        if (instanceType == AlertPluginInstanceType.GLOBAL) {
            addToGlobalAlertGroup(alertPluginInstance.getId());
        }
        return alertPluginInstance;
    }

    /**
     * Update an alert plugin instance.
     *
     * @param loginUser            login user
     * @param pluginInstanceId     plugin instance id
     * @param instanceName         instance name
     * @param warningType          warning type
     * @param pluginInstanceParams plugin instance params (complete params, including ui)
     * @return the instance object that was written
     * @throws ServiceException if the user lacks permission or no row was updated
     */
    @Override
    public AlertPluginInstance updateById(User loginUser, int pluginInstanceId, String instanceName,
                                          WarningType warningType, String pluginInstanceParams) {
        if (!canOperatorPermissions(loginUser, null, AuthorizationType.ALERT_PLUGIN_INSTANCE, ALERT_PLUGIN_UPDATE)) {
            throw new ServiceException(Status.USER_NO_OPERATION_PERM);
        }

        String paramsMapJson = parsePluginParamsMap(pluginInstanceParams);
        AlertPluginInstance alertPluginInstance =
                new AlertPluginInstance(pluginInstanceId, paramsMapJson, instanceName, warningType, new Date());

        if (alertPluginInstanceMapper.updateById(alertPluginInstance) <= 0) {
            throw new ServiceException(Status.SAVE_ERROR);
        }
        log.info("Update alert plugin instance complete, instanceId:{}, name:{}", alertPluginInstance.getId(),
                alertPluginInstance.getInstanceName());
        return alertPluginInstance;
    }

    /**
     * Delete an alert plugin instance.
     *
     * <p>A GLOBAL instance is first removed from the global alert group. A non-global instance can only
     * be deleted when no alert group references it. Deleting an id that does not exist is a no-op.
     *
     * @param loginUser             login user
     * @param alertPluginInstanceId id of the instance to delete
     * @throws ServiceException if the user lacks permission or the instance is still associated
     *                          with an alert group
     */
    @Override
    @Transactional
    public void deleteById(User loginUser, int alertPluginInstanceId) {
        if (!canOperatorPermissions(loginUser, null, AuthorizationType.ALERT_PLUGIN_INSTANCE, ALERT_PLUGIN_DELETE)) {
            throw new ServiceException(Status.USER_NO_OPERATION_PERM);
        }

        AlertPluginInstance alertPluginInstance = alertPluginInstanceMapper.selectById(alertPluginInstanceId);
        if (alertPluginInstance == null) {
            // Nothing to delete; avoid a NullPointerException on the type check below.
            log.warn("Alert plugin instance does not exist, skip deleting, instanceId:{}", alertPluginInstanceId);
            return;
        }

        if (alertPluginInstance.getInstanceType() == AlertPluginInstanceType.GLOBAL) {
            // global instance will be removed from global alert group automatically
            removeFromGlobalAlertGroup(alertPluginInstance);
        } else if (checkHasAssociatedAlertGroup(String.valueOf(alertPluginInstanceId))) {
            // a non-global instance must be detached from every alert group before deletion
            throw new ServiceException(Status.DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED);
        }
        alertPluginInstanceMapper.deleteById(alertPluginInstanceId);
    }

    /**
     * Get an alert plugin instance by id.
     *
     * @param loginUser login user
     * @param id        instance id
     * @return the alert plugin instance returned by the mapper for this id
     * @throws ServiceException if the user lacks permission
     */
    @Override
    public AlertPluginInstance getById(User loginUser, int id) {
        if (!canOperatorPermissions(loginUser, null, AuthorizationType.ALERT_PLUGIN_INSTANCE,
                ApiFuncIdentificationConstant.ALARM_INSTANCE_MANAGE)) {
            throw new ServiceException(Status.USER_NO_OPERATION_PERM);
        }
        return alertPluginInstanceMapper.selectById(id);
    }

    /**
     * Query all alert plugin instances as view objects.
     *
     * @return view objects for every instance whose plugin definition still exists
     */
    @Override
    public List<AlertPluginInstanceVO> queryAll() {
        List<AlertPluginInstance> alertPluginInstances = alertPluginInstanceMapper.queryAllAlertPluginInstanceList();
        return buildPluginInstanceVOList(alertPluginInstances);
    }

    /**
     * Check whether an instance with the given name already exists.
     *
     * @param pluginInstanceName instance name to look up
     * @return {@code true} only if the mapper reports the name as existing
     */
    @Override
    public boolean checkExistPluginInstanceName(String pluginInstanceName) {
        return Boolean.TRUE.equals(alertPluginInstanceMapper.existInstanceName(pluginInstanceName));
    }

    /**
     * Page through alert plugin instances filtered by {@code searchVal}.
     *
     * @param loginUser login user
     * @param searchVal search value passed to the instance-name page query
     * @param pageNo    page number
     * @param pageSize  page size
     * @return page info holding the total count and the view objects of the requested page
     */
    @Override
    public PageInfo<AlertPluginInstanceVO> listPaging(User loginUser, String searchVal, int pageNo, int pageSize) {
        IPage<AlertPluginInstance> alertPluginInstanceIPage =
                alertPluginInstanceMapper.queryByInstanceNamePage(new Page<>(pageNo, pageSize), searchVal);

        PageInfo<AlertPluginInstanceVO> pageInfo = new PageInfo<>(pageNo, pageSize);
        pageInfo.setTotal((int) alertPluginInstanceIPage.getTotal());
        pageInfo.setTotalList(buildPluginInstanceVOList(alertPluginInstanceIPage.getRecords()));
        return pageInfo;
    }

    /**
     * Convert instances to view objects, resolving the plugin name and ui params from the plugin definitions.
     * Instances whose plugin definition can no longer be found are left out of the result.
     *
     * @param alertPluginInstances instances to convert, may be null or empty
     * @return list of view objects, never null
     */
    private List<AlertPluginInstanceVO> buildPluginInstanceVOList(List<AlertPluginInstance> alertPluginInstances) {
        List<AlertPluginInstanceVO> alertPluginInstanceVOS = new ArrayList<>();
        if (CollectionUtils.isEmpty(alertPluginInstances)) {
            return alertPluginInstanceVOS;
        }
        List<PluginDefine> pluginDefineList = pluginDefineMapper.queryAllPluginDefineList();
        if (CollectionUtils.isEmpty(pluginDefineList)) {
            return alertPluginInstanceVOS;
        }
        Map<Integer, PluginDefine> pluginDefineMap =
                pluginDefineList.stream().collect(Collectors.toMap(PluginDefine::getId, Function.identity()));

        for (AlertPluginInstance alertPluginInstance : alertPluginInstances) {
            PluginDefine pluginDefine = pluginDefineMap.get(alertPluginInstance.getPluginDefineId());
            // FIXME When the user removes the plug-in, this will happen. At this time, maybe we should add a new
            // field to indicate that the plug-in has expired?
            if (pluginDefine == null) {
                continue;
            }

            AlertPluginInstanceVO alertPluginInstanceVO = new AlertPluginInstanceVO();
            alertPluginInstanceVO.setId(alertPluginInstance.getId());
            alertPluginInstanceVO.setInstanceName(alertPluginInstance.getInstanceName());
            alertPluginInstanceVO.setPluginDefineId(alertPluginInstance.getPluginDefineId());
            alertPluginInstanceVO.setCreateTime(alertPluginInstance.getCreateTime());
            alertPluginInstanceVO.setUpdateTime(alertPluginInstance.getUpdateTime());
            if (alertPluginInstance.getInstanceType() != null) {
                alertPluginInstanceVO.setInstanceType(alertPluginInstance.getInstanceType().getDescp());
            }
            if (alertPluginInstance.getWarningType() != null) {
                // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
                alertPluginInstanceVO.setWarningType(
                        alertPluginInstance.getWarningType().getDescp().toUpperCase(Locale.ROOT));
            }
            alertPluginInstanceVO.setAlertPluginName(pluginDefine.getPluginName());
            // todo List pages do not recommend returning this parameter
            String uiPluginParams = parseToPluginUiParams(alertPluginInstance.getPluginInstanceParams(),
                    pluginDefine.getPluginParams());
            alertPluginInstanceVO.setPluginInstanceParams(uiPluginParams);

            alertPluginInstanceVOS.add(alertPluginInstanceVO);
        }
        return alertPluginInstanceVOS;
    }

    /**
     * Get the parameters actually needed by the plugin.
     *
     * @param pluginParams Complete parameters(include ui)
     * @return k, v(json string)
     */
    private String parsePluginParamsMap(String pluginParams) {
        Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(pluginParams);
        return JSONUtils.toJsonString(paramsMap);
    }

    /**
     * Merge stored k-v params back into the plugin's ui param definition.
     *
     * @param pluginParamsMapString k-v data
     * @param pluginUiParams        Complete parameters(include ui)
     * @return Complete parameters list(include ui) as a json string
     */
    private String parseToPluginUiParams(String pluginParamsMapString, String pluginUiParams) {
        List<Map<String, Object>> pluginParamsList =
                PluginParamsTransfer.generatePluginParams(pluginParamsMapString, pluginUiParams);
        return JSONUtils.toJsonString(pluginParamsList);
    }

    /**
     * Check whether any alert group references the given instance id.
     *
     * @param id instance id as a string
     * @return {@code true} if at least one alert group's id list contains {@code id}
     */
    private boolean checkHasAssociatedAlertGroup(String id) {
        List<String> idsList = alertGroupMapper.queryInstanceIdsList();
        if (CollectionUtils.isEmpty(idsList)) {
            return false;
        }
        return idsList.stream()
                .filter(StringUtils::isNotBlank)
                .flatMap(ids -> Arrays.stream(ids.split(INSTANCE_ID_SEPARATOR)))
                // tokens are trimmed so this check agrees with how instance ids are parsed elsewhere in this class
                .map(String::trim)
                .anyMatch(id::equals);
    }

    /**
     * Append an instance id to the global alert group's instance id list.
     *
     * @param instanceId id of the newly created GLOBAL instance
     * @throws ServiceException if the global alert group cannot be found
     */
    private void addToGlobalAlertGroup(Integer instanceId) {
        AlertGroup globalAlertGroup = alertGroupMapper.selectById(GLOBAL_ALERT_GROUP_ID);
        if (globalAlertGroup == null) {
            // Failing here rolls back the surrounding transaction instead of leaving a detached global instance.
            log.error("Global alert group does not exist, alertGroupId:{}", GLOBAL_ALERT_GROUP_ID);
            throw new ServiceException(Status.SAVE_ERROR);
        }
        List<Integer> ids = parseInstanceIds(globalAlertGroup.getAlertInstanceIds());
        if (!ids.contains(instanceId)) {
            ids.add(instanceId);
        }
        globalAlertGroup.setAlertInstanceIds(StringUtils.join(ids, INSTANCE_ID_SEPARATOR));
        alertGroupMapper.updateById(globalAlertGroup);
    }

    /**
     * Remove an instance id from the global alert group's instance id list.
     * If the global alert group cannot be found there is nothing to detach and the method only logs.
     *
     * @param alertPluginInstance the GLOBAL instance being deleted
     */
    private void removeFromGlobalAlertGroup(AlertPluginInstance alertPluginInstance) {
        AlertGroup globalAlertGroup = alertGroupMapper.selectById(GLOBAL_ALERT_GROUP_ID);
        if (globalAlertGroup == null) {
            log.warn("Global alert group does not exist, alertGroupId:{}", GLOBAL_ALERT_GROUP_ID);
            return;
        }
        List<Integer> remainingIds = parseInstanceIds(globalAlertGroup.getAlertInstanceIds());
        remainingIds.removeIf(id -> id.equals(alertPluginInstance.getId()));
        globalAlertGroup.setAlertInstanceIds(StringUtils.join(remainingIds, INSTANCE_ID_SEPARATOR));
        alertGroupMapper.updateById(globalAlertGroup);
        log.info("Remove global alert plugin instance from global alert group automatically, name:{}",
                alertPluginInstance.getInstanceName());
    }

    /**
     * Parse a comma-separated instance id string into a mutable list.
     * A null/blank input yields an empty list; blank tokens (e.g. from a trailing comma) are ignored.
     *
     * @param alertInstanceIds comma-separated ids, may be null or blank
     * @return mutable list of parsed ids, never null
     */
    private static List<Integer> parseInstanceIds(String alertInstanceIds) {
        if (StringUtils.isBlank(alertInstanceIds)) {
            return new ArrayList<>();
        }
        return Arrays.stream(alertInstanceIds.split(INSTANCE_ID_SEPARATOR))
                .map(String::trim)
                .filter(StringUtils::isNotEmpty)
                .map(Integer::parseInt)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Look up an alert server address from the registry.
     *
     * @return the host of the first registered alert server, or empty if none is registered
     */
    public Optional<Host> getAlertServerAddress() {
        List<Server> serverList = registryClient.getServerList(RegistryNodeType.ALERT_SERVER);
        if (CollectionUtils.isEmpty(serverList)) {
            return Optional.empty();
        }
        Server server = serverList.get(0);
        return Optional.of(new Host(server.getHost(), server.getPort()));
    }

    /**
     * Send a test alert through the alert server using the given plugin and params.
     *
     * @param pluginDefineId       plugin define id
     * @param pluginInstanceParams plugin instance params to test with
     * @throws ServiceException if no alert server is registered, the call fails, or the alert server
     *                          reports the send as unsuccessful
     */
    @Override
    public void testSend(int pluginDefineId, String pluginInstanceParams) {
        Host alertServerAddress = getAlertServerAddress()
                .orElseThrow(() -> new ServiceException(Status.ALERT_SERVER_NOT_EXIST));
        AlertTestSendRequest alertTestSendRequest = new AlertTestSendRequest(pluginDefineId, pluginInstanceParams);

        AlertSendResponse alertSendResponse;
        try {
            IAlertOperator alertOperator = SingletonJdkDynamicRpcClientProxyFactory
                    .getProxyClient(alertServerAddress.getAddress(), IAlertOperator.class);
            alertSendResponse = alertOperator.sendTestAlert(alertTestSendRequest);
        } catch (Exception e) {
            // NOTE(review): only the plugin id is logged because the request carries the raw plugin params,
            // which presumably may include credentials - confirm before logging the full request again.
            log.error("Send test alert of plugin: {} to: {} failed", pluginDefineId, alertServerAddress, e);
            throw new ServiceException(Status.ALERT_TEST_SENDING_FAILED, e.getMessage());
        }

        if (alertSendResponse == null) {
            throw new ServiceException(Status.ALERT_TEST_SENDING_FAILED, "alert server returned no response");
        }
        if (!alertSendResponse.isSuccess()) {
            // The result list may be empty, so guard before reading the first message.
            String failureMessage = CollectionUtils.isEmpty(alertSendResponse.getResResults())
                    ? "alert server returned no result detail"
                    : alertSendResponse.getResResults().get(0).getMessage();
            throw new ServiceException(Status.ALERT_TEST_SENDING_FAILED, failureMessage);
        }
        log.info("Send alert to: {} successfully, response: {}", alertServerAddress, alertSendResponse);
    }
}