[EAGLE-1038] Support alertDuplication customization for each policy
https://issues.apache.org/jira/browse/EAGLE-1038
* support duplication check for each outputStream of a policy
* compatible with the duplication check in old versions (check in a publisher)
Author: Zhao, Qingwen <qingwzhao@apache.org>
Closes #944 from qingwen220/minor.
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
index 78fef7a..d47d7d0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
@@ -24,9 +24,18 @@
import java.util.Objects;
public class AlertDeduplication {
+ private String outputStreamId;
private String dedupIntervalMin;
private List<String> dedupFields;
+ public String getOutputStreamId() {
+ return outputStreamId;
+ }
+
+ public void setOutputStreamId(String outputStreamId) {
+ this.outputStreamId = outputStreamId;
+ }
+
public String getDedupIntervalMin() {
return dedupIntervalMin;
}
@@ -46,6 +55,7 @@
@Override
public int hashCode() {
return new HashCodeBuilder()
+ .append(outputStreamId)
.append(dedupFields)
.append(dedupIntervalMin)
.build();
@@ -61,7 +71,8 @@
}
AlertDeduplication another = (AlertDeduplication) that;
if (ListUtils.isEqualList(another.dedupFields, this.dedupFields)
- && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) {
+ && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)
+ && Objects.equals(another.outputStreamId, this.outputStreamId)) {
return true;
}
return false;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 5004513..698605e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -43,7 +43,7 @@
private Definition stateDefinition;
private PolicyStatus policyStatus = PolicyStatus.ENABLED;
private AlertDefinition alertDefinition;
- private AlertDeduplication deduplication;
+ private List<AlertDeduplication> alertDeduplications = new ArrayList<>();
// one stream only have one partition in one policy, since we don't support stream alias
private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
@@ -136,49 +136,12 @@
this.policyStatus = policyStatus;
}
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(siteId)
- .append(name)
- .append(inputStreams)
- .append(outputStreams)
- .append(definition)
- .append(partitionSpec)
- .append(policyStatus)
- .append(parallelismHint)
- .append(alertDefinition)
- .append(deduplication)
- .build();
+ public List<AlertDeduplication> getAlertDeduplications() {
+ return alertDeduplications;
}
- @Override
- public boolean equals(Object that) {
- if (that == this) {
- return true;
- }
-
- if (!(that instanceof PolicyDefinition)) {
- return false;
- }
-
- PolicyDefinition another = (PolicyDefinition) that;
-
- if (Objects.equals(another.siteId, this.siteId)
- && Objects.equals(another.name, this.name)
- && Objects.equals(another.description, this.description)
- && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
- && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
- && (another.definition != null && another.definition.equals(this.definition))
- && Objects.equals(this.definition, another.definition)
- && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
- && another.policyStatus.equals(this.policyStatus)
- && another.parallelismHint == this.parallelismHint
- && Objects.equals(another.alertDefinition, alertDefinition)
- && Objects.equals(another.deduplication, deduplication)) {
- return true;
- }
- return false;
+ public void setAlertDeduplications(List<AlertDeduplication> alertDeduplications) {
+ this.alertDeduplications = alertDeduplications;
}
public AlertDefinition getAlertDefinition() {
@@ -205,12 +168,49 @@
this.siteId = siteId;
}
- public AlertDeduplication getDeduplication() {
- return deduplication;
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(siteId)
+ .append(name)
+ .append(inputStreams)
+ .append(outputStreams)
+ .append(definition)
+ .append(partitionSpec)
+ .append(policyStatus)
+ .append(parallelismHint)
+ .append(alertDefinition)
+ .append(alertDeduplications)
+ .build();
}
- public void setDeduplication(AlertDeduplication deduplication) {
- this.deduplication = deduplication;
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+
+ if (!(that instanceof PolicyDefinition)) {
+ return false;
+ }
+
+ PolicyDefinition another = (PolicyDefinition) that;
+
+ if (Objects.equals(another.siteId, this.siteId)
+ && Objects.equals(another.name, this.name)
+ && Objects.equals(another.description, this.description)
+ && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+ && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+ && (another.definition != null && another.definition.equals(this.definition))
+ && Objects.equals(this.definition, another.definition)
+ && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+ && another.policyStatus.equals(this.policyStatus)
+ && another.parallelismHint == this.parallelismHint
+ && Objects.equals(another.alertDefinition, alertDefinition)
+ && CollectionUtils.isEqualCollection(another.alertDeduplications, alertDeduplications)) {
+ return true;
+ }
+ return false;
}
@JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 00170df..3079f77 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -39,6 +39,7 @@
private long createdTime;
private String category;
private AlertSeverity severity = AlertSeverity.WARNING;
+ private boolean duplicationChecked = false;
// ----------------------
// Lazy Alert Fields
@@ -187,4 +188,12 @@
public void setSiteId(String siteId) {
this.siteId = siteId;
}
+
+ public boolean isDuplicationChecked() {
+ return duplicationChecked;
+ }
+
+ public void setDuplicationChecked(boolean duplicationChecked) {
+ this.duplicationChecked = duplicationChecked;
+ }
}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
new file mode 100644
index 0000000..59da244
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class AlertDeduplicationTest {
+
+ @Test
+ public void testEqual() {
+ AlertDeduplication deduplication1 = new AlertDeduplication();
+ deduplication1.setDedupIntervalMin("1");
+ deduplication1.setOutputStreamId("stream");
+
+ AlertDeduplication deduplication2 = new AlertDeduplication();
+ deduplication2.setDedupIntervalMin("1");
+ deduplication2.setOutputStreamId("stream");
+ deduplication2.setDedupFields(new ArrayList<>());
+
+ Assert.assertFalse(deduplication1.equals(deduplication2));
+
+ AlertDeduplication deduplication3 = new AlertDeduplication();
+ deduplication3.setDedupFields(new ArrayList<>());
+ deduplication3.setOutputStreamId("stream");
+ deduplication3.setDedupIntervalMin("1");
+
+ Assert.assertTrue(deduplication3.equals(deduplication2));
+ }
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index abb83d6..96eeffd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -26,7 +26,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
index 86bc9b3..e666c64 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -20,8 +20,6 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-
public class DedupEntity {
private String publishName;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
new file mode 100644
index 0000000..f6b03a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.Objects;
+
+public class DedupKey {
+ private String policyId;
+ private String outputStreamId;
+
+ public DedupKey(String policyId, String outputStreamId) {
+ this.policyId = policyId;
+ this.outputStreamId = outputStreamId;
+ }
+
+ public String getPolicyId() {
+ return policyId;
+ }
+
+ public void setPolicyId(String policyId) {
+ this.policyId = policyId;
+ }
+
+ public String getOutputStreamId() {
+ return outputStreamId;
+ }
+
+ public void setOutputStreamId(String outputStreamId) {
+ this.outputStreamId = outputStreamId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof DedupKey) {
+ DedupKey au = (DedupKey) obj;
+ return Objects.equals(au.getOutputStreamId(), this.outputStreamId)
+ && Objects.equals(au.getPolicyId(), this.policyId);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(outputStreamId).append(policyId).build();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DedupKey[outputStreamId: %s, policyId: %s]", outputStreamId, policyId);
+ }
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
similarity index 77%
rename from eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
rename to eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
index 54d551e..2307d7a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
@@ -1,38 +1,34 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.eagle.alert.engine.publisher.impl;
+package org.apache.eagle.alert.engine.publisher.dedup;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.TimeUnit;
public class DefaultDeduplicator implements AlertDeduplicator {
@@ -43,28 +39,25 @@
private List<String> customDedupFields = new ArrayList<>();
private String dedupStateField;
private String dedupStateCloseValue;
+ private AlertDeduplication alertDeduplication = null;
private DedupCache dedupCache;
private Cache<EventUniq, String> withoutStatesCache;
- public DefaultDeduplicator() {
- this.dedupIntervalSec = 0;
- }
-
- public DefaultDeduplicator(String intervalMin) {
- setDedupIntervalMin(intervalMin);
- }
-
- public DefaultDeduplicator(long intervalMin) {
- this.dedupIntervalSec = intervalMin;
- }
-
public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
+ this.alertDeduplication = alertDeduplication;
this.customDedupFields = alertDeduplication.getDedupFields();
- this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+ try {
+ this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+ } catch (Exception e) {
+ LOG.error("de-duplication intervalSec {} parse error, use 30 min instead", alertDeduplication.getDedupIntervalMin(), e.getMessage());
+ this.dedupIntervalSec = 1800;
+ }
this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
this.dedupIntervalSec, TimeUnit.SECONDS).build();
+
+ LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields);
}
public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
@@ -83,6 +76,8 @@
withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
this.dedupIntervalSec, TimeUnit.SECONDS).build();
+
+ LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields);
}
/*
@@ -112,6 +107,10 @@
if (event == null) {
return null;
}
+ if (dedupIntervalSec <= 0) {
+ return Collections.singletonList(event);
+ }
+
// check custom field, and get the field values
StreamDefinition streamDefinition = event.getSchema();
HashMap<String, String> customFieldValues = new HashMap<>();
@@ -133,7 +132,9 @@
// make all of the field as unique key if no custom dedup field provided
if (colValue != null) {
if (customDedupFields == null || customDedupFields.size() <= 0) {
- customFieldValues.put(colName, colValue.toString());
+ if (streamDefinition.getColumns().get(i).getType().equals(StreamColumn.Type.STRING)) {
+ customFieldValues.put(colName, colValue.toString());
+ }
} else {
for (String field : customDedupFields) {
if (colName.equals(field)) {
@@ -170,4 +171,8 @@
}
}
+ public AlertDeduplication getAlertDeduplication() {
+ return alertDeduplication;
+ }
+
}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
similarity index 82%
rename from eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
rename to eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
index 511abcd..1434bb7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
@@ -1,24 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
/**
*
*/
-package org.apache.eagle.alert.engine.publisher.impl;
+package org.apache.eagle.alert.engine.publisher.dedup;
import org.apache.commons.lang3.builder.HashCodeBuilder;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index c5c9e04..771f736 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -26,6 +26,7 @@
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
import org.slf4j.Logger;
@@ -93,7 +94,7 @@
}
serializer = (IEventSerializer) obj;
} catch (Exception e) {
- getLogger().error(String.format("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage()), e);
+ getLogger().error("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage(), e);
serializer = new StringEventSerializer(conf);
}
}
@@ -105,7 +106,7 @@
@Override
public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
- if (null != deduplicator) {
+ if (null != deduplicator && !event.isDuplicationChecked()) {
return deduplicator.dedup(event);
} else {
return Collections.singletonList(event);
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index d6829d6..39c577c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -28,22 +28,17 @@
import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
-import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
+import org.apache.eagle.alert.engine.publisher.*;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupKey;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
import org.apache.eagle.alert.utils.AlertConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -53,7 +48,7 @@
private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
private volatile Map<String, PolicyDefinition> policyDefinitionMap;
private volatile Map<String, StreamDefinition> streamDefinitionMap;
- private volatile Map<String, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
+ private volatile Map<DedupKey, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
private AlertTemplateEngine alertTemplateEngine;
private boolean logEventEnabled;
@@ -90,13 +85,16 @@
if (logEventEnabled) {
LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
}
- if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) {
- List<AlertStreamEvent> eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event);
+ DedupKey dedupKey = new DedupKey(event.getPolicyId(), event.getStreamId());
+ if (deduplicatorMap != null && deduplicatorMap.containsKey(dedupKey)) {
+ List<AlertStreamEvent> eventList = deduplicatorMap.get(dedupKey).dedup(event);
if (eventList == null || eventList.isEmpty()) {
collector.ack(input);
return;
}
+ event.setDuplicationChecked(true);
}
+
AlertStreamEvent filteredEvent = alertFilter.filter(event);
if (filteredEvent != null) {
alertPublisher.nextEvent(partition, filteredEvent);
@@ -161,8 +159,15 @@
for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
try {
this.alertTemplateEngine.register(entry.getValue());
- if (entry.getValue().getDeduplication() != null) {
- this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication()));
+ List<AlertDeduplication> alertDeduplications = entry.getValue().getAlertDeduplications();
+ if (alertDeduplications != null && alertDeduplications.size() > 0) {
+ for (AlertDeduplication deduplication : alertDeduplications) {
+ DedupKey dedupKey = new DedupKey(entry.getKey(), deduplication.getOutputStreamId());
+ if (!deduplicatorMap.containsKey(dedupKey)
+ || !deduplicatorMap.get(dedupKey).getAlertDeduplication().equals(deduplication)) {
+ deduplicatorMap.put(dedupKey, new DefaultDeduplicator(deduplication));
+ }
+ }
}
} catch (Throwable throwable) {
LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
@@ -172,8 +177,10 @@
for (String policyId : policyToRemove) {
try {
this.alertTemplateEngine.unregister(policyId);
- if (deduplicatorMap != null && deduplicatorMap.containsKey(policyId)) {
- deduplicatorMap.remove(policyId);
+ for (DedupKey dedupKey : deduplicatorMap.keySet()) {
+ if (dedupKey.getPolicyId().equals(policyId)) {
+ deduplicatorMap.remove(dedupKey);
+ }
}
} catch (Throwable throwable) {
LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
@@ -231,4 +238,5 @@
return this.alertTemplateEngine.filter(event);
}
}
-}
\ No newline at end of file
+
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index 5bf0410..95e679d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -27,7 +27,6 @@
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
import org.junit.Assert;
import org.junit.Test;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
new file mode 100644
index 0000000..7fc886d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DedupKeyTest {
+
+ @Test
+ public void test() {
+ Map<DedupKey, Integer> testMap = new HashMap<>();
+ DedupKey key1 = new DedupKey("policy1", "stream1");
+ update(testMap, key1);
+ update(testMap, key1);
+
+ DedupKey key2 = new DedupKey("policy2", "stream2");
+ update(testMap, key2);
+
+ Assert.assertTrue(testMap.get(key1) == 1);
+ Assert.assertTrue(testMap.get(key2) == 0);
+
+ DedupKey key3 = new DedupKey("policy1", "stream1");
+ update(testMap, key3);
+
+ Assert.assertTrue(testMap.get(key3) == 2);
+ }
+
+ private void update(Map<DedupKey, Integer> map, DedupKey key) {
+ if (map.containsKey(key)) {
+ map.put(key, map.get(key) + 1);
+ } else {
+ map.put(key, 0);
+ }
+ }
+}
+
+
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index c48df9a..f839474 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -22,7 +22,6 @@
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
import org.junit.Assert;
import org.junit.Test;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 297b790..96da4c9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -22,7 +22,6 @@
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
import org.junit.Assert;
import org.junit.Test;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
index 247f332..51d054c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
@@ -18,7 +18,6 @@
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 46517fe..2cd2183 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,22 +18,23 @@
package org.apache.eagle.alert.engine.router;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupKey;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
import org.apache.eagle.alert.engine.runner.MapComparator;
import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
@@ -300,4 +301,76 @@
pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3);
}
+
+ @Test
+ public void testOnAlertPolicyChange() throws IllegalAccessException, NoSuchFieldException {
+ AlertDeduplication deduplication = new AlertDeduplication();
+ deduplication.setDedupIntervalMin("1");
+ deduplication.setOutputStreamId("stream");
+
+ PolicyDefinition policy1 = new PolicyDefinition();
+ policy1.setName("policy1");
+ policy1.getAlertDeduplications().add(deduplication);
+
+ Map<String, PolicyDefinition> pds1 = new HashMap<>();
+ pds1.put("policy1", policy1);
+
+ PolicyDefinition policy2 = new PolicyDefinition();
+ policy2.setName("policy2");
+ policy2.getAlertDeduplications().add(deduplication);
+
+ Map<String, PolicyDefinition> pds2 = new HashMap<>();
+ pds2.put("policy2", policy2);
+
+ AlertPublisherBolt bolt = new AlertPublisherBolt("publisher", null, null);
+
+ Field field = AlertPublisherBolt.class.getDeclaredField("alertTemplateEngine");
+ field.setAccessible(true);
+ AlertTemplateEngine engine = AlertTemplateProvider.createAlertTemplateEngine();
+ engine.init(null);
+ field.set(bolt, engine);
+
+ DedupKey dedupKey1 = new DedupKey("policy1", "stream");
+ DedupKey dedupKey2 = new DedupKey("policy2", "stream");
+ bolt.onAlertPolicyChange(pds1, null);
+ Map<DedupKey, DefaultDeduplicator> deduplicatorMap = getAlertDeduplicator(bolt);
+ Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1));
+
+ // remove policy1 and add policy2
+ bolt.onAlertPolicyChange(pds2, null);
+ deduplicatorMap = getAlertDeduplicator(bolt);
+ Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2));
+ Assert.assertFalse(deduplicatorMap.containsKey(dedupKey1));
+
+ // add new policy policy1 in pds2
+ pds2.put("policy1", policy1);
+ bolt.onAlertPolicyChange(pds2, null);
+ deduplicatorMap = getAlertDeduplicator(bolt);
+ Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1));
+ Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2));
+ Assert.assertTrue(deduplicatorMap.get(dedupKey1).getAlertDeduplication()
+ .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication()));
+
+ // update policy1 alertDeduplication
+ AlertDeduplication deduplication1 = new AlertDeduplication();
+ deduplication1.setOutputStreamId("stream");
+ deduplication1.setDedupIntervalMin("2");
+ policy1.getAlertDeduplications().clear();
+ policy1.getAlertDeduplications().add(deduplication1);
+ pds2.put("policy1", policy1);
+ bolt.onAlertPolicyChange(pds2, null);
+ deduplicatorMap = getAlertDeduplicator(bolt);
+ Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy2", "stream")));
+ Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy1", "stream")));
+ Assert.assertFalse(deduplicatorMap.get(dedupKey1).getAlertDeduplication()
+ .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication()));
+
+ }
+
+
+ private Map<DedupKey, DefaultDeduplicator> getAlertDeduplicator(AlertPublisherBolt bolt) throws NoSuchFieldException, IllegalAccessException {
+ Field field = AlertPublisherBolt.class.getDeclaredField("deduplicatorMap");
+ field.setAccessible(true);
+ return (Map<DedupKey, DefaultDeduplicator>) field.get(bolt);
+ }
}