blob: 96eeffd798e0307b4c721fac409d2503ea8e1b8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.publisher.dedup;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.typesafe.config.Config;
public class DedupCache {
private static final Logger LOG = LoggerFactory.getLogger(DedupCache.class);
private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30;
public static final String DEDUP_COUNT = "dedupCount";
public static final String DOC_ID = "docId";
public static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrenceTime";
private long lastUpdated = -1;
private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
@SuppressWarnings("unused")
private Config config;
private String publishName;
public DedupCache(Config config, String publishName) {
this.config = config;
this.publishName = publishName;
}
public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
if (lastUpdated < 0
|| System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
|| events.size() <= 0) {
lastUpdated = System.currentTimeMillis();
}
return events;
}
public boolean contains(EventUniq eventEniq) {
return this.getEvents().containsKey(eventEniq);
}
public void removeEvent(EventUniq eventEniq) {
if (this.contains(eventEniq)) {
this.events.remove(eventEniq);
}
}
public List<AlertStreamEvent> dedup(AlertStreamEvent event, EventUniq eventEniq,
String dedupStateField, String stateFieldValue,
String stateCloseValue) {
DedupValue[] dedupValues = this.addOrUpdate(eventEniq, event, stateFieldValue, stateCloseValue);
if (dedupValues != null) {
// any of dedupValues won't be null
if (dedupValues.length == 2) {
// emit last event which includes count of dedup events & new state event
return Arrays.asList(
this.mergeEventWithDedupValue(event, dedupValues[0], dedupStateField),
this.mergeEventWithDedupValue(event, dedupValues[1], dedupStateField));
} else if (dedupValues.length == 1) {
//populate firstOccurrenceTime & count
return Arrays.asList(this.mergeEventWithDedupValue(event, dedupValues[0], dedupStateField));
}
}
// duplicated, will be ignored
return null;
}
public synchronized DedupValue[] addOrUpdate(EventUniq eventEniq, AlertStreamEvent event, String stateFieldValue, String stateCloseValue) {
Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = this.getEvents();
if (!events.containsKey(eventEniq)
|| (events.containsKey(eventEniq)
&& events.get(eventEniq).size() > 0
&& !StringUtils.equalsIgnoreCase(stateFieldValue,
events.get(eventEniq).getLast().getStateFieldValue()))) {
DedupValue[] dedupValues = this.add(eventEniq, event, stateFieldValue, stateCloseValue);
return dedupValues;
} else {
// update count
this.updateCount(eventEniq);
return null;
}
}
private DedupValue[] add(EventUniq eventEniq, AlertStreamEvent event, String stateFieldValue, String stateCloseValue) {
DedupValue dedupValue = null;
if (!events.containsKey(eventEniq)) {
dedupValue = createDedupValue(eventEniq, event, stateFieldValue);
ConcurrentLinkedDeque<DedupValue> dedupValues = new ConcurrentLinkedDeque<>();
dedupValues.add(dedupValue);
// skip the event which put failed due to concurrency
events.put(eventEniq, dedupValues);
LOG.info("{} Add new dedup key {}, and value {}", this.publishName, eventEniq, dedupValues);
} else if (!StringUtils.equalsIgnoreCase(stateFieldValue,
events.get(eventEniq).getLast().getStateFieldValue())) {
// existing a de-dup value, try update or reset
DedupValue lastDedupValue = events.get(eventEniq).getLast();
dedupValue = updateDedupValue(lastDedupValue, eventEniq, event, stateFieldValue, stateCloseValue);
LOG.info("{} Update dedup key {}, and value {}", this.publishName, eventEniq, dedupValue);
}
if (dedupValue == null) {
return null;
}
return new DedupValue[] {dedupValue};
}
private DedupValue updateDedupValue(DedupValue lastDedupValue, EventUniq eventEniq, AlertStreamEvent event, String stateFieldValue, String stateCloseValue) {
if (lastDedupValue.getFirstOccurrence() >= eventEniq.timestamp) {
// if dedup value happens later then event, dedup state changes.
return null;
}
if (lastDedupValue.getStateFieldValue().equals(stateCloseValue)
&& eventEniq.timestamp < lastDedupValue.getCloseTime()) {
DedupValue dv = createDedupValue(eventEniq, event, stateFieldValue);
lastDedupValue.resetTo(dv);
} else {
// update lastDedupValue, set closeTime when close
lastDedupValue.setStateFieldValue(stateFieldValue);
if (stateFieldValue.equals(stateCloseValue)) {
lastDedupValue.setCloseTime(eventEniq.timestamp); // when close an event, set closeTime for further check
}
}
return lastDedupValue;
}
private DedupValue createDedupValue(EventUniq eventEniq, AlertStreamEvent event, String stateFieldValue) {
DedupValue dedupValue;
dedupValue = new DedupValue();
dedupValue.setFirstOccurrence(eventEniq.timestamp);
int idx = event.getSchema().getColumnIndex(DOC_ID);
if (idx >= 0) {
dedupValue.setDocId(event.getData()[idx].toString());
} else {
dedupValue.setDocId("");
}
dedupValue.setCount(1);
dedupValue.setCloseTime(0);
dedupValue.setStateFieldValue(stateFieldValue);
return dedupValue;
}
private DedupValue updateCount(EventUniq eventEniq) {
ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
if (dedupValues == null || dedupValues.size() <= 0) {
LOG.warn("{} No dedup values found for {}, cannot update count", this.publishName, eventEniq);
return null;
} else {
DedupValue dedupValue = dedupValues.getLast();
dedupValue.setCount(dedupValue.getCount() + 1);
String updateMsg = String.format(
"%s Update count for dedup key %s, value %s and count %s", this.publishName, eventEniq,
dedupValue.getStateFieldValue(), dedupValue.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug(updateMsg);
}
return dedupValue;
}
}
private AlertStreamEvent mergeEventWithDedupValue(AlertStreamEvent originalEvent,
DedupValue dedupValue, String dedupStateField) {
AlertStreamEvent event = new AlertStreamEvent();
Object[] newdata = new Object[originalEvent.getData().length];
for (int i = 0; i < originalEvent.getData().length; i++) {
newdata[i] = originalEvent.getData()[i];
}
event.setData(newdata);
event.setStreamId(originalEvent.getStreamId());
event.setSchema(originalEvent.getSchema());
event.setPolicyId(originalEvent.getPolicyId());
event.setCreatedTime(originalEvent.getCreatedTime());
event.setCreatedBy(originalEvent.getCreatedBy());
event.setTimestamp(originalEvent.getTimestamp());
StreamDefinition streamDefinition = event.getSchema();
for (int i = 0; i < event.getData().length; i++) {
String colName = streamDefinition.getColumns().get(i).getName();
if (Objects.equal(colName, dedupStateField)) {
event.getData()[i] = dedupValue.getStateFieldValue();
}
if (Objects.equal(colName, DEDUP_COUNT)) {
event.getData()[i] = dedupValue.getCount();
}
if (Objects.equal(colName, DEDUP_FIRST_OCCURRENCE)) {
event.getData()[i] = dedupValue.getFirstOccurrence();
}
if (Objects.equal(colName, DOC_ID)) {
event.getData()[i] = dedupValue.getDocId();
}
}
return event;
}
}