Make the CEP fault handling window processors resilient against issues in CEP. Wait until either a given timeout expires or the first event is received before processing health stats, so that members are not falsely reported as faulty. Also make the CEP fault handling window processor wait until the complete topology event is received at startup.
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 2696271..51e4850 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -35,24 +35,16 @@
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
 public class CEPTopologyEventReceiver {
-
     private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
-
     private FaultHandlingWindowProcessor faultHandler;
     private TopologyEventReceiver topologyEventReceiver;
 
-    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+    CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-//    @Override
-//    public void execute() {
-//        super.execute();
-//        log.info("CEP topology event receiver thread started");
-//    }
-
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@@ -63,7 +55,7 @@
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.debug("Complete topology event received to fault handling window processor.");
+                        log.info("Complete topology event received to fault handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
                         initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -81,7 +73,11 @@
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
                 faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent
+                            .getMemberId());
+
+                }
             }
         });
 
@@ -94,8 +90,14 @@
                 // do not put this member if we have already received a health event
                 faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
                         System.currentTimeMillis());
-                log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                }
             }
         });
     }
+
+    void destroy() {
+        topologyEventReceiver.terminate();
+    }
 }
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 7aec0d5..5b77723 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -20,7 +20,6 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -45,26 +44,26 @@
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CEP window processor to handle faulty member instances. This window processor is responsible for
  * publishing MemberFault event if health stats are not received within a given time window.
  */
-@SiddhiExtension(namespace = "stratos",
-                 function = "faultHandling")
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
-
     private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
-
+    private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout";
+    private static final int ACTIVATE_TIMEOUT =
+            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
     private static final int TIME_OUT = 60 * 1000;
-    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
-    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
-
-    private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
     private ThreadBarrier threadBarrier;
@@ -77,6 +76,9 @@
 
     // Map of member id's to their last received health event time stamp
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+    private volatile boolean isActive;
+    private volatile boolean hasMemberTimeStampMapInitialized;
+    private long startTime = System.currentTimeMillis();
 
     // Event receiver to receive topology events published by cloud-controller
     private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
@@ -101,7 +103,11 @@
      *
      * @param event Event received by Siddhi.
      */
-    protected void addDataToMap(InEvent event) {
+    private void addDataToMap(InEvent event) {
+        if (!isActive) {
+            log.info("Received first event. Marking fault handling window processor as active");
+            isActive = true;
+        }
         String id = (String) event.getData()[memberIdAttrIndex];
         //checking whether this member is the topology.
         //sometimes there can be a delay between publishing member terminated events
@@ -143,7 +149,6 @@
      * @param topology Topology model object
      */
     boolean loadTimeStampMapFromTopology(Topology topology) {
-
         long currentTimeStamp = System.currentTimeMillis();
         if (topology == null || topology.getServices() == null) {
             return false;
@@ -164,10 +169,10 @@
                 }
             }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
+        hasMemberTimeStampMapInitialized = true;
+        if (log.isInfoEnabled()) {
+            log.info("Member timestamps were successfully loaded from the topology: [timestamps] " +
+                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
         }
         return true;
     }
@@ -222,7 +227,19 @@
     @Override
     public void run() {
         try {
+            // wait until the first event OR given timeout to expire in order to activate this window processor
+            // this is to prevent false positives at the CEP startup
+            if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) {
+                log.info("Activation wait timeout has expired. Marking fault handling window processor as active");
+                isActive = true;
+            }
+            // do not process events until memberTimeStampMap is initialized and window processor is activated
+            // memberTimeStampMap will be initialized only after receiving the complete topology event
+            if (!(isActive && hasMemberTimeStampMapInitialized)) {
+                return;
+            }
             threadBarrier.pass();
+
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
@@ -255,7 +272,7 @@
 
     @Override
     protected Object[] currentState() {
-        return new Object[] { window.currentState() };
+        return new Object[]{window.currentState()};
     }
 
     @Override
@@ -267,8 +284,8 @@
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-            AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
-
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext
+                                siddhiContext) {
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
         } else {
@@ -286,17 +303,13 @@
         MemberFaultEventMap
                 .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-//        executorService = StratosThreadPool
-//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
-//        cepTopologyEventReceiver.setExecutorService(executorService);
-//        cepTopologyEventReceiver.execute();
-
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()) {
-            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
-                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
-                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " +
+                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " +
+                            "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex,
+                    siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT));
         }
     }
 
@@ -329,20 +342,11 @@
     @Override
     public void destroy() {
         // terminate topology listener thread
-//        cepTopologyEventReceiver.terminate();
+        cepTopologyEventReceiver.destroy();
         window = null;
-
-        // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cep extension executor service", e);
-            }
-        }
     }
 
-    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
 }
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 2696271..51e4850 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -35,24 +35,16 @@
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
 public class CEPTopologyEventReceiver {
-
     private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
-
     private FaultHandlingWindowProcessor faultHandler;
     private TopologyEventReceiver topologyEventReceiver;
 
-    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+    CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-//    @Override
-//    public void execute() {
-//        super.execute();
-//        log.info("CEP topology event receiver thread started");
-//    }
-
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@@ -63,7 +55,7 @@
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.debug("Complete topology event received to fault handling window processor.");
+                        log.info("Complete topology event received to fault handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
                         initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -81,7 +73,11 @@
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
                 faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent
+                            .getMemberId());
+
+                }
             }
         });
 
@@ -94,8 +90,14 @@
                 // do not put this member if we have already received a health event
                 faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
                         System.currentTimeMillis());
-                log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                }
             }
         });
     }
+
+    void destroy() {
+        topologyEventReceiver.terminate();
+    }
 }
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 2abfda1..0c2ea92 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -14,7 +14,6 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -39,26 +38,26 @@
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CEP window processor to handle faulty member instances. This window processor is responsible for
  * publishing MemberFault event if health stats are not received within a given time window.
  */
-@SiddhiExtension(namespace = "stratos",
-                 function = "faultHandling")
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
-
+    private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout";
+    private static final int ACTIVATE_TIMEOUT =
+            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
     private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
-
     private static final int TIME_OUT = 60 * 1000;
-    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
-    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
-
-    private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
     private ThreadBarrier threadBarrier;
@@ -71,6 +70,9 @@
 
     // Map of member id's to their last received health event time stamp
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+    private volatile boolean isActive;
+    private volatile boolean hasMemberTimeStampMapInitialized;
+    private long startTime = System.currentTimeMillis();
 
     // Event receiver to receive topology events published by cloud-controller
     private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
@@ -95,7 +97,11 @@
      *
      * @param event Event received by Siddhi.
      */
-    protected void addDataToMap(InEvent event) {
+    private void addDataToMap(InEvent event) {
+        if (!isActive) {
+            log.info("Received first event. Marking fault handling window processor as active");
+            isActive = true;
+        }
         String id = (String) event.getData()[memberIdAttrIndex];
         //checking whether this member is the topology.
         //sometimes there can be a delay between publishing member terminated events
@@ -137,7 +143,6 @@
      * @param topology Topology model object
      */
     boolean loadTimeStampMapFromTopology(Topology topology) {
-
         long currentTimeStamp = System.currentTimeMillis();
         if (topology == null || topology.getServices() == null) {
             return false;
@@ -158,10 +163,10 @@
                 }
             }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
+        hasMemberTimeStampMapInitialized = true;
+        if (log.isInfoEnabled()) {
+            log.info("Member timestamps were successfully loaded from the topology: [timestamps] " +
+                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
         }
         return true;
     }
@@ -216,7 +221,19 @@
     @Override
     public void run() {
         try {
+            // wait until the first event OR given timeout to expire in order to activate this window processor
+            // this is to prevent false positives at the CEP startup
+            if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) {
+                log.info("Activation wait timeout has expired. Marking fault handling window processor as active");
+                isActive = true;
+            }
+            // do not process events until memberTimeStampMap is initialized and window processor is activated
+            // memberTimeStampMap will be initialized only after receiving the complete topology event
+            if (!(isActive && hasMemberTimeStampMapInitialized)) {
+                return;
+            }
             threadBarrier.pass();
+
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
@@ -249,7 +266,7 @@
 
     @Override
     protected Object[] currentState() {
-        return new Object[] { window.currentState() };
+        return new Object[]{window.currentState()};
     }
 
     @Override
@@ -260,7 +277,8 @@
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-            AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext
+                                siddhiContext) {
 
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -279,17 +297,13 @@
         MemberFaultEventMap
                 .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-//        executorService = StratosThreadPool
-//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
-//        cepTopologyEventReceiver.setExecutorService(executorService);
-//        cepTopologyEventReceiver.execute();
-
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()) {
-            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
-                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
-                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " +
+                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " +
+                            "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex,
+                    siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT));
         }
     }
 
@@ -322,20 +336,11 @@
     @Override
     public void destroy() {
         // terminate topology listener thread
-//        cepTopologyEventReceiver.terminate();
+        cepTopologyEventReceiver.destroy();
         window = null;
-
-        // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cep extension executor service", e);
-            }
-        }
     }
 
-    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
 }