diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 2696271..51e4850 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -35,24 +35,16 @@
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
 public class CEPTopologyEventReceiver {
-
     private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
-
     private FaultHandlingWindowProcessor faultHandler;
     private TopologyEventReceiver topologyEventReceiver;
 
-    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+    CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-//    @Override
-//    public void execute() {
-//        super.execute();
-//        log.info("CEP topology event receiver thread started");
-//    }
-
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@@ -63,7 +55,7 @@
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.debug("Complete topology event received to fault handling window processor.");
+                        log.info("Complete topology event received to fault handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
                         initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -81,7 +73,11 @@
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
                 faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent
+                            .getMemberId());
+
+                }
             }
         });
 
@@ -94,8 +90,14 @@
                 // do not put this member if we have already received a health event
                 faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
                         System.currentTimeMillis());
-                log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                }
             }
         });
     }
+
+    void destroy() {
+        topologyEventReceiver.terminate();
+    }
 }
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 7aec0d5..5b77723 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -20,7 +20,6 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -45,26 +44,26 @@
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CEP window processor to handle faulty member instances. This window processor is responsible for
  * publishing MemberFault event if health stats are not received within a given time window.
  */
-@SiddhiExtension(namespace = "stratos",
-                 function = "faultHandling")
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
-
     private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
-
+    private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout";
+    private static final int ACTIVATE_TIMEOUT =
+            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
     private static final int TIME_OUT = 60 * 1000;
-    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
-    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
-
-    private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
     private ThreadBarrier threadBarrier;
@@ -77,6 +76,9 @@
 
     // Map of member id's to their last received health event time stamp
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+    private volatile boolean isActive;
+    private volatile boolean hasMemberTimeStampMapInitialized;
+    private long startTime = System.currentTimeMillis();
 
     // Event receiver to receive topology events published by cloud-controller
     private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
@@ -101,7 +103,11 @@
      *
      * @param event Event received by Siddhi.
      */
-    protected void addDataToMap(InEvent event) {
+    private void addDataToMap(InEvent event) {
+        if (!isActive) {
+            log.info("Received first event. Marking fault handling window processor as active");
+            isActive = true;
+        }
         String id = (String) event.getData()[memberIdAttrIndex];
         //checking whether this member is the topology.
         //sometimes there can be a delay between publishing member terminated events
@@ -143,7 +149,6 @@
      * @param topology Topology model object
      */
     boolean loadTimeStampMapFromTopology(Topology topology) {
-
         long currentTimeStamp = System.currentTimeMillis();
         if (topology == null || topology.getServices() == null) {
             return false;
@@ -164,10 +169,10 @@
                 }
             }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
+        hasMemberTimeStampMapInitialized = true;
+        if (log.isInfoEnabled()) {
+            log.info("Member timestamps were successfully loaded from the topology: [timestamps] " +
+                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
         }
         return true;
     }
@@ -222,7 +227,19 @@
     @Override
     public void run() {
         try {
+            // wait until the first event OR given timeout to expire in order to activate this window processor
+            // this is to prevent false positives at the CEP startup
+            if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) {
+                log.info("Activation wait timeout has expired. Marking fault handling window processor as active");
+                isActive = true;
+            }
+            // do not process events until memberTimeStampMap is initialized and window processor is activated
+            // memberTimeStampMap will be initialized only after receiving the complete topology event
+            if (!(isActive && hasMemberTimeStampMapInitialized)) {
+                return;
+            }
             threadBarrier.pass();
+
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
@@ -255,7 +272,7 @@
 
     @Override
     protected Object[] currentState() {
-        return new Object[] { window.currentState() };
+        return new Object[]{window.currentState()};
     }
 
     @Override
@@ -267,8 +284,8 @@
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-            AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
-
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext
+                                siddhiContext) {
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
         } else {
@@ -286,17 +303,13 @@
         MemberFaultEventMap
                 .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-//        executorService = StratosThreadPool
-//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
-//        cepTopologyEventReceiver.setExecutorService(executorService);
-//        cepTopologyEventReceiver.execute();
-
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()) {
-            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
-                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
-                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " +
+                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " +
+                            "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex,
+                    siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT));
         }
     }
 
@@ -329,20 +342,11 @@
     @Override
     public void destroy() {
         // terminate topology listener thread
-//        cepTopologyEventReceiver.terminate();
+        cepTopologyEventReceiver.destroy();
         window = null;
-
-        // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cep extension executor service", e);
-            }
-        }
     }
 
-    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
 }
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 2696271..51e4850 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -35,24 +35,16 @@
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
 public class CEPTopologyEventReceiver {
-
     private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
-
     private FaultHandlingWindowProcessor faultHandler;
     private TopologyEventReceiver topologyEventReceiver;
 
-    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+    CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-//    @Override
-//    public void execute() {
-//        super.execute();
-//        log.info("CEP topology event receiver thread started");
-//    }
-
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@@ -63,7 +55,7 @@
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.debug("Complete topology event received to fault handling window processor.");
+                        log.info("Complete topology event received to fault handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
                         initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -81,7 +73,11 @@
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
                 faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent
+                            .getMemberId());
+
+                }
             }
         });
 
@@ -94,8 +90,14 @@
                 // do not put this member if we have already received a health event
                 faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
                         System.currentTimeMillis());
-                log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                }
             }
         });
     }
+
+    void destroy() {
+        topologyEventReceiver.terminate();
+    }
 }
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 2abfda1..0c2ea92 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -14,7 +14,6 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -39,26 +38,26 @@
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CEP window processor to handle faulty member instances. This window processor is responsible for
  * publishing MemberFault event if health stats are not received within a given time window.
  */
-@SiddhiExtension(namespace = "stratos",
-                 function = "faultHandling")
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
-
+    private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout";
+    private static final int ACTIVATE_TIMEOUT =
+            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
     private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
-
     private static final int TIME_OUT = 60 * 1000;
-    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
-    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
-
-    private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
     private ThreadBarrier threadBarrier;
@@ -71,6 +70,9 @@
 
     // Map of member id's to their last received health event time stamp
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+    private volatile boolean isActive;
+    private volatile boolean hasMemberTimeStampMapInitialized;
+    private long startTime = System.currentTimeMillis();
 
     // Event receiver to receive topology events published by cloud-controller
     private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
@@ -95,7 +97,11 @@
      *
      * @param event Event received by Siddhi.
      */
-    protected void addDataToMap(InEvent event) {
+    private void addDataToMap(InEvent event) {
+        if (!isActive) {
+            log.info("Received first event. Marking fault handling window processor as active");
+            isActive = true;
+        }
         String id = (String) event.getData()[memberIdAttrIndex];
         //checking whether this member is the topology.
         //sometimes there can be a delay between publishing member terminated events
@@ -137,7 +143,6 @@
      * @param topology Topology model object
      */
     boolean loadTimeStampMapFromTopology(Topology topology) {
-
         long currentTimeStamp = System.currentTimeMillis();
         if (topology == null || topology.getServices() == null) {
             return false;
@@ -158,10 +163,10 @@
                 }
             }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
+        hasMemberTimeStampMapInitialized = true;
+        if (log.isInfoEnabled()) {
+            log.info("Member timestamps were successfully loaded from the topology: [timestamps] " +
+                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
         }
         return true;
     }
@@ -216,7 +221,19 @@
     @Override
     public void run() {
         try {
+            // wait until the first event OR given timeout to expire in order to activate this window processor
+            // this is to prevent false positives at the CEP startup
+            if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) {
+                log.info("Activation wait timeout has expired. Marking fault handling window processor as active");
+                isActive = true;
+            }
+            // do not process events until memberTimeStampMap is initialized and window processor is activated
+            // memberTimeStampMap will be initialized only after receiving the complete topology event
+            if (!(isActive && hasMemberTimeStampMapInitialized)) {
+                return;
+            }
             threadBarrier.pass();
+
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
@@ -249,7 +266,7 @@
 
     @Override
     protected Object[] currentState() {
-        return new Object[] { window.currentState() };
+        return new Object[]{window.currentState()};
     }
 
     @Override
@@ -260,7 +277,8 @@
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-            AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext
+                                siddhiContext) {
 
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -279,17 +297,13 @@
         MemberFaultEventMap
                 .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-//        executorService = StratosThreadPool
-//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
-//        cepTopologyEventReceiver.setExecutorService(executorService);
-//        cepTopologyEventReceiver.execute();
-
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()) {
-            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
-                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
-                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " +
+                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " +
+                            "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex,
+                    siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT));
         }
     }
 
@@ -322,20 +336,11 @@
     @Override
     public void destroy() {
         // terminate topology listener thread
-//        cepTopologyEventReceiver.terminate();
+        cepTopologyEventReceiver.destroy();
         window = null;
-
-        // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cep extension executor service", e);
-            }
-        }
     }
 
-    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
 }
