blob: 87d4e1c066e951973f2d9d8acea79444011991d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.alert.impl;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.AlertConfig;
import org.apache.streampark.console.core.enums.AlertTypeEnum;
import org.apache.streampark.console.core.service.alert.AlertConfigService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.flink.api.java.tuple.Tuple2;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Nonnull;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
public class AlertServiceImpl implements AlertService {
private final AlertConfigService alertConfigService;
@Override
public boolean alert(Long alertConfigId, AlertTemplate alertTemplate) {
if (alertConfigId == null) {
log.warn("alertConfigId is null");
return false;
}
AlertConfig alertConfig = alertConfigService.getById(alertConfigId);
try {
AlertConfigParams params = AlertConfigParams.of(alertConfig);
List<AlertTypeEnum> alertTypeEnums = AlertTypeEnum.decode(params.getAlertType());
if (CollectionUtils.isEmpty(alertTypeEnums)) {
return true;
}
// No use thread pool, ensure that the alarm can be sent successfully
Tuple2<Boolean, AlertException> reduce = triggerAlert(alertTemplate, alertTypeEnums, params);
if (reduce.f1 != null) {
throw reduce.f1;
}
return reduce.f0;
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return false;
}
@Nonnull
private Tuple2<Boolean, AlertException> triggerAlert(
AlertTemplate alertTemplate, List<AlertTypeEnum> alertTypeEnums, AlertConfigParams params) {
return alertTypeEnums.stream()
.map(
alertTypeEnum -> {
try {
boolean alertRes =
SpringContextUtils.getBean(alertTypeEnum.getClazz())
.doAlert(params, alertTemplate);
return new Tuple2<Boolean, AlertException>(alertRes, null);
} catch (AlertException e) {
return new Tuple2<>(false, e);
}
})
.reduce(
new Tuple2<>(true, null),
(tp1, tp2) -> {
boolean alertResult = tp1.f0 & tp2.f0;
if (tp1.f1 == null && tp2.f1 == null) {
return new Tuple2<>(tp1.f0 & tp2.f0, null);
}
if (tp1.f1 != null && tp2.f1 != null) {
// merge multiple exception, and keep the details of the first exception
AlertException alertException =
new AlertException(tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
return new Tuple2<>(alertResult, alertException);
}
return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
});
}
}