[EAGLE-1038] Support alertDuplication customization for each policy

https://issues.apache.org/jira/browse/EAGLE-1038

* support duplication check for each outputStream of a policy
* compatible with the duplication check in old versions (check in a publisher)

Author: Zhao, Qingwen <qingwzhao@apache.org>

Closes #944 from qingwen220/minor.
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
index 78fef7a..d47d7d0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
@@ -24,9 +24,18 @@
 import java.util.Objects;
 
 public class AlertDeduplication {
+    private String outputStreamId;
     private String dedupIntervalMin;
     private List<String> dedupFields;
 
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    public void setOutputStreamId(String outputStreamId) {
+        this.outputStreamId = outputStreamId;
+    }
+
     public String getDedupIntervalMin() {
         return dedupIntervalMin;
     }
@@ -46,6 +55,7 @@
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
+                .append(outputStreamId)
                 .append(dedupFields)
                 .append(dedupIntervalMin)
                 .build();
@@ -61,7 +71,8 @@
         }
         AlertDeduplication another = (AlertDeduplication) that;
         if (ListUtils.isEqualList(another.dedupFields, this.dedupFields)
-                && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) {
+                && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)
+                && Objects.equals(another.outputStreamId, this.outputStreamId)) {
             return true;
         }
         return false;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 5004513..698605e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -43,7 +43,7 @@
     private Definition stateDefinition;
     private PolicyStatus policyStatus = PolicyStatus.ENABLED;
     private AlertDefinition alertDefinition;
-    private AlertDeduplication deduplication;
+    private List<AlertDeduplication> alertDeduplications = new ArrayList<>();
 
     // one stream only have one partition in one policy, since we don't support stream alias
     private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
@@ -136,49 +136,12 @@
         this.policyStatus = policyStatus;
     }
 
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(siteId)
-                .append(name)
-                .append(inputStreams)
-                .append(outputStreams)
-                .append(definition)
-                .append(partitionSpec)
-                .append(policyStatus)
-                .append(parallelismHint)
-                .append(alertDefinition)
-                .append(deduplication)
-                .build();
+    public List<AlertDeduplication> getAlertDeduplications() {
+        return alertDeduplications;
     }
 
-    @Override
-    public boolean equals(Object that) {
-        if (that == this) {
-            return true;
-        }
-
-        if (!(that instanceof PolicyDefinition)) {
-            return false;
-        }
-
-        PolicyDefinition another = (PolicyDefinition) that;
-
-        if (Objects.equals(another.siteId, this.siteId)
-                && Objects.equals(another.name, this.name)
-                && Objects.equals(another.description, this.description)
-                && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
-                && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
-                && (another.definition != null && another.definition.equals(this.definition))
-                && Objects.equals(this.definition, another.definition)
-                && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-                && another.policyStatus.equals(this.policyStatus)
-                && another.parallelismHint == this.parallelismHint
-                && Objects.equals(another.alertDefinition, alertDefinition)
-                && Objects.equals(another.deduplication, deduplication)) {
-            return true;
-        }
-        return false;
+    public void setAlertDeduplications(List<AlertDeduplication> alertDeduplications) {
+        this.alertDeduplications = alertDeduplications;
     }
 
     public AlertDefinition getAlertDefinition() {
@@ -205,12 +168,49 @@
         this.siteId = siteId;
     }
 
-    public AlertDeduplication getDeduplication() {
-        return deduplication;
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(siteId)
+                .append(name)
+                .append(inputStreams)
+                .append(outputStreams)
+                .append(definition)
+                .append(partitionSpec)
+                .append(policyStatus)
+                .append(parallelismHint)
+                .append(alertDefinition)
+                .append(alertDeduplications)
+                .build();
     }
 
-    public void setDeduplication(AlertDeduplication deduplication) {
-        this.deduplication = deduplication;
+    @Override
+    public boolean equals(Object that) {
+        if (that == this) {
+            return true;
+        }
+
+        if (!(that instanceof PolicyDefinition)) {
+            return false;
+        }
+
+        PolicyDefinition another = (PolicyDefinition) that;
+
+        if (Objects.equals(another.siteId, this.siteId)
+                && Objects.equals(another.name, this.name)
+                && Objects.equals(another.description, this.description)
+                && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+                && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+                && (another.definition != null && another.definition.equals(this.definition))
+                && Objects.equals(this.definition, another.definition)
+                && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+                && another.policyStatus.equals(this.policyStatus)
+                && another.parallelismHint == this.parallelismHint
+                && Objects.equals(another.alertDefinition, alertDefinition)
+                && CollectionUtils.isEqualCollection(another.alertDeduplications, alertDeduplications)) {
+            return true;
+        }
+        return false;
     }
 
     @JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 00170df..3079f77 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -39,6 +39,7 @@
     private long createdTime;
     private String category;
     private AlertSeverity severity = AlertSeverity.WARNING;
+    private boolean duplicationChecked = false;
 
     // ----------------------
     // Lazy Alert Fields
@@ -187,4 +188,12 @@
     public void setSiteId(String siteId) {
         this.siteId = siteId;
     }
+
+    public boolean isDuplicationChecked() {
+        return duplicationChecked;
+    }
+
+    public void setDuplicationChecked(boolean duplicationChecked) {
+        this.duplicationChecked = duplicationChecked;
+    }
 }
\ No newline at end of file
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
new file mode 100644
index 0000000..59da244
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class AlertDeduplicationTest {
+
+    @Test
+    public void testEqual() {
+        AlertDeduplication deduplication1 = new AlertDeduplication();
+        deduplication1.setDedupIntervalMin("1");
+        deduplication1.setOutputStreamId("stream");
+
+        AlertDeduplication deduplication2 = new AlertDeduplication();
+        deduplication2.setDedupIntervalMin("1");
+        deduplication2.setOutputStreamId("stream");
+        deduplication2.setDedupFields(new ArrayList<>());
+
+        Assert.assertFalse(deduplication1.equals(deduplication2));
+
+        AlertDeduplication deduplication3 = new AlertDeduplication();
+        deduplication3.setDedupFields(new ArrayList<>());
+        deduplication3.setOutputStreamId("stream");
+        deduplication3.setDedupIntervalMin("1");
+
+        Assert.assertTrue(deduplication3.equals(deduplication2));
+    }
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index abb83d6..96eeffd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -26,7 +26,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
index 86bc9b3..e666c64 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -20,8 +20,6 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-
 public class DedupEntity {
 
     private String publishName;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
new file mode 100644
index 0000000..f6b03a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.Objects;
+
+public class DedupKey {
+    private String policyId;
+    private String outputStreamId;
+
+    public DedupKey(String policyId, String outputStreamId) {
+        this.policyId = policyId;
+        this.outputStreamId = outputStreamId;
+    }
+
+    public String getPolicyId() {
+        return policyId;
+    }
+
+    public void setPolicyId(String policyId) {
+        this.policyId = policyId;
+    }
+
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    public void setOutputStreamId(String outputStreamId) {
+        this.outputStreamId = outputStreamId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof DedupKey) {
+            DedupKey au = (DedupKey) obj;
+            return Objects.equals(au.getOutputStreamId(), this.outputStreamId)
+                    && Objects.equals(au.getPolicyId(), this.policyId);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(outputStreamId).append(policyId).build();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DedupKey[outputStreamId: %s, policyId: %s]", outputStreamId, policyId);
+    }
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
similarity index 77%
rename from eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
rename to eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
index 54d551e..2307d7a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
@@ -1,38 +1,34 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
- *
  */
-package org.apache.eagle.alert.engine.publisher.impl;
+package org.apache.eagle.alert.engine.publisher.dedup;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 public class DefaultDeduplicator implements AlertDeduplicator {
@@ -43,28 +39,25 @@
     private List<String> customDedupFields = new ArrayList<>();
     private String dedupStateField;
     private String dedupStateCloseValue;
+    private AlertDeduplication alertDeduplication = null;
 
     private DedupCache dedupCache;
 
     private Cache<EventUniq, String> withoutStatesCache;
 
-    public DefaultDeduplicator() {
-        this.dedupIntervalSec = 0;
-    }
-
-    public DefaultDeduplicator(String intervalMin) {
-        setDedupIntervalMin(intervalMin);
-    }
-
-    public DefaultDeduplicator(long intervalMin) {
-        this.dedupIntervalSec = intervalMin;
-    }
-
     public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
+        this.alertDeduplication = alertDeduplication;
         this.customDedupFields = alertDeduplication.getDedupFields();
-        this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+        try {
+            this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+        } catch (Exception e) {
+            LOG.error("de-duplication intervalSec {} parse error, use 30 min instead", alertDeduplication.getDedupIntervalMin(), e.getMessage());
+            this.dedupIntervalSec = 1800;
+        }
         this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
                 this.dedupIntervalSec, TimeUnit.SECONDS).build();
+
+        LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields);
     }
 
     public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
@@ -83,6 +76,8 @@
 
         withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
             this.dedupIntervalSec, TimeUnit.SECONDS).build();
+
+        LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields);
     }
 
     /*
@@ -112,6 +107,10 @@
         if (event == null) {
             return null;
         }
+        if (dedupIntervalSec <= 0) {
+            return Collections.singletonList(event);
+        }
+
         // check custom field, and get the field values
         StreamDefinition streamDefinition = event.getSchema();
         HashMap<String, String> customFieldValues = new HashMap<>();
@@ -133,7 +132,9 @@
             // make all of the field as unique key if no custom dedup field provided
             if (colValue != null) {
                 if (customDedupFields == null || customDedupFields.size() <= 0) {
-                    customFieldValues.put(colName, colValue.toString());
+                    if (streamDefinition.getColumns().get(i).getType().equals(StreamColumn.Type.STRING)) {
+                        customFieldValues.put(colName, colValue.toString());
+                    }
                 } else {
                     for (String field : customDedupFields) {
                         if (colName.equals(field)) {
@@ -170,4 +171,8 @@
         }
     }
 
+    public AlertDeduplication getAlertDeduplication() {
+        return alertDeduplication;
+    }
+
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
similarity index 82%
rename from eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
rename to eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
index 511abcd..1434bb7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
@@ -1,24 +1,23 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
- *
  */
 /**
  *
  */
-package org.apache.eagle.alert.engine.publisher.impl;
+package org.apache.eagle.alert.engine.publisher.dedup;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index c5c9e04..771f736 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -26,6 +26,7 @@
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
 import org.slf4j.Logger;
 
@@ -93,7 +94,7 @@
             }
             serializer = (IEventSerializer) obj;
         } catch (Exception e) {
-            getLogger().error(String.format("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage()), e);
+            getLogger().error("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage(), e);
             serializer = new StringEventSerializer(conf);
         }
     }
@@ -105,7 +106,7 @@
 
     @Override
     public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
-        if (null != deduplicator) {
+        if (null != deduplicator && !event.isDuplicationChecked()) {
             return deduplicator.dedup(event);
         } else {
             return Collections.singletonList(event);
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index d6829d6..39c577c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -28,22 +28,17 @@
 import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
-import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
+import org.apache.eagle.alert.engine.publisher.*;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupKey;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
 import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -53,7 +48,7 @@
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
     private volatile Map<String, PolicyDefinition> policyDefinitionMap;
     private volatile Map<String, StreamDefinition> streamDefinitionMap;
-    private volatile Map<String, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
+    private volatile Map<DedupKey, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
     private AlertTemplateEngine alertTemplateEngine;
 
     private boolean logEventEnabled;
@@ -90,13 +85,16 @@
             if (logEventEnabled) {
                 LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
             }
-            if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) {
-                List<AlertStreamEvent> eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event);
+            DedupKey dedupKey = new DedupKey(event.getPolicyId(), event.getStreamId());
+            if (deduplicatorMap != null && deduplicatorMap.containsKey(dedupKey)) {
+                List<AlertStreamEvent> eventList = deduplicatorMap.get(dedupKey).dedup(event);
                 if (eventList == null || eventList.isEmpty()) {
                     collector.ack(input);
                     return;
                 }
+                event.setDuplicationChecked(true);
             }
+
             AlertStreamEvent filteredEvent = alertFilter.filter(event);
             if (filteredEvent != null) {
                 alertPublisher.nextEvent(partition, filteredEvent);
@@ -161,8 +159,15 @@
         for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
             try {
                 this.alertTemplateEngine.register(entry.getValue());
-                if (entry.getValue().getDeduplication() != null) {
-                    this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication()));
+                List<AlertDeduplication> alertDeduplications = entry.getValue().getAlertDeduplications();
+                if (alertDeduplications != null && alertDeduplications.size() > 0) {
+                    for (AlertDeduplication deduplication : alertDeduplications) {
+                        DedupKey dedupKey = new DedupKey(entry.getKey(), deduplication.getOutputStreamId());
+                        if (!deduplicatorMap.containsKey(dedupKey)
+                                || !deduplicatorMap.get(dedupKey).getAlertDeduplication().equals(deduplication)) {
+                            deduplicatorMap.put(dedupKey, new DefaultDeduplicator(deduplication));
+                        }
+                    }
                 }
             } catch (Throwable throwable) {
                 LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
@@ -172,8 +177,10 @@
         for (String policyId : policyToRemove) {
             try {
                 this.alertTemplateEngine.unregister(policyId);
-                if (deduplicatorMap != null && deduplicatorMap.containsKey(policyId)) {
-                    deduplicatorMap.remove(policyId);
+                for (DedupKey dedupKey : deduplicatorMap.keySet()) {
+                    if (dedupKey.getPolicyId().equals(policyId)) {
+                        deduplicatorMap.remove(dedupKey);
+                    }
                 }
             } catch (Throwable throwable) {
                 LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
@@ -231,4 +238,5 @@
             return this.alertTemplateEngine.filter(event);
         }
     }
-}
\ No newline at end of file
+
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index 5bf0410..95e679d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -27,7 +27,6 @@
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
new file mode 100644
index 0000000..7fc886d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DedupKeyTest {
+
+    @Test
+    public void test() {
+        Map<DedupKey, Integer> testMap = new HashMap<>();
+        DedupKey key1 = new DedupKey("policy1", "stream1");
+        update(testMap, key1);
+        update(testMap, key1);
+
+        DedupKey key2 = new DedupKey("policy2", "stream2");
+        update(testMap, key2);
+
+        Assert.assertTrue(testMap.get(key1) == 1);
+        Assert.assertTrue(testMap.get(key2) == 0);
+
+        DedupKey key3 = new DedupKey("policy1", "stream1");
+        update(testMap, key3);
+
+        Assert.assertTrue(testMap.get(key3) == 2);
+    }
+
+    private void update(Map<DedupKey, Integer> map, DedupKey key) {
+        if (map.containsKey(key)) {
+            map.put(key, map.get(key) + 1);
+        } else {
+            map.put(key, 0);
+        }
+    }
+}
+
+
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index c48df9a..f839474 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -22,7 +22,6 @@
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 297b790..96da4c9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -22,7 +22,6 @@
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
index 247f332..51d054c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
@@ -18,7 +18,6 @@
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 46517fe..2cd2183 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,22 +18,23 @@
 
 package org.apache.eagle.alert.engine.router;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupKey;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
 import org.apache.eagle.alert.engine.runner.MapComparator;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
@@ -300,4 +301,76 @@
             pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3);
 
     }
+
+    @Test
+    public void testOnAlertPolicyChange() throws IllegalAccessException, NoSuchFieldException {
+        AlertDeduplication deduplication = new AlertDeduplication();
+        deduplication.setDedupIntervalMin("1");
+        deduplication.setOutputStreamId("stream");
+
+        PolicyDefinition policy1 = new PolicyDefinition();
+        policy1.setName("policy1");
+        policy1.getAlertDeduplications().add(deduplication);
+
+        Map<String, PolicyDefinition> pds1 = new HashMap<>();
+        pds1.put("policy1", policy1);
+
+        PolicyDefinition policy2 = new PolicyDefinition();
+        policy2.setName("policy2");
+        policy2.getAlertDeduplications().add(deduplication);
+
+        Map<String, PolicyDefinition> pds2 = new HashMap<>();
+        pds2.put("policy2", policy2);
+
+        AlertPublisherBolt bolt = new AlertPublisherBolt("publisher", null, null);
+
+        Field field = AlertPublisherBolt.class.getDeclaredField("alertTemplateEngine");
+        field.setAccessible(true);
+        AlertTemplateEngine engine = AlertTemplateProvider.createAlertTemplateEngine();
+        engine.init(null);
+        field.set(bolt, engine);
+
+        DedupKey dedupKey1 = new DedupKey("policy1", "stream");
+        DedupKey dedupKey2 = new DedupKey("policy2", "stream");
+        bolt.onAlertPolicyChange(pds1, null);
+        Map<DedupKey, DefaultDeduplicator> deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1));
+
+        // remove policy1 and add policy2
+        bolt.onAlertPolicyChange(pds2, null);
+        deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2));
+        Assert.assertFalse(deduplicatorMap.containsKey(dedupKey1));
+
+        // add new policy policy1 in pds2
+        pds2.put("policy1", policy1);
+        bolt.onAlertPolicyChange(pds2, null);
+        deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1));
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2));
+        Assert.assertTrue(deduplicatorMap.get(dedupKey1).getAlertDeduplication()
+                .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication()));
+
+        // update policy1 alertDeduplication
+        AlertDeduplication deduplication1 = new AlertDeduplication();
+        deduplication1.setOutputStreamId("stream");
+        deduplication1.setDedupIntervalMin("2");
+        policy1.getAlertDeduplications().clear();
+        policy1.getAlertDeduplications().add(deduplication1);
+        pds2.put("policy1", policy1);
+        bolt.onAlertPolicyChange(pds2, null);
+        deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy2", "stream")));
+        Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy1", "stream")));
+        Assert.assertFalse(deduplicatorMap.get(dedupKey1).getAlertDeduplication()
+                .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication()));
+
+    }
+
+
+    private Map<DedupKey, DefaultDeduplicator> getAlertDeduplicator(AlertPublisherBolt bolt) throws NoSuchFieldException, IllegalAccessException {
+        Field field = AlertPublisherBolt.class.getDeclaredField("deduplicatorMap");
+        field.setAccessible(true);
+        return (Map<DedupKey, DefaultDeduplicator>) field.get(bolt);
+    }
 }