blob: 54d551e68503338fe9bc1312eff41942446956e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.eagle.alert.engine.publisher.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class DefaultDeduplicator implements AlertDeduplicator {
private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
private long dedupIntervalSec;
private List<String> customDedupFields = new ArrayList<>();
private String dedupStateField;
private String dedupStateCloseValue;
private DedupCache dedupCache;
private Cache<EventUniq, String> withoutStatesCache;
public DefaultDeduplicator() {
this.dedupIntervalSec = 0;
}
public DefaultDeduplicator(String intervalMin) {
setDedupIntervalMin(intervalMin);
}
public DefaultDeduplicator(long intervalMin) {
this.dedupIntervalSec = intervalMin;
}
public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
this.customDedupFields = alertDeduplication.getDedupFields();
this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
this.dedupIntervalSec, TimeUnit.SECONDS).build();
}
public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) {
setDedupIntervalMin(intervalMin);
if (customDedupFields != null) {
this.customDedupFields = customDedupFields;
}
if (StringUtils.isNotBlank(dedupStateField)) {
this.dedupStateField = dedupStateField;
}
if (StringUtils.isNotBlank(dedupStateCloseValue)) {
this.dedupStateCloseValue = dedupStateCloseValue;
}
this.dedupCache = dedupCache;
withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
this.dedupIntervalSec, TimeUnit.SECONDS).build();
}
/*
* @param key
* @return
*/
private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
if (StringUtils.isBlank(stateFiledValue)) {
// without state field, we cannot determine whether it is duplicated
// without custom filed values, we cannot determine whether it is duplicated
synchronized (withoutStatesCache) {
if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key);
}
return null;
} else if (withoutStatesCache != null) {
withoutStatesCache.put(key, "");
}
}
return Arrays.asList(event);
}
return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue);
}
public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
if (event == null) {
return null;
}
// check custom field, and get the field values
StreamDefinition streamDefinition = event.getSchema();
HashMap<String, String> customFieldValues = new HashMap<>();
String stateFiledValue = null;
for (int i = 0; i < event.getData().length; i++) {
if (i > streamDefinition.getColumns().size()) {
if (LOG.isWarnEnabled()) {
LOG.warn("output column does not found for event data, this indicate code error!");
}
continue;
}
String colName = streamDefinition.getColumns().get(i).getName();
Object colValue = event.getData()[i];
if (colName.equals(dedupStateField) && colValue != null) {
stateFiledValue = colValue.toString();
}
// make all of the field as unique key if no custom dedup field provided
if (colValue != null) {
if (customDedupFields == null || customDedupFields.size() <= 0) {
customFieldValues.put(colName, colValue.toString());
} else {
for (String field : customDedupFields) {
if (colName.equals(field)) {
customFieldValues.put(field, colValue.toString());
break;
}
}
}
}
}
List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(),
event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue);
if (outputEvents != null && outputEvents.size() > 0) {
return outputEvents;
} else if (LOG.isInfoEnabled()) {
LOG.info("Alert event is skipped because it's duplicated: {}", event.toString());
}
return null;
}
@Override
public void setDedupIntervalMin(String newDedupIntervalMin) {
if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
dedupIntervalSec = 0;
return;
}
try {
Period period = Period.parse(newDedupIntervalMin);
this.dedupIntervalSec = period.toStandardSeconds().getSeconds();
} catch (Exception e) {
LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
this.dedupIntervalSec = 0;
}
}
}