[EAGLE-1068] Upgrade Siddhi dependency to v5.1.4

<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

## Purpose
The purpose of this pull request is to upgrade the [Siddhi](https://mvnrepository.com/artifact/io.siddhi) dependency version to `v5.1.4`. With this, it will also:
- Fix `alert-core` and tests to support new Siddhi APIs.
- Fix existing custom Siddhi extensions to support new Siddhi APIs.
- Improve extension loading (use annotation support instead of `siddhiext`).
- Add the following dependencies (since they are now separated from Siddhi core):
```xml
<dependency>
    <groupId>io.siddhi.extension.execution.regex</groupId>
    <artifactId>siddhi-execution-regex</artifactId>
    <version>5.0.5</version>
</dependency>

<dependency>
    <groupId>io.siddhi.extension.execution.string</groupId>
    <artifactId>siddhi-execution-string</artifactId>
    <version>5.0.5</version>
</dependency>
```

## Remarks
- Fixes https://issues.apache.org/jira/browse/EAGLE-1068
- https://issues.apache.org/jira/browse/EAGLE-1065

---
Contribution Tasks:
 - [x] Make sure the PR title is formatted like:
   `[EAGLE-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
       number, if there is one.
 - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

Author: Grainier <grainier@wso2.com>

Closes #1003 from grainier/EAGLE-1068.
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
index 2210cbd..49aaebf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
@@ -43,12 +43,12 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-extension-string</artifactId>
+            <groupId>io.siddhi.extension.execution.string</groupId>
+            <artifactId>siddhi-execution-string</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java
index e047662..5fcee33 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java
@@ -17,32 +17,66 @@
 
 package org.apache.eagle.alert.siddhiext;
 
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 
+@Extension(
+        name = "empty",
+        namespace = "str",
+        description = "Returns whether source string is null or empty.",
+        parameters = {
+                @Parameter(name = "source",
+                        description = "Source string.",
+                        type = {DataType.STRING},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"source"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Returns whether source string is null or empty.",
+                type = {DataType.BOOL}),
+        examples = {
+                @Example(
+                        syntax = "str:empty(strAttribute) as isEmpty",
+                        description = "Returns whether strAttribute is null or empty.")
+        }
+)
 public class StringEmptyFunctionExtension extends FunctionExecutor {
     /**
-     * The initialization method for StringEmptyFunctionExtension, this method will be called before the other methods.
+     * The initialization method for StringEmptyFunctionExtension,
+     * this method will be called before the other methods.
      *
-     * @param attributeExpressionExecutors the executors of each function parameter
-     * @param executionPlanContext         the context of the execution plan
+     * @param attributeExpressionExecutors  the executors of each function parameter
+     * @param configReader                  the config reader for the Siddhi app
+     * @param siddhiQueryContext            the context of the Siddhi query
      */
     @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+                                SiddhiQueryContext siddhiQueryContext) {
         if (attributeExpressionExecutors.length != 1) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to string:empty() function, "
+            throw new SiddhiAppValidationException("Invalid no of arguments passed to str:empty() function, "
                     + "required 1, but found " + attributeExpressionExecutors.length);
         }
 
         Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
         if (attributeType != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of math:string() function, "
-                    + "required " + Attribute.Type.STRING
-                    + ", but found " + attributeType.toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the argument of str:empty() "
+                    + "function, required " + Attribute.Type.STRING + ", but found " + attributeType.toString());
         }
+        return null;
     }
 
     /**
@@ -53,34 +87,18 @@
      * @return the function result
      */
     @Override
-    protected Object execute(Object[] data) {
+    protected Object execute(Object[] data, State state) {
         return null;
     }
 
     @Override
-    protected Object execute(Object data) {
+    protected Object execute(Object data, State state) {
         return !(data == null || ((String) data).isEmpty());
     }
 
     @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
     public Attribute.Type getReturnType() {
         return Attribute.Type.BOOL;
     }
 
-    @Override
-    public Object[] currentState() {
-        return null;
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringListSizeFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringListSizeFunctionExtension.java
index 5d3d3ae..7ea3dfe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringListSizeFunctionExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringListSizeFunctionExtension.java
@@ -17,84 +17,91 @@
 
 package org.apache.eagle.alert.siddhiext;
 
-import org.apache.commons.collections.ListUtils;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 import org.apache.eagle.alert.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
 
-import java.util.List;
-
+@Extension(
+        name = "listSize",
+        namespace = "str",
+        description = "Returns the size of a string list",
+        parameters = {
+                @Parameter(name = "source.list",
+                        description = "Source string (List).",
+                        type = {DataType.STRING},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"source.list"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Returns the size of a string list",
+                type = {DataType.INT}),
+        examples = {
+                @Example(
+                        syntax = "str:listSize(stringList)",
+                        description = "Returns the size of the stringList")
+        }
+)
 public class StringListSizeFunctionExtension extends FunctionExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(StringListSizeFunctionExtension.class);
 
     /**
-     * The initialization method for StringListSizeFunctionExtension, this method will be called before the other methods.
+     * The initialization method for StringListSizeFunctionExtension,
+     * this method will be called before the other methods.
      *
-     * @param attributeExpressionExecutors the executors of each function parameter
-     * @param executionPlanContext         the context of the execution plan
+     * @param attributeExpressionExecutors  the executors of each function parameter
+     * @param configReader                  the config reader for the Siddhi app
+     * @param siddhiQueryContext            the context of the Siddhi query
      */
     @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+                                SiddhiQueryContext siddhiQueryContext) {
         if (attributeExpressionExecutors.length != 1) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to string:listSize() function, "
+            throw new SiddhiAppValidationException("Invalid no of arguments passed to str:listSize() function, "
                     + "required 1, but found " + attributeExpressionExecutors.length);
         }
 
         Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
         if (attributeType != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of string:listSize() function, "
-                    + "required " + Attribute.Type.STRING
-                    + ", but found " + attributeType.toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the argument of str:listSize() "
+                    + "function, required " + Attribute.Type.STRING + ", but found " + attributeType.toString());
         }
-    }
-
-    /**
-     * The main execution method which will be called upon event arrival.
-     * when there are more than one function parameter
-     * This method calculates subtraction of two List Of Strings
-     * Each String is a jobs string needs to be loaded
-     * @param data the runtime values of function parameters
-     * @return the function result
-     */
-    @Override
-    protected Object execute(Object[] data) {
         return null;
     }
 
     @Override
-    protected Object execute(Object data) {
+    protected Object execute(Object[] data, State state) {
+        return null;
+    }
+
+    @Override
+    protected Object execute(Object data, State state) {
         try {
-            return JsonUtils.jsonStringToList((String)data).size();
+            return JsonUtils.jsonStringToList((String) data).size();
         } catch (Exception e) {
-            LOG.warn("exception found {}", e);
+            LOG.warn("exception found {0}", e);
             return 0;
         }
     }
 
     @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
     public Attribute.Type getReturnType() {
         return Attribute.Type.INT;
     }
 
-    @Override
-    public Object[] currentState() {
-        return null;
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java
index 9d26adf..1085089 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java
@@ -17,43 +17,78 @@
 
 package org.apache.eagle.alert.siddhiext;
 
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 import org.apache.commons.collections.ListUtils;
 import org.apache.eagle.alert.utils.JsonUtils;
-import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
+@Extension(
+        name = "subtract",
+        namespace = "str",
+        description = "Returns subtraction of two list of strings.",
+        parameters = {
+                @Parameter(name = "ths",
+                        description = "Source list.",
+                        type = {DataType.STRING},
+                        dynamic = true),
+                @Parameter(name = "rhs",
+                        description = "List to subtract.",
+                        type = {DataType.STRING},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"ths", "rhs"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Returns subtraction of two list of strings.",
+                type = {DataType.STRING}),
+        examples = {
+                @Example(
+                        syntax = "str:subtract(ths, rhs) as subStr",
+                        description = "Returns subtraction of two list of strings (ths - rhs).")
+        }
+)
 public class StringSubtractFunctionExtension extends FunctionExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtension.class);
 
     /**
-     * The initialization method for StringSubtractFunctionExtension, this method will be called before the other methods.
+     * The initialization method for StringSubtractFunctionExtension,
+     * this method will be called before the other methods.
      *
-     * @param attributeExpressionExecutors the executors of each function parameter
-     * @param executionPlanContext         the context of the execution plan
+     * @param attributeExpressionExecutors  the executors of each function parameter
+     * @param configReader                  the config reader for the Siddhi app
+     * @param siddhiQueryContext            the context of the Siddhi query
      */
     @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+                                SiddhiQueryContext siddhiQueryContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to string:subtract() function, "
+            throw new SiddhiAppValidationException("Invalid no of arguments passed to str:subtract() function, "
                     + "required 2, but found " + attributeExpressionExecutors.length);
         }
 
         Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
         if (attributeType != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of string:subtract() function, "
-                    + "required " + Attribute.Type.STRING
-                    + ", but found " + attributeType.toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the argument of str:subtract() "
+                    + "function, required " + Attribute.Type.STRING + ", but found " + attributeType.toString());
         }
+        return null;
     }
 
     /**
@@ -65,42 +100,26 @@
      * @return the function result
      */
     @Override
-    protected Object execute(Object[] data) {
+    protected Object execute(Object[] data, State state) {
         try {
             List<String> ths = JsonUtils.jsonStringToList((String) data[0]);
             List<String> rhs = JsonUtils.jsonStringToList((String) data[1]);
 
             return org.apache.commons.lang.StringUtils.join(ListUtils.subtract(ths, rhs), "\n");
         } catch (Exception e) {
-            LOG.warn("exception found {}", e);
+            LOG.warn("exception found {0}", e);
             return null;
         }
     }
 
     @Override
-    protected Object execute(Object data) {
+    protected Object execute(Object data, State state) {
         return null;
     }
 
     @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
     public Attribute.Type getReturnType() {
         return Attribute.Type.STRING;
     }
 
-    @Override
-    public Object[] currentState() {
-        return null;
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
deleted file mode 100644
index 7176611..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
-subtract=org.apache.eagle.alert.siddhiext.StringSubtractFunctionExtension
-listSize=org.apache.eagle.alert.siddhiext.StringListSizeFunctionExtension
\ No newline at end of file
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java
index 6cb3696..02b5bd0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java
@@ -21,12 +21,12 @@
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.EventPrinter;
 
 import java.util.concurrent.Semaphore;
 
@@ -37,9 +37,9 @@
     public void testStringListSize() throws Exception {
         Semaphore semp = new Semaphore(1);
         String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-                " from log select string:listSize(switchLabel) as alertKey insert into output; ";
+                " from log select str:listSize(switchLabel) as alertKey insert into output; ";
         SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = manager.createSiddhiAppRuntime(ql);
         runtime.addCallback("output", new StreamCallback() {
             @Override
             public void receive(Event[] events) {
@@ -73,10 +73,10 @@
                 " from a = log[resource == \"hadoop.namenode.namenodeinfo.corruptfiles\"],\n" +
                 "b = log[component == a.component and resource == a.resource and host == a.host and a.value != b.value]\n" +
                 "select b.site as site, b.host as host, b.component as component, b.resource as resource, " +
-                "b.timestamp as timestamp, string:listSize(b.value) as newMissingBlocksNumber, string:listSize(a.value) as oldMissingBlocksNumber, string:subtract(b.value, a.value) as missingBlocks\n" +
+                "b.timestamp as timestamp, str:listSize(b.value) as newMissingBlocksNumber, str:listSize(a.value) as oldMissingBlocksNumber, str:subtract(b.value, a.value) as missingBlocks\n" +
                 "insert into output;";
         SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = manager.createSiddhiAppRuntime(ql);
         runtime.addCallback("output", new StreamCallback() {
             @Override
             public void receive(Event[] events) {
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
index 4a31c69..7d3c509 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
@@ -20,12 +20,12 @@
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.EventPrinter;
 
 import java.util.concurrent.Semaphore;
 
@@ -36,9 +36,9 @@
     public void testStringSubtract() throws Exception {
         Semaphore semp = new Semaphore(1);
         String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-                " from log select string:subtract(switchLabel, message) as alertKey insert into output; ";
+                " from log select str:subtract(switchLabel, message) as alertKey insert into output; ";
         SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = manager.createSiddhiAppRuntime(ql);
         runtime.addCallback("output", new StreamCallback() {
             @Override
             public void receive(Event[] events) {
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
deleted file mode 100644
index 7176611..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
-subtract=org.apache.eagle.alert.siddhiext.StringSubtractFunctionExtension
-listSize=org.apache.eagle.alert.siddhiext.StringListSizeFunctionExtension
\ No newline at end of file
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
index fbc2080..0725e47 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
@@ -65,7 +65,7 @@
             <artifactId>swagger-jaxrs</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-core</artifactId>
         </dependency>
     </dependencies>
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
index 5d7eeb1..e43846c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
@@ -30,7 +30,7 @@
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.SiddhiManager;
+import io.siddhi.core.SiddhiManager;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -120,7 +120,7 @@
             // now evaluate
             try {
                 SiddhiManager sm = new SiddhiManager();
-                sm.createExecutionPlanRuntime(builder.toString());
+                sm.createSiddhiAppRuntime(builder.toString());
             } catch (Exception e) {
                 LOG.error(String.format("siddhi creation failed! %s ", builder.toString()), e);
                 state.appendPolicyValidation(pd.getName(), e.getMessage());
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index 40cbdcf..85d1fc2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -57,24 +57,24 @@
             <artifactId>archaius-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-query-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-query-compiler</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-extension-regex</artifactId>
+            <groupId>io.siddhi.extension.execution.regex</groupId>
+            <artifactId>siddhi-execution-regex</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-extension-string</artifactId>
+            <groupId>io.siddhi.extension.execution.string</groupId>
+            <artifactId>siddhi-execution-string</artifactId>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
index 25ebfca..3b42b28 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
@@ -23,8 +23,8 @@
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
 
 /**
  * Created on 8/2/16.
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
index a732e66..edc032f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
@@ -23,8 +23,8 @@
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.AbstractDefinition;
+import io.siddhi.query.api.definition.Attribute;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index 628b2e4..d469a38 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -26,16 +26,16 @@
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.stream.input.InputHandler;
 
 import java.util.List;
 import java.util.Map;
 
 public class SiddhiPolicyHandler implements PolicyStreamHandler {
     private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class);
-    private ExecutionPlanRuntime executionRuntime;
+    private SiddhiAppRuntime executionRuntime;
     private SiddhiManager siddhiManager;
     private Map<String, StreamDefinition> sds;
     private PolicyDefinition policy;
@@ -59,7 +59,7 @@
         this.siddhiManager = new SiddhiManager();
         String plan = generateExecutionPlan(policy, sds);
         try {
-            this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan);
+            this.executionRuntime = siddhiManager.createSiddhiAppRuntime(plan);
             LOG.info("Created siddhi runtime {}", executionRuntime.getName());
         } catch (Exception parserException) {
             LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, parserException);
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
index 7ecc36f..722da37 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
@@ -19,7 +19,7 @@
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.wso2.siddhi.query.api.ExecutionPlan;
+import io.siddhi.query.api.SiddhiApp;
 
 import java.util.List;
 import java.util.Map;
@@ -44,7 +44,7 @@
     /**
      * Execution plan.
      */
-    private ExecutionPlan internalExecutionPlan;
+    private SiddhiApp internalSiddhiApp;
 
     private String executionPlanDesc;
 
@@ -58,12 +58,12 @@
         this.executionPlanSource = executionPlanSource;
     }
 
-    public ExecutionPlan getInternalExecutionPlan() {
-        return internalExecutionPlan;
+    public SiddhiApp getInternalSiddhiApp() {
+        return internalSiddhiApp;
     }
 
-    public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
-        this.internalExecutionPlan = internalExecutionPlan;
+    public void setInternalSiddhiApp(SiddhiApp internalSiddhiApp) {
+        this.internalSiddhiApp = internalSiddhiApp;
     }
 
     public String getExecutionPlanDesc() {
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
index 4e6901d..6092161 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
@@ -1,376 +1,409 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections.ListUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.exception.DefinitionNotExistException;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.state.*;
-import org.wso2.siddhi.query.api.execution.query.input.stream.*;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.condition.Compare;
-import org.wso2.siddhi.query.api.expression.constant.IntConstant;
-import org.wso2.siddhi.query.api.expression.constant.LongConstant;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class);
-
-    /**
-     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
-     */
-    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
-    private final String executionPlan;
-    private final Map<String,List<StreamColumn>> effectiveInputStreams;
-    private final Map<String, String> effectiveInputStreamsAlias;
-    private final Map<String,List<StreamColumn>> effectiveOutputStreams;
-    private final Map<String,StreamPartition> effectivePartitions;
-    private final PolicyExecutionPlan policyExecutionPlan;
-
-    public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
-        this.executionPlan = executionPlan;
-        this.effectiveInputStreams = new HashMap<>();
-        this.effectiveInputStreamsAlias = new HashMap<>();
-        this.effectiveOutputStreams = new HashMap<>();
-        this.effectivePartitions = new HashMap<>();
-        this.policyExecutionPlan = doParse();
-    }
-
-    @Override
-    public PolicyExecutionPlan getExecutionPlan() {
-        return policyExecutionPlan;
-    }
-
-    private PolicyExecutionPlan doParse()  throws Exception {
-        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
-        try {
-            ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan);
-
-            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
-            // Set current execution plan as valid
-            policyExecutionPlan.setExecutionPlanSource(this.executionPlan);
-            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
-
-            // Go through execution element
-            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
-                // -------------
-                // Explain Query
-                // -------------
-                if (executionElement instanceof Query) {
-                    // -----------------------
-                    // Query Level Variables
-                    // -----------------------
-                    InputStream inputStream = ((Query) executionElement).getInputStream();
-                    Selector selector = ((Query) executionElement).getSelector();
-                    Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();
-
-                    // Inputs stream definitions
-                    for (String streamId : inputStream.getUniqueStreamIds()) {
-                        if (!effectiveInputStreams.containsKey(streamId)) {
-                            org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
-                            if (streamDefinition != null) {
-                                effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
-                            } else {
-                                effectiveInputStreams.put(streamId, null);
-                            }
-                        }
-                    }
-
-                    // Window Spec and Partition
-                    if (inputStream instanceof SingleInputStream) {
-                        retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping);
-                        retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector));
-                    } else {
-                        if (inputStream instanceof JoinInputStream) {
-                            // Only Support JOIN/INNER_JOIN Now
-                            if (((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.INNER_JOIN) || ((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.JOIN)) {
-                                SingleInputStream leftInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getLeftInputStream();
-                                SingleInputStream rightInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getRightInputStream();
-
-                                retrievePartition(findStreamPartition(leftInputStream, selector));
-                                retrievePartition(findStreamPartition(rightInputStream, selector));
-                                retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping);
-                                retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping);
-
-                            } else {
-                                throw new ExecutionPlanValidationException("Not support " + ((JoinInputStream) inputStream).getType() + " yet, currently support: INNER JOIN");
-                            }
-
-                            Expression joinCondition = ((JoinInputStream) inputStream).getOnCompare();
-
-                            if (joinCondition != null) {
-                                if (joinCondition instanceof Compare) {
-                                    if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) {
-                                        Variable leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression();
-                                        Preconditions.checkNotNull(leftExpression.getStreamId());
-                                        Preconditions.checkNotNull(leftExpression.getAttributeName());
-
-                                        StreamPartition leftPartition = new StreamPartition();
-                                        leftPartition.setType(StreamPartition.Type.GROUPBY);
-                                        leftPartition.setColumns(Collections.singletonList(leftExpression.getAttributeName()));
-                                        leftPartition.setStreamId(retrieveStreamId(leftExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
-                                        retrievePartition(leftPartition);
-
-                                        Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression();
-                                        Preconditions.checkNotNull(rightExpression.getStreamId());
-                                        Preconditions.checkNotNull(rightExpression.getAttributeName());
-                                        StreamPartition rightPartition = new StreamPartition();
-                                        rightPartition.setType(StreamPartition.Type.GROUPBY);
-                                        rightPartition.setColumns(Collections.singletonList(rightExpression.getAttributeName()));
-                                        rightPartition.setStreamId(retrieveStreamId(rightExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
-                                        retrievePartition(leftPartition);
-                                    } else {
-                                        throw new ExecutionPlanValidationException("Only support \"EQUAL\" condition in INNER JOIN" + joinCondition);
-                                    }
-                                } else {
-                                    throw new ExecutionPlanValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition);
-                                }
-                            }
-                        } else if (inputStream instanceof StateInputStream) {
-                            // Group By Spec
-                            List<Variable> groupBy = selector.getGroupByList();
-                            if (groupBy.size() >= 0) {
-                                Map<String, List<Variable>> streamGroupBy = new HashMap<>();
-                                for (String streamId : inputStream.getUniqueStreamIds()) {
-                                    streamGroupBy.put(streamId, new ArrayList<>());
-                                }
-
-                                collectStreamReferenceIdMapping(((StateInputStream)inputStream).getStateElement());
-
-                                for (Variable variable : groupBy) {
-                                    // Not stream not set, then should be all streams' same field
-                                    if (variable.getStreamId() == null) {
-                                        for (String streamId : inputStream.getUniqueStreamIds()) {
-                                            streamGroupBy.get(streamId).add(variable);
-                                        }
-                                    } else {
-                                        String streamId = variable.getStreamId();
-                                        if (!this.effectiveInputStreamsAlias.containsKey(streamId)) {
-                                            streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping);
-                                        } else {
-                                            streamId = this.effectiveInputStreamsAlias.get(streamId);
-                                        }
-                                        if (streamGroupBy.containsKey(streamId)) {
-                                            streamGroupBy.get(streamId).add(variable);
-                                        } else {
-                                            throw new DefinitionNotExistException(streamId);
-                                        }
-                                    }
-                                }
-                                for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
-                                    if (entry.getValue().size() > 0) {
-                                        StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])));
-                                        if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN)
-                                                || ((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.SEQUENCE)) {
-                                            if (effectivePartitions.containsKey(partition.getStreamId())) {
-                                                StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
-                                                if (!existingPartition.equals(partition)
-                                                        && existingPartition.getType().equals(partition.getType())
-                                                        && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())) {
-                                                    partition.setSortSpec(existingPartition.getSortSpec());
-                                                }
-                                            }
-                                        }
-                                        retrievePartition(partition);
-                                    }
-                                }
-                            }
-                        }
-                    }
-
-                    // Output streams
-                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
-                    effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
-                } else {
-                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
-                }
-            }
-            // Set effective input streams
-            policyExecutionPlan.setInputStreams(effectiveInputStreams);
-
-            // Set effective output streams
-            policyExecutionPlan.setOutputStreams(effectiveOutputStreams);
-
-            // Set Partitions
-            for (String streamId : effectiveInputStreams.keySet()) {
-                // Use shuffle partition by default
-                if (!effectivePartitions.containsKey(streamId)) {
-                    StreamPartition shufflePartition = new StreamPartition();
-                    shufflePartition.setStreamId(streamId);
-                    shufflePartition.setType(StreamPartition.Type.SHUFFLE);
-                    effectivePartitions.put(streamId, shufflePartition);
-                }
-            }
-            policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values()));
-        } catch (Exception ex) {
-            LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex);
-            throw ex;
-        }
-        return policyExecutionPlan;
-    }
-
-    private void collectStreamReferenceIdMapping(StateElement stateElement) {
-        if (stateElement instanceof LogicalStateElement) {
-            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement1());
-            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement2());
-        } else if (stateElement instanceof CountStateElement) {
-            collectStreamReferenceIdMapping(((CountStateElement) stateElement).getStreamStateElement());
-        } else if (stateElement instanceof EveryStateElement) {
-            collectStreamReferenceIdMapping(((EveryStateElement) stateElement).getStateElement());
-        } else if (stateElement instanceof NextStateElement) {
-            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getStateElement());
-            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getNextStateElement());
-        } else if (stateElement instanceof StreamStateElement) {
-            BasicSingleInputStream basicSingleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream();
-            this.effectiveInputStreamsAlias.put(basicSingleInputStream.getStreamReferenceId(), basicSingleInputStream.getStreamId());
-        }
-    }
-
-    private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
-        Preconditions.checkNotNull(variable.getStreamId(), "streamId");
-        if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {
-            throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId());
-        } else if (streamMap.containsKey(variable.getStreamId())) {
-            return variable.getStreamId();
-        } else if (aliasMap.containsKey(variable.getStreamId())) {
-            return aliasMap.get(variable.getStreamId()).getStreamId();
-        } else {
-            throw new DefinitionNotExistException(variable.getStreamId());
-        }
-    }
-
-    private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) {
-        // Window Spec
-        List<Window> windows = new ArrayList<>();
-        for (StreamHandler streamHandler : inputStream.getStreamHandlers()) {
-            if (streamHandler instanceof Window) {
-                windows.add((Window) streamHandler);
-            }
-        }
-
-        // Group By Spec
-        List<Variable> groupBy = selector.getGroupByList();
-        if (windows.size() > 0 || groupBy.size() >= 0) {
-            return generatePartition(inputStream.getStreamId(), windows, groupBy);
-        } else {
-            return null;
-        }
-    }
-
-    private void retrievePartition(StreamPartition partition) {
-        if (partition == null) {
-            return;
-        }
-
-        if (!effectivePartitions.containsKey(partition.getStreamId())) {
-            effectivePartitions.put(partition.getStreamId(), partition);
-        } else if (!effectivePartitions.get(partition.getStreamId()).equals(partition)) {
-            StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
-            // If same Type & Columns but different sort spec, then use larger
-            if (existingPartition.getType().equals(partition.getType())
-                && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())
-                && partition.getSortSpec().getWindowPeriodMillis() > existingPartition.getSortSpec().getWindowPeriodMillis()
-                || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) {
-                effectivePartitions.put(partition.getStreamId(), partition);
-            } else {
-                // Throw exception as it unable to conflict effectivePartitions on same stream will not be able to run in distributed mode
-                throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId()
-                    + ": [1] " + effectivePartitions.get(partition.getStreamId()).toString() + " [2] " + partition.toString() + "");
-            }
-        }
-    }
-
-    private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) {
-        if (inputStream.getStreamReferenceId() != null) {
-            if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) {
-                throw new ExecutionPlanValidationException("Duplicated stream alias " + inputStream.getStreamId() + " -> " + inputStream);
-            } else {
-                aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream);
-            }
-        }
-    }
-
-    private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) {
-        StreamPartition partition = new StreamPartition();
-        partition.setStreamId(streamId);
-        StreamSortSpec sortSpec = null;
-        if (windows != null && windows.size() > 0) {
-            for (Window window : windows) {
-                if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
-                    sortSpec = new StreamSortSpec();
-                    sortSpec.setWindowPeriodMillis(getExternalTimeWindowSize(window));
-                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 5);
-                }
-            }
-        }
-        partition.setSortSpec(sortSpec);
-        if (groupBy != null && groupBy.size() > 0) {
-            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
-            partition.setType(StreamPartition.Type.GROUPBY);
-        } else {
-            partition.setType(StreamPartition.Type.SHUFFLE);
-        }
-        return partition;
-    }
-
-    private static int getExternalTimeWindowSize(Window window) {
-        Expression windowSize = window.getParameters()[1];
-        if (windowSize instanceof TimeConstant) {
-            return ((TimeConstant) windowSize).getValue().intValue();
-        } else if (windowSize instanceof IntConstant) {
-            return ((IntConstant) windowSize).getValue();
-        } else if (windowSize instanceof LongConstant) {
-            return ((LongConstant) windowSize).getValue().intValue();
-        } else {
-            throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString());
-        }
-    }
-
-    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
-        return outputAttributeList.stream().map(outputAttribute -> {
-            StreamColumn streamColumn = new StreamColumn();
-            streamColumn.setName(outputAttribute.getRename());
-            streamColumn.setDescription(outputAttribute.getExpression().toString());
-            return streamColumn;
-        }).collect(Collectors.toList());
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.google.common.base.Preconditions;
+import io.siddhi.core.exception.DefinitionNotExistException;
+import io.siddhi.query.api.SiddhiApp;
+import io.siddhi.query.api.exception.DuplicateDefinitionException;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
+import io.siddhi.query.api.execution.ExecutionElement;
+import io.siddhi.query.api.execution.query.Query;
+import io.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import io.siddhi.query.api.execution.query.input.handler.Window;
+import io.siddhi.query.api.execution.query.input.state.CountStateElement;
+import io.siddhi.query.api.execution.query.input.state.EveryStateElement;
+import io.siddhi.query.api.execution.query.input.state.LogicalStateElement;
+import io.siddhi.query.api.execution.query.input.state.NextStateElement;
+import io.siddhi.query.api.execution.query.input.state.StateElement;
+import io.siddhi.query.api.execution.query.input.state.StreamStateElement;
+import io.siddhi.query.api.execution.query.input.stream.BasicSingleInputStream;
+import io.siddhi.query.api.execution.query.input.stream.InputStream;
+import io.siddhi.query.api.execution.query.input.stream.JoinInputStream;
+import io.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import io.siddhi.query.api.execution.query.input.stream.StateInputStream;
+import io.siddhi.query.api.execution.query.output.stream.OutputStream;
+import io.siddhi.query.api.execution.query.selection.OutputAttribute;
+import io.siddhi.query.api.execution.query.selection.Selector;
+import io.siddhi.query.api.expression.Expression;
+import io.siddhi.query.api.expression.Variable;
+import io.siddhi.query.api.expression.condition.Compare;
+import io.siddhi.query.api.expression.constant.IntConstant;
+import io.siddhi.query.api.expression.constant.LongConstant;
+import io.siddhi.query.api.expression.constant.TimeConstant;
+import io.siddhi.query.compiler.SiddhiCompiler;
+import io.siddhi.query.compiler.exception.SiddhiParserException;
+import org.apache.commons.collections.ListUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class);
+
+    /**
+     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
+     */
+    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
+
+    private final String executionPlan;
+    private final Map<String,List<StreamColumn>> effectiveInputStreams;
+    private final Map<String, String> effectiveInputStreamsAlias;
+    private final Map<String,List<StreamColumn>> effectiveOutputStreams;
+    private final Map<String,StreamPartition> effectivePartitions;
+    private final PolicyExecutionPlan policyExecutionPlan;
+
+    public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
+        this.executionPlan = validateAndGet(executionPlan);
+        this.effectiveInputStreams = new HashMap<>();
+        this.effectiveInputStreamsAlias = new HashMap<>();
+        this.effectiveOutputStreams = new HashMap<>();
+        this.effectivePartitions = new HashMap<>();
+        this.policyExecutionPlan = doParse();
+    }
+
+    @Override
+    public PolicyExecutionPlan getExecutionPlan() {
+        return policyExecutionPlan;
+    }
+
+    private String validateAndGet(String executionPlan) {
+        try {
+            SiddhiCompiler.parse(executionPlan);
+            return executionPlan;
+        } catch (SiddhiParserException e) {
+            // There should be at least 1 stream definition for compiler to parse the execution plan.
+            // Therefore, try prepending an IgnoreStream definition to the execution plan.
+            String ignoreStreamDef = "define stream IgnoreStream (ignored bool); ";
+            try {
+                String epWithStreamDef = ignoreStreamDef + executionPlan;
+                SiddhiCompiler.parse(epWithStreamDef);
+                return epWithStreamDef;
+            } catch (Exception ex) {
+                return executionPlan;
+            }
+        }
+    }
+
+    private PolicyExecutionPlan doParse()  throws Exception {
+        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+        try {
+            SiddhiApp siddhiApp = SiddhiCompiler.parse(this.executionPlan);
+            policyExecutionPlan.setExecutionPlanDesc(siddhiApp.toString());
+
+            // Set current execution plan as valid
+            policyExecutionPlan.setExecutionPlanSource(this.executionPlan);
+            policyExecutionPlan.setInternalSiddhiApp(siddhiApp);
+
+
+            // Go through execution element
+            for (ExecutionElement executionElement : siddhiApp.getExecutionElementList()) {
+                // -------------
+                // Explain Query
+                // -------------
+                if (executionElement instanceof Query) {
+                    // -----------------------
+                    // Query Level Variables
+                    // -----------------------
+                    InputStream inputStream = ((Query) executionElement).getInputStream();
+                    Selector selector = ((Query) executionElement).getSelector();
+                    Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();
+
+                    // Inputs stream definitions
+                    for (String streamId : inputStream.getUniqueStreamIds()) {
+                        if (!effectiveInputStreams.containsKey(streamId)) {
+                            io.siddhi.query.api.definition.StreamDefinition streamDefinition = siddhiApp.getStreamDefinitionMap().get(streamId);
+                            if (streamDefinition != null) {
+                                effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+                            } else {
+                                effectiveInputStreams.put(streamId, null);
+                            }
+                        }
+                    }
+
+                    // Window Spec and Partition
+                    if (inputStream instanceof SingleInputStream) {
+                        retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping);
+                        retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector));
+                    } else {
+                        if (inputStream instanceof JoinInputStream) {
+                            // Only Support JOIN/INNER_JOIN Now
+                            if (((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.INNER_JOIN) || ((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.JOIN)) {
+                                SingleInputStream leftInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getLeftInputStream();
+                                SingleInputStream rightInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getRightInputStream();
+
+                                retrievePartition(findStreamPartition(leftInputStream, selector));
+                                retrievePartition(findStreamPartition(rightInputStream, selector));
+                                retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping);
+                                retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping);
+
+                            } else {
+                                throw new SiddhiAppValidationException("Not support " + ((JoinInputStream) inputStream).getType() + " yet, currently support: INNER JOIN");
+                            }
+
+                            Expression joinCondition = ((JoinInputStream) inputStream).getOnCompare();
+
+                            if (joinCondition != null) {
+                                if (joinCondition instanceof Compare) {
+                                    if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) {
+                                        Variable leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression();
+                                        Preconditions.checkNotNull(leftExpression.getStreamId());
+                                        Preconditions.checkNotNull(leftExpression.getAttributeName());
+
+                                        StreamPartition leftPartition = new StreamPartition();
+                                        leftPartition.setType(StreamPartition.Type.GROUPBY);
+                                        leftPartition.setColumns(Collections.singletonList(leftExpression.getAttributeName()));
+                                        leftPartition.setStreamId(retrieveStreamId(leftExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
+                                        retrievePartition(leftPartition);
+
+                                        Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression();
+                                        Preconditions.checkNotNull(rightExpression.getStreamId());
+                                        Preconditions.checkNotNull(rightExpression.getAttributeName());
+                                        StreamPartition rightPartition = new StreamPartition();
+                                        rightPartition.setType(StreamPartition.Type.GROUPBY);
+                                        rightPartition.setColumns(Collections.singletonList(rightExpression.getAttributeName()));
+                                        rightPartition.setStreamId(retrieveStreamId(rightExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
+                                        retrievePartition(rightPartition);
+                                    } else {
+                                        throw new SiddhiAppValidationException("Only support \"EQUAL\" condition in INNER JOIN: " + joinCondition);
+                                    }
+                                } else {
+                                    throw new SiddhiAppValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition);
+                                }
+                            }
+                        } else if (inputStream instanceof StateInputStream) {
+                            // Group By Spec
+                            List<Variable> groupBy = selector.getGroupByList();
+                            if (groupBy.size() >= 0) {
+                                Map<String, List<Variable>> streamGroupBy = new HashMap<>();
+                                for (String streamId : inputStream.getUniqueStreamIds()) {
+                                    streamGroupBy.put(streamId, new ArrayList<>());
+                                }
+
+                                collectStreamReferenceIdMapping(((StateInputStream)inputStream).getStateElement());
+
+                                for (Variable variable : groupBy) {
+                                    // If the stream is not set, the variable should refer to the same field on all streams
+                                    if (variable.getStreamId() == null) {
+                                        for (String streamId : inputStream.getUniqueStreamIds()) {
+                                            streamGroupBy.get(streamId).add(variable);
+                                        }
+                                    } else {
+                                        String streamId = variable.getStreamId();
+                                        if (!this.effectiveInputStreamsAlias.containsKey(streamId)) {
+                                            streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping);
+                                        } else {
+                                            streamId = this.effectiveInputStreamsAlias.get(streamId);
+                                        }
+                                        if (streamGroupBy.containsKey(streamId)) {
+                                            streamGroupBy.get(streamId).add(variable);
+                                        } else {
+                                            throw new DefinitionNotExistException(streamId);
+                                        }
+                                    }
+                                }
+                                for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
+                                    if (entry.getValue().size() > 0) {
+                                        StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])));
+                                        if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN)
+                                                || ((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.SEQUENCE)) {
+                                            if (effectivePartitions.containsKey(partition.getStreamId())) {
+                                                StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
+                                                if (!existingPartition.equals(partition)
+                                                        && existingPartition.getType().equals(partition.getType())
+                                                        && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())) {
+                                                    partition.setSortSpec(existingPartition.getSortSpec());
+                                                }
+                                            }
+                                        }
+                                        retrievePartition(partition);
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    // Output streams
+                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
+                    effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+                } else {
+                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
+                }
+            }
+            // Set effective input streams
+            policyExecutionPlan.setInputStreams(effectiveInputStreams);
+
+            // Set effective output streams
+            policyExecutionPlan.setOutputStreams(effectiveOutputStreams);
+
+            // Set Partitions
+            for (String streamId : effectiveInputStreams.keySet()) {
+                // Use shuffle partition by default
+                if (!effectivePartitions.containsKey(streamId)) {
+                    StreamPartition shufflePartition = new StreamPartition();
+                    shufflePartition.setStreamId(streamId);
+                    shufflePartition.setType(StreamPartition.Type.SHUFFLE);
+                    effectivePartitions.put(streamId, shufflePartition);
+                }
+            }
+            policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values()));
+        } catch (Exception ex) {
+            LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex);
+            throw ex;
+        }
+        return policyExecutionPlan;
+    }
+
+    private void collectStreamReferenceIdMapping(StateElement stateElement) {
+        if (stateElement instanceof LogicalStateElement) {
+            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement1());
+            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement2());
+        } else if (stateElement instanceof CountStateElement) {
+            collectStreamReferenceIdMapping(((CountStateElement) stateElement).getStreamStateElement());
+        } else if (stateElement instanceof EveryStateElement) {
+            collectStreamReferenceIdMapping(((EveryStateElement) stateElement).getStateElement());
+        } else if (stateElement instanceof NextStateElement) {
+            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getStateElement());
+            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getNextStateElement());
+        } else if (stateElement instanceof StreamStateElement) {
+            BasicSingleInputStream basicSingleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream();
+            this.effectiveInputStreamsAlias.put(basicSingleInputStream.getStreamReferenceId(), basicSingleInputStream.getStreamId());
+        }
+    }
+
+    private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
+        Preconditions.checkNotNull(variable.getStreamId(), "streamId");
+        if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {
+            throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId(),
+                    variable.getQueryContextStartIndex(), variable.getQueryContextEndIndex());
+        } else if (streamMap.containsKey(variable.getStreamId())) {
+            return variable.getStreamId();
+        } else if (aliasMap.containsKey(variable.getStreamId())) {
+            return aliasMap.get(variable.getStreamId()).getStreamId();
+        } else {
+            throw new DefinitionNotExistException(variable.getStreamId());
+        }
+    }
+
+    private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) {
+        // Window Spec
+        List<Window> windows = new ArrayList<>();
+        for (StreamHandler streamHandler : inputStream.getStreamHandlers()) {
+            if (streamHandler instanceof Window) {
+                windows.add((Window) streamHandler);
+            }
+        }
+
+        // Group By Spec
+        List<Variable> groupBy = selector.getGroupByList();
+        if (windows.size() > 0 || groupBy.size() >= 0) {
+            return generatePartition(inputStream.getStreamId(), windows, groupBy);
+        } else {
+            return null;
+        }
+    }
+
+    private void retrievePartition(StreamPartition partition) {
+        if (partition == null) {
+            return;
+        }
+
+        if (!effectivePartitions.containsKey(partition.getStreamId())) {
+            effectivePartitions.put(partition.getStreamId(), partition);
+        } else if (!effectivePartitions.get(partition.getStreamId()).equals(partition)) {
+            StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
+            // If same Type & Columns but different sort spec, then use larger
+            if (existingPartition.getType().equals(partition.getType())
+                && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())
+                && partition.getSortSpec().getWindowPeriodMillis() > existingPartition.getSortSpec().getWindowPeriodMillis()
+                || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) {
+                effectivePartitions.put(partition.getStreamId(), partition);
+            } else {
+                // Throw exception, as conflicting partitions on the same stream cannot run in distributed mode
+                throw new SiddhiAppValidationException("You have incompatible partitions on stream " + partition.getStreamId()
+                    + ": [1] " + effectivePartitions.get(partition.getStreamId()).toString() + " [2] " + partition.toString() + "");
+            }
+        }
+    }
+
+    private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) {
+        if (inputStream.getStreamReferenceId() != null) {
+            if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) {
+                throw new SiddhiAppValidationException("Duplicated stream alias " + inputStream.getStreamId() + " -> " + inputStream);
+            } else {
+                aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream);
+            }
+        }
+    }
+
+    private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+        StreamPartition partition = new StreamPartition();
+        partition.setStreamId(streamId);
+        StreamSortSpec sortSpec = null;
+        if (windows != null && windows.size() > 0) {
+            for (Window window : windows) {
+                if (window.getName().equals(WINDOW_EXTERNAL_TIME)) {
+                    sortSpec = new StreamSortSpec();
+                    sortSpec.setWindowPeriodMillis(getExternalTimeWindowSize(window));
+                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 5);
+                }
+            }
+        }
+        partition.setSortSpec(sortSpec);
+        if (groupBy != null && groupBy.size() > 0) {
+            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
+            partition.setType(StreamPartition.Type.GROUPBY);
+        } else {
+            partition.setType(StreamPartition.Type.SHUFFLE);
+        }
+        return partition;
+    }
+
+    private static int getExternalTimeWindowSize(Window window) {
+        Expression windowSize = window.getParameters()[1];
+        if (windowSize instanceof TimeConstant) {
+            return ((TimeConstant) windowSize).getValue().intValue();
+        } else if (windowSize instanceof IntConstant) {
+            return ((IntConstant) windowSize).getValue();
+        } else if (windowSize instanceof LongConstant) {
+            return ((LongConstant) windowSize).getValue().intValue();
+        } else {
+            throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString());
+        }
+    }
+
+    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+        return outputAttributeList.stream().map(outputAttribute -> {
+            StreamColumn streamColumn = new StreamColumn();
+            streamColumn.setName(outputAttribute.getRename());
+            streamColumn.setDescription(outputAttribute.getExpression().toString());
+            return streamColumn;
+        }).collect(Collectors.toList());
+    }
+}
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
index 599f349..ebb58e1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
@@ -19,50 +19,117 @@
 package org.apache.eagle.alert.engine.siddhi.extension;
 
 import com.google.common.collect.ImmutableList;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.query.processor.ProcessingMode;
+import io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.Attribute.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
 
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 
 /**
  * @since Apr 1, 2016.
  */
-public class AttributeCollectAggregator extends AttributeAggregator {
+@Extension(
+        name = "collect",
+        namespace = "eagle",
+        description = "Collects values in to a list, and returns the list.",
+        parameters = {
+                @Parameter(name = "value",
+                        description = "The value that need to be collected.",
+                        type = {DataType.INT, DataType.LONG, DataType.DOUBLE, DataType.FLOAT,
+                                DataType.STRING, DataType.BOOL, DataType.OBJECT},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"value"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Collects and return values in a list",
+                type = {DataType.OBJECT}),
+        examples = @Example(
+                syntax = "eagle:collect(hosts)",
+                description = "Collects and return all hosts."
+        )
+)
+public class AttributeCollectAggregator
+        extends AttributeAggregatorExecutor<AttributeCollectAggregator.AggregatorState> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
 
-    private LinkedList<Object> value;
-
-    public AttributeCollectAggregator() {
-        value = new LinkedList<Object>();
+    /**
+     * The initialization method for AttributeAggregatorExecutor
+     *
+     * @param attributeExpressionExecutors are the executors of each attributes in the function
+     * @param processingMode               query processing mode
+     * @param outputExpectsExpiredEvents   is expired events sent as output
+     * @param configReader                 this hold the {@link AttributeCollectAggregator} configuration reader.
+     * @param siddhiQueryContext           Siddhi query runtime context
+     */
+    @Override
+    protected StateFactory<AggregatorState> init(ExpressionExecutor[] attributeExpressionExecutors,
+                                                 ProcessingMode processingMode,
+                                                 boolean outputExpectsExpiredEvents,
+                                                 ConfigReader configReader,
+                                                 SiddhiQueryContext siddhiQueryContext) {
+        // TODO: Support max of elements?
+        return AggregatorState::new;
     }
 
     @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public Object[] currentState() {
-        return value.toArray();
-    }
-
-    @Override
-    public void restoreState(Object[] arg0) {
-        value = new LinkedList<Object>();
-        if (arg0 != null) {
-            for (Object o : arg0) {
-                value.add(o);
-            }
+    public Object processAdd(Object data, AggregatorState state) {
+        state.value.add(data);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processAdd: current values are : " + state.value);
         }
+        return ImmutableList.copyOf(state.value);
+    }
+
+    @Override
+    public Object processAdd(Object[] data, AggregatorState state) {
+        state.value.add(data);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processAdd: current values are : " + state.value);
+        }
+        return ImmutableList.copyOf(state.value);
+    }
+
+    // / NOTICE: non O(1)
+    @Override
+    public Object processRemove(Object data, AggregatorState state) {
+        state.value.remove(data);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processRemove: current values are : " + state.value);
+        }
+        return ImmutableList.copyOf(state.value);
+    }
+
+    // / NOTICE: non O(1)
+    @Override
+    public Object processRemove(Object[] data, AggregatorState state) {
+        state.value.remove(data);
+        LOG.info("processRemove: current values are : " + state.value);
+        return ImmutableList.copyOf(state.value);
+    }
+
+    @Override
+    public Object reset(AggregatorState state) {
+        state.value.clear();
+        return state.value;
     }
 
     @Override
@@ -70,51 +137,25 @@
         return Attribute.Type.OBJECT;
     }
 
-    @Override
-    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
-        // TODO: Support max of elements?
-    }
+    class AggregatorState extends State {
+        private LinkedList<Object> value = new LinkedList<>();
 
-    @Override
-    public Object processAdd(Object arg0) {
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
+        @Override
+        public boolean canDestroy() {
+            return value.isEmpty();
         }
-        return ImmutableList.copyOf(value);
-    }
 
-    @Override
-    public Object processAdd(Object[] arg0) {
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
+        @Override
+        public Map<String, Object> snapshot() {
+            Map<String, Object> state = new HashMap<>();
+            state.put("valueList", value);
+            return state;
         }
-        return ImmutableList.copyOf(value);
-    }
 
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object arg0) {
-        value.remove(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processRemove: current values are : " + value);
+        @Override
+        public void restore(Map<String, Object> state) {
+            value = (LinkedList<Object>) state.get("valueList");
         }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object[] arg0) {
-        value.remove(arg0);
-        LOG.info("processRemove: current values are : " + value);
-        return ImmutableList.copyOf(value);
-    }
-
-    @Override
-    public Object reset() {
-        value.clear();
-        return value;
     }
 
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
index 101d05b..d058d35 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
@@ -17,47 +17,118 @@
 package org.apache.eagle.alert.engine.siddhi.extension;
 
 import com.google.common.collect.ImmutableList;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.query.processor.ProcessingMode;
+import io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.Attribute.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
 
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 
-public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
+@Extension(
+        name = "collectWithDistinct",
+        namespace = "eagle",
+        description = "Collects distinct values into a list, and returns the list.",
+        parameters = {
+                @Parameter(name = "value",
+                        description = "The value that need to be collected.",
+                        type = {DataType.INT, DataType.LONG, DataType.DOUBLE, DataType.FLOAT,
+                                DataType.STRING, DataType.BOOL, DataType.OBJECT},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"value"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Collects and returns distinct values.",
+                type = {DataType.OBJECT}),
+        examples = @Example(
+                syntax = "eagle:collectWithDistinct(hosts)",
+                description = "Collects and returns distinct hosts."
+        )
+)
+public class AttributeCollectWithDistinctAggregator
+        extends AttributeAggregatorExecutor<AttributeCollectWithDistinctAggregator.AggregatorState> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
 
-    private LinkedList<Object> value;
-
-    public AttributeCollectWithDistinctAggregator() {
-        value = new LinkedList<Object>();
+    /**
+     * The initialization method for AttributeCollectWithDistinctAggregator, called before processing starts.
+     *
+     * @param attributeExpressionExecutors the executors of each attribute in the function
+     * @param processingMode               query processing mode
+     * @param outputExpectsExpiredEvents   whether expired events are sent as output
+     * @param configReader                 holds the {@link AttributeCollectWithDistinctAggregator} configuration reader
+     * @param siddhiQueryContext           Siddhi query runtime context
+     */
+    @Override
+    protected StateFactory<AggregatorState> init(ExpressionExecutor[] attributeExpressionExecutors,
+                                                 ProcessingMode processingMode,
+                                                 boolean outputExpectsExpiredEvents,
+                                                 ConfigReader configReader,
+                                                 SiddhiQueryContext siddhiQueryContext) {
+        // TODO: Support max of elements?
+        return AggregatorState::new;
     }
 
     @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public Object[] currentState() {
-        return value.toArray();
-    }
-
-    @Override
-    public void restoreState(Object[] arg0) {
-        value = new LinkedList<Object>();
-        if (arg0 != null) {
-            for (Object o : arg0) {
-                value.add(o);
-            }
+    public Object processAdd(Object data, AggregatorState state) {
+        // NOTE(review): assumes AttributeAggregatorExecutor.process is synchronized in Siddhi v5 — confirm
+        state.value.remove(data);
+        state.value.add(data);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processAdd: current values are : " + state.value);
         }
+        return ImmutableList.copyOf(state.value);
+    }
+
+    @Override
+    public Object processAdd(Object[] data, AggregatorState state) {
+        // NOTE(review): assumes AttributeAggregatorExecutor.process is synchronized in Siddhi v5 — confirm
+        state.value.remove(data);
+        state.value.add(data);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processAdd: current values are : " + state.value);
+        }
+        return ImmutableList.copyOf(state.value);
+    }
+
+    // / NOTICE: non O(1)
+    @Override
+    public Object processRemove(Object data, AggregatorState state) {
+        state.value.remove(data);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processRemove: current values are : " + state.value);
+        }
+        return ImmutableList.copyOf(state.value);
+    }
+
+    // / NOTICE: non O(1)
+    @Override
+    public Object processRemove(Object[] data, AggregatorState state) {
+        state.value.remove(data);
+        LOG.debug("processRemove: current values are : {}", state.value);
+        return ImmutableList.copyOf(state.value);
+    }
+
+    @Override
+    public Object reset(AggregatorState state) {
+        state.value.clear();
+        return state.value;
     }
 
     @Override
@@ -65,59 +136,25 @@
         return Attribute.Type.OBJECT;
     }
 
-    @Override
-    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
-        // TODO: Support max of elements?
-    }
+    static class AggregatorState extends State {
+        private LinkedList<Object> value = new LinkedList<>();
 
-    @Override
-    public Object processAdd(Object arg0) {
-        // AttributeAggregator.process is already synchronized
-        if (value.contains(arg0)) {
-            value.remove(arg0);
+        @Override
+        public boolean canDestroy() {
+            return value.isEmpty();
         }
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
 
-    @Override
-    public Object processAdd(Object[] arg0) {
-        // AttributeAggregator.process is already synchronized
-        if (value.contains(arg0)) {
-            value.remove(arg0);
+        @Override
+        public Map<String, Object> snapshot() {
+            Map<String, Object> state = new HashMap<>();
+            state.put("valueList", value);
+            return state;
         }
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
+
+        @Override
+        public void restore(Map<String, Object> state) {
+            value = (LinkedList<Object>) state.get("valueList");
         }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object arg0) {
-        value.remove(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processRemove: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object[] arg0) {
-        value.remove(arg0);
-        LOG.info("processRemove: current values are : " + value);
-        return ImmutableList.copyOf(value);
-    }
-
-    @Override
-    public Object reset() {
-        value.clear();
-        return value;
     }
 
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
index 27df63b..713c42e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
@@ -17,40 +17,87 @@
 
 package org.apache.eagle.alert.engine.siddhi.extension;
 
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.exception.SiddhiAppRuntimeException;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 
+@Extension(
+        name = "containsIgnoreCase",
+        namespace = "str",
+        description = "Returns whether a string contains another given string.",
+        parameters = {
+                @Parameter(name = "first.string",
+                        description = "Source string.",
+                        type = {DataType.STRING},
+                        dynamic = true),
+                @Parameter(name = "second.string",
+                        description = "Regex string.",
+                        type = {DataType.STRING},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"first.string", "second.string"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Returns whether 'first.string' contains 'second.string'.",
+                type = {DataType.BOOL}),
+        examples = {
+                @Example(
+                        syntax = "str:containsIgnoreCase(stringA, stringB) as isContains",
+                        description = "Returns whether 'stringA' contains 'stringB'.")
+        }
+)
 public class ContainsIgnoreCaseExtension extends FunctionExecutor {
 
-    Attribute.Type returnType = Attribute.Type.BOOL;
-
+    /**
+     * The initialization method for ContainsIgnoreCaseExtension,
+     * this method will be called before the other methods.
+     *
+     * @param attributeExpressionExecutors the executors of each function parameter
+     * @param configReader                 the config reader for the Siddhi app
+     * @param siddhiQueryContext           the context of the Siddhi query
+     */
     @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+                                SiddhiQueryContext siddhiQueryContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, "
-                + "but found " + attributeExpressionExecutors.length);
+            throw new SiddhiAppValidationException("Invalid no of arguments passed to str:containsIgnoreCase() "
+                    + "function, required 2, but found " + attributeExpressionExecutors.length);
         }
         if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the first argument of "
+                    + "str:containsIgnoreCase() function, required " + Attribute.Type.STRING + ", but found "
+                    + attributeExpressionExecutors[0].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the second argument of "
+                    + "str:containsIgnoreCase() function, required " + Attribute.Type.STRING + ", but found "
+                    + attributeExpressionExecutors[1].getReturnType().toString());
         }
+        return null;
     }
 
     @Override
-    protected Object execute(Object[] data) {
+    protected Object execute(Object[] data, State state) {
         if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
+            throw new SiddhiAppRuntimeException("Invalid input given to str:containsIgnoreCase() function. "
+                    + "First argument cannot be null");
         }
         if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
+            throw new SiddhiAppRuntimeException("Invalid input given to str:containsIgnoreCase() function. "
+                    + "Second argument cannot be null");
         }
         String str1 = (String) data[0];
         String str2 = (String) data[1];
@@ -58,31 +105,15 @@
     }
 
     @Override
-    protected Object execute(Object data) {
-        return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
-    }
-
-    @Override
-    public void start() {
-        //Nothing to start
-    }
-
-    @Override
-    public void stop() {
-        //Nothing to stop
+    protected Object execute(Object data, State state) {
+        // Since the containsIgnoreCase function takes in 2 parameters,
+        // this method does not get called. Hence, not implemented.
+        return null;
     }
 
     @Override
     public Attribute.Type getReturnType() {
-        return returnType;
+        return Attribute.Type.BOOL;
     }
 
-    @Override
-    public Object[] currentState() {
-        return new Object[] {};
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
index 1292e05..15bb807 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
@@ -17,40 +17,87 @@
 
 package org.apache.eagle.alert.engine.siddhi.extension;
 
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.exception.SiddhiAppRuntimeException;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 
+@Extension(
+        name = "equalsIgnoreCase",
+        namespace = "str",
+        description = "Returns whether a string is equal to another string.",
+        parameters = {
+                @Parameter(name = "first.string",
+                        description = "First string.",
+                        type = {DataType.STRING},
+                        dynamic = true),
+                @Parameter(name = "second.string",
+                        description = "Second string.",
+                        type = {DataType.STRING},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"first.string", "second.string"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Returns whether a string is equal to another string.",
+                type = {DataType.BOOL}),
+        examples = {
+                @Example(
+                        syntax = "str:equalsIgnoreCase(stringA, stringB) as isEqual",
+                        description = "Returns whether stringA is equal to stringB.")
+        }
+)
 public class EqualsIgnoreCaseExtension extends FunctionExecutor {
 
-    Attribute.Type returnType = Attribute.Type.BOOL;
-
+    /**
+     * The initialization method for EqualsIgnoreCaseExtension,
+     * this method will be called before the other methods.
+     *
+     * @param attributeExpressionExecutors the executors of each function parameter
+     * @param configReader                 the config reader for the Siddhi app
+     * @param siddhiQueryContext           the context of the Siddhi query
+     */
     @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+                                SiddhiQueryContext siddhiQueryContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, "
-                + "but found " + attributeExpressionExecutors.length);
+            throw new SiddhiAppValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() "
+                    + "function, required 2, but found " + attributeExpressionExecutors.length);
         }
         if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the first argument of "
+                    + "str:equalsIgnoreCase() function, required " + Attribute.Type.STRING + ", but found "
+                    + attributeExpressionExecutors[0].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the second argument of "
+                    + "str:equalsIgnoreCase() function, required " + Attribute.Type.STRING + ", but found "
+                    + attributeExpressionExecutors[1].getReturnType().toString());
         }
+        return null;
     }
 
     @Override
-    protected Object execute(Object[] data) {
+    protected Object execute(Object[] data, State state) {
         if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
+            throw new SiddhiAppRuntimeException("Invalid input given to str:equalsIgnoreCase() function. "
+                    + "First argument cannot be null");
         }
         if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
+            throw new SiddhiAppRuntimeException("Invalid input given to str:equalsIgnoreCase() function. "
+                    + "Second argument cannot be null");
         }
         String str1 = (String) data[0];
         String str2 = (String) data[1];
@@ -58,31 +105,15 @@
     }
 
     @Override
-    protected Object execute(Object data) {
-        return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
-    }
-
-    @Override
-    public void start() {
-        //Nothing to start
-    }
-
-    @Override
-    public void stop() {
-        //Nothing to stop
+    protected Object execute(Object data, State state) {
+        // Since the equalsIgnoreCase function takes in 2 parameters,
+        // this method does not get called. Hence, not implemented.
+        return null;
     }
 
     @Override
     public Attribute.Type getReturnType() {
-        return returnType;
+        return Attribute.Type.BOOL;
     }
 
-    @Override
-    public Object[] currentState() {
-        return new Object[] {};
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
index fe2280f..1dba551 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
@@ -17,62 +17,110 @@
 
 package org.apache.eagle.alert.engine.siddhi.extension;
 
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import io.siddhi.annotation.Example;
+import io.siddhi.annotation.Extension;
+import io.siddhi.annotation.Parameter;
+import io.siddhi.annotation.ParameterOverload;
+import io.siddhi.annotation.ReturnAttribute;
+import io.siddhi.annotation.util.DataType;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.exception.SiddhiAppRuntimeException;
+import io.siddhi.core.executor.ConstantExpressionExecutor;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * regexpIgnoreCase(string, regex)
+ * regexpIgnoreCase(string, regex)
  * Tells whether or not this 'string' matches the given regular expression 'regex'.
  * Accept Type(s): (STRING,STRING)
  * Return Type(s): BOOLEAN
  */
-public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
+@Extension(
+        name = "regexpIgnoreCase",
+        namespace = "str",
+        description = "Returns whether 'source' string matches the given regular expression 'regex'.",
+        parameters = {
+                @Parameter(name = "source",
+                        description = "Source string.",
+                        type = {DataType.STRING},
+                        dynamic = true),
+                @Parameter(name = "regex",
+                        description = "Regex string.",
+                        type = {DataType.STRING},
+                        dynamic = true)
+        },
+        parameterOverloads = {
+                @ParameterOverload(parameterNames = {"source", "regex"})
+        },
+        returnAttributes = @ReturnAttribute(
+                description = "Returns whether 'source' matches the given regular expression 'regex'.",
+                type = {DataType.BOOL}),
+        examples = {
+                @Example(
+                        syntax = "str:regexpIgnoreCase(string, regex)",
+                        description = "Returns whether 'source' matches the given regular expression 'regex'.")
+        }
+)
+public class RegexpIgnoreCaseFunctionExtension extends FunctionExecutor {
 
     //state-variables
-    boolean isRegexConstant = false;
-    String regexConstant;
-    Pattern patternConstant;
+    private boolean isRegexConstant = false;
+    private Pattern patternConstant;
 
+    /**
+     * The initialization method for RegexpIgnoreCaseFunctionExtension,
+     * this method will be called before the other methods.
+     *
+     * @param attributeExpressionExecutors the executors of each function parameter
+     * @param configReader                 the config reader for the Siddhi app
+     * @param siddhiQueryContext           the context of the Siddhi query
+     */
     @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+                                SiddhiQueryContext siddhiQueryContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, "
-                + "but found " + attributeExpressionExecutors.length);
+            throw new SiddhiAppValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, "
+                    + "required 2, but found " + attributeExpressionExecutors.length);
         }
         if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the first argument of "
+                    + "str:regexpIgnoreCase() function, required " + Attribute.Type.STRING + ", but found "
+                    + attributeExpressionExecutors[0].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
+            throw new SiddhiAppValidationException("Invalid parameter type found for the second argument of "
+                    + "str:regexpIgnoreCase() function, required " + Attribute.Type.STRING + ", but found "
+                    + attributeExpressionExecutors[1].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
-            isRegexConstant = true;
-            regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
+            String regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
             patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
+            isRegexConstant = true;
         }
+        return null;
     }
 
     @Override
-    protected Object execute(Object[] data) {
+    protected Object execute(Object[] data, State state) {
         String regex;
         Pattern pattern;
         Matcher matcher;
 
         if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
+            throw new SiddhiAppRuntimeException("Invalid input given to str:regexpIgnoreCase() function. "
+                    + "First argument cannot be null");
         }
         if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
+            throw new SiddhiAppRuntimeException("Invalid input given to str:regexpIgnoreCase() function. "
+                    + "Second argument cannot be null");
         }
         String source = (String) data[0];
 
@@ -87,4 +135,15 @@
             return matcher.matches();
         }
     }
+
+    @Override
+    protected Object execute(Object data, State state) {
+        return null;
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return Attribute.Type.BOOL;
+    }
+
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
deleted file mode 100644
index 16569a4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
-collectWithDistinct=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectWithDistinctAggregator
\ No newline at end of file
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext
deleted file mode 100644
index 7569989..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
-coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
-concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
-length=org.wso2.siddhi.extension.string.LengthFunctionExtension
-lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
-regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
-repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
-replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
-replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
-reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
-strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
-substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
-trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
-upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
-hex=org.wso2.siddhi.extension.string.HexFunctionExtension
-unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-contains=org.wso2.siddhi.extension.string.ContainsFunctionExtension
-
-# Eagle Siddhi Extension
-equalsIgnoreCase=org.apache.eagle.alert.engine.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=org.apache.eagle.alert.engine.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=org.apache.eagle.alert.engine.siddhi.extension.RegexpIgnoreCaseFunctionExtension
-
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
index 5c2c404..6ea486c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
@@ -22,12 +22,12 @@
 import org.apache.eagle.alert.engine.UnitTopologyMain;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.EventPrinter;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -88,7 +88,7 @@
         String s1 = " define stream esStream(instanceUuid string, timestamp long, logLevel string, message string, reqId string, host string, component string); ";
         s1 += " define stream ifStream(instanceUuid string, timestamp long, reqId string, message string, host string); ";
         s1 += "from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid  within 10 min select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount group by component insert into log_stream_join_output; ";
-        ExecutionPlanRuntime epr = sm.createExecutionPlanRuntime(s1);
+        SiddhiAppRuntime epr = sm.createSiddhiAppRuntime(s1);
 
         epr.addCallback("log_stream_join_output", new StreamCallback() {
             @Override
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
index 4047fc1..2b9e9e0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -26,8 +26,8 @@
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
 import org.junit.Assert;
 import org.junit.Test;
-import org.wso2.siddhi.core.exception.DefinitionNotExistException;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import io.siddhi.core.exception.DefinitionNotExistException;
+import io.siddhi.query.api.exception.SiddhiAppValidationException;
 
 import java.util.*;
 
@@ -75,7 +75,7 @@
         Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
     }
 
-    @Test(expected = ExecutionPlanValidationException.class)
+    @Test(expected = SiddhiAppValidationException.class)
     public void testParseSingleStreamPolicyQueryWithConflictPartition() throws Exception {
         PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
             "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 5 min) "
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
index 79b939c..0b525c6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
@@ -17,11 +17,11 @@
 package org.apache.eagle.alert.engine.nodata;
 
 import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.EventPrinter;
 
 /**
  * Since 6/27/16.
@@ -29,7 +29,7 @@
 public class TestEventTable {
     @Test
     public void test() throws Exception {
-        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+        SiddhiAppRuntime runtime = new SiddhiManager().createSiddhiAppRuntime(
             "define stream expectStream (key string, src string);" +
                 "define stream appearStream (key string, src string);" +
                 "define table expectTable (key string, src string);" +
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index fe70630..4050cb0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -17,11 +17,11 @@
 package org.apache.eagle.alert.engine.nodata;
 
 import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.EventPrinter;
 
 /**
  * Since 6/27/16.
@@ -33,7 +33,7 @@
 //        String[] appearHosts = new String[]{"host_6","host_7","host_8"};
 //        String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"};
 
-        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+        SiddhiAppRuntime runtime = new SiddhiManager().createSiddhiAppRuntime(
             "define stream appearStream (key string, src string);" +
                 "define stream expectStream (key string, src string);" +
                 "define table expectTable (key string, src string);" +
@@ -46,7 +46,7 @@
 //                        "from joinStream[k2 is null] select k1 insert current events into missingStream;"
         );
 
-//        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+//        SiddhiAppRuntime runtime = new SiddhiManager().createSiddhiAppRuntime(
 //                "define stream appearStream (key string, src string);"+
 //                        "define stream expectStream (key string, src string);"+
 //                        "define table expectTable (key string, src string);"+
@@ -87,7 +87,7 @@
      */
     @Test
     public void testMissingBlock() throws Exception {
-        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+        SiddhiAppRuntime runtime = new SiddhiManager().createSiddhiAppRuntime(
             "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);" +
                 "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> " +
                 "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and " +
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
index 9520b62..e0fd04b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
@@ -19,12 +19,12 @@
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.EventPrinter;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -64,7 +64,7 @@
 
     @Test
     public void testPolicy_grpby() {
-        String ql = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
+        String ql = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count() as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
         StreamCallback sc = new StreamCallback() {
             @Override
             public void receive(Event[] arg0) {
@@ -75,7 +75,7 @@
         };
 
         String executionPlan = streams + ql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan);
         runtime.addCallback("syslog_severity_check_output", sc);
         runtime.start();
     }
@@ -88,7 +88,7 @@
             + "namespace, "
             + "timestamp, "
             + "dims_hostname, "
-            + "count(*) as abortCount "
+            + "count() as abortCount "
             + "group by dims_hostname "
             + "having abortCount > 3 insert into syslog_severity_check_output; ";
 
@@ -114,7 +114,7 @@
         };
 
         String executionPlan = streams + sql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan);
         runtime.addCallback("syslog_severity_check_output", sc);
         runtime.start();
         InputHandler handler = runtime.getInputHandler("syslog_stream");
@@ -166,7 +166,7 @@
     @Ignore
     @Test
     public void testPolicy_regex() throws Exception {
-        String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
+        String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count() as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
 
         AtomicBoolean checked = new AtomicBoolean();
         StreamCallback sc = new StreamCallback() {
@@ -179,7 +179,7 @@
         };
 
         String executionPlan = streams + sql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan);
         runtime.addCallback("syslog_severity_check_output", sc);
         runtime.start();
 
@@ -214,7 +214,7 @@
         };
 
         String executionPlan = streams + sql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan);
         runtime.addCallback("syslog_severity_check_output", sc);
         runtime.start();
         InputHandler handler = runtime.getInputHandler("syslog_stream");
@@ -255,7 +255,7 @@
         String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
             " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
         SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = manager.createSiddhiAppRuntime(ql);
         runtime.addCallback("output", new StreamCallback() {
             @Override
             public void receive(Event[] events) {
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
index 7694623..6abfc31 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
@@ -21,11 +21,11 @@
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -45,7 +45,7 @@
         ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
 
         SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(ql);
 
         InputHandler input = runtime.getInputHandler("s1");
         runtime.addCallback("output", new StreamCallback() {
@@ -143,6 +143,6 @@
         ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
 
         SiddhiManager sm = new SiddhiManager();
-        sm.createExecutionPlanRuntime(ql);
+        sm.createSiddhiAppRuntime(ql);
     }
 }
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
deleted file mode 100644
index 4ce9805..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/policy-sample.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/policy-sample.json
index 56a53e7..0d8a6a2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/policy-sample.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/policy-sample.json
@@ -10,7 +10,7 @@
   ],
   "definition": {
     "type": "siddhiCEPEngine",
-    "value": "select count(*) from in-stream-1"
+    "value": "select count() from in-stream-1"
   },
   "partitionSpec": [
     {
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
index f387ea8..7e03447 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
@@ -166,7 +166,7 @@
                 <version>${metrics.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
+                <groupId>io.siddhi</groupId>
                 <artifactId>siddhi-core</artifactId>
                 <version>${siddhi.version}</version>
                 <exclusions>
@@ -177,24 +177,24 @@
                 </exclusions>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
+                <groupId>io.siddhi</groupId>
                 <artifactId>siddhi-query-api</artifactId>
                 <version>${siddhi.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
+                <groupId>io.siddhi</groupId>
                 <artifactId>siddhi-query-compiler</artifactId>
                 <version>${siddhi.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
-                <artifactId>siddhi-extension-regex</artifactId>
-                <version>${siddhi.version}</version>
+                <groupId>io.siddhi.extension.execution.regex</groupId>
+                <artifactId>siddhi-execution-regex</artifactId>
+                <version>${siddhi-execution-regex.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
-                <artifactId>siddhi-extension-string</artifactId>
-                <version>${siddhi.version}</version>
+                <groupId>io.siddhi.extension.execution.string</groupId>
+                <artifactId>siddhi-execution-string</artifactId>
+                <version>${siddhi-execution-string.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
index 1c85b58..f65a17e 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
@@ -1,149 +1,149 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter.convertFromSiddiDefinition;
-
-public class CEPFunction implements TransformFunction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CEPFunction.class);
-
-    private ExecutionPlanRuntime runtime;
-    private SiddhiManager siddhiManager;
-    private final CEPDefinition cepDefinition;
-    private Collector collector;
-
-    public CEPFunction(CEPDefinition cepDefinition) {
-        this.cepDefinition = cepDefinition;
-    }
-
-    public CEPFunction(String siddhiQuery, String inputStreamId, String outputStreamId) {
-        this.cepDefinition = new CEPDefinition(siddhiQuery,inputStreamId, outputStreamId);
-    }
-
-    @Override
-    public String getName() {
-        return "CEPFunction";
-    }
-
-    @Override
-    public void open(Collector collector) {
-        this.collector = collector;
-        this.siddhiManager = new SiddhiManager();
-        this.runtime = siddhiManager.createExecutionPlanRuntime(cepDefinition.getSiddhiQuery());
-        if (runtime.getStreamDefinitionMap().containsKey(cepDefinition.outputStreamId)) {
-            runtime.addCallback(cepDefinition.outputStreamId, new StreamCallback() {
-                @Override
-                public void receive(Event[] events) {
-                    for (Event e : events) {
-                        StreamDefinition schema = convertFromSiddiDefinition(runtime.getStreamDefinitionMap().get(cepDefinition.outputStreamId));
-                        Map<String, Object> event = new HashMap<>();
-                        for (StreamColumn column : schema.getColumns()) {
-                            Object obj = e.getData()[schema.getColumnIndex(column.getName())];
-                            if (obj == null) {
-                                event.put(column.getName(), null);
-                                continue;
-                            }
-                            event.put(column.getName(), obj);
-                        }
-                        collector.collect(event.toString(), event);
-                    }
-                }
-            });
-        } else {
-            throw new IllegalStateException("Undefined output stream " + cepDefinition.outputStreamId);
-        }
-        runtime.start();
-    }
-
-    @Override
-    public void transform(Map event) {
-        String streamId = cepDefinition.getInputStreamId();
-        InputHandler inputHandler = runtime.getInputHandler(streamId);
-
-        if (inputHandler != null) {
-            try {
-                inputHandler.send(event.values().toArray());
-            } catch (InterruptedException e) {
-                LOG.error(e.getMessage(), e);
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("sent event to siddhi stream {} ", streamId);
-            }
-        } else {
-            LOG.warn("No input handler found for stream {}", streamId);
-        }
-    }
-
-    @Override
-    public void close() {
-        LOG.info("Closing handler for query {}", this.cepDefinition.getSiddhiQuery());
-        this.runtime.shutdown();
-        LOG.info("Shutdown siddhi runtime {}", this.runtime.getName());
-        this.siddhiManager.shutdown();
-        LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
-    }
-
-    public static class CEPDefinition {
-        private String inputStreamId;
-        private String outputStreamId;
-        private String siddhiQuery;
-
-        public CEPDefinition(String siddhiQuery, String inputStreamId, String outputStreamId) {
-            this.siddhiQuery = siddhiQuery;
-            this.inputStreamId = inputStreamId;
-            this.outputStreamId = outputStreamId;
-        }
-
-        public String getSiddhiQuery() {
-            return siddhiQuery;
-        }
-
-        public void setSiddhiQuery(String siddhiQuery) {
-            this.siddhiQuery = siddhiQuery;
-        }
-
-        public String getOutputStreamId() {
-            return outputStreamId;
-        }
-
-        public void setOutputStreamId(String outputStreamId) {
-            this.outputStreamId = outputStreamId;
-        }
-
-        public String getInputStreamId() {
-            return inputStreamId;
-        }
-
-        public void setInputStreamId(String inputStreamId) {
-            this.inputStreamId = inputStreamId;
-        }
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter.convertFromSiddiDefinition;
+
+public class CEPFunction implements TransformFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CEPFunction.class);
+
+    private SiddhiAppRuntime runtime;
+    private SiddhiManager siddhiManager;
+    private final CEPDefinition cepDefinition;
+    private Collector collector;
+
+    public CEPFunction(CEPDefinition cepDefinition) {
+        this.cepDefinition = cepDefinition;
+    }
+
+    public CEPFunction(String siddhiQuery, String inputStreamId, String outputStreamId) {
+        this.cepDefinition = new CEPDefinition(siddhiQuery,inputStreamId, outputStreamId);
+    }
+
+    @Override
+    public String getName() {
+        return "CEPFunction";
+    }
+
+    @Override
+    public void open(Collector collector) {
+        this.collector = collector;
+        this.siddhiManager = new SiddhiManager();
+        this.runtime = siddhiManager.createSiddhiAppRuntime(cepDefinition.getSiddhiQuery());
+        if (runtime.getStreamDefinitionMap().containsKey(cepDefinition.outputStreamId)) {
+            runtime.addCallback(cepDefinition.outputStreamId, new StreamCallback() {
+                @Override
+                public void receive(Event[] events) {
+                    for (Event e : events) {
+                        StreamDefinition schema = convertFromSiddiDefinition(runtime.getStreamDefinitionMap().get(cepDefinition.outputStreamId));
+                        Map<String, Object> event = new HashMap<>();
+                        for (StreamColumn column : schema.getColumns()) {
+                            Object obj = e.getData()[schema.getColumnIndex(column.getName())];
+                            if (obj == null) {
+                                event.put(column.getName(), null);
+                                continue;
+                            }
+                            event.put(column.getName(), obj);
+                        }
+                        collector.collect(event.toString(), event);
+                    }
+                }
+            });
+        } else {
+            throw new IllegalStateException("Undefined output stream " + cepDefinition.outputStreamId);
+        }
+        runtime.start();
+    }
+
+    @Override
+    public void transform(Map event) {
+        String streamId = cepDefinition.getInputStreamId();
+        InputHandler inputHandler = runtime.getInputHandler(streamId);
+
+        if (inputHandler != null) {
+            try {
+                inputHandler.send(event.values().toArray());
+            } catch (InterruptedException e) {
+                LOG.error(e.getMessage(), e);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("sent event to siddhi stream {} ", streamId);
+            }
+        } else {
+            LOG.warn("No input handler found for stream {}", streamId);
+        }
+    }
+
+    @Override
+    public void close() {
+        LOG.info("Closing handler for query {}", this.cepDefinition.getSiddhiQuery());
+        this.runtime.shutdown();
+        LOG.info("Shutdown siddhi runtime {}", this.runtime.getName());
+        this.siddhiManager.shutdown();
+        LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
+    }
+
+    public static class CEPDefinition {
+        private String inputStreamId;
+        private String outputStreamId;
+        private String siddhiQuery;
+
+        public CEPDefinition(String siddhiQuery, String inputStreamId, String outputStreamId) {
+            this.siddhiQuery = siddhiQuery;
+            this.inputStreamId = inputStreamId;
+            this.outputStreamId = outputStreamId;
+        }
+
+        public String getSiddhiQuery() {
+            return siddhiQuery;
+        }
+
+        public void setSiddhiQuery(String siddhiQuery) {
+            this.siddhiQuery = siddhiQuery;
+        }
+
+        public String getOutputStreamId() {
+            return outputStreamId;
+        }
+
+        public void setOutputStreamId(String outputStreamId) {
+            this.outputStreamId = outputStreamId;
+        }
+
+        public String getInputStreamId() {
+            return inputStreamId;
+        }
+
+        public void setInputStreamId(String inputStreamId) {
+            this.inputStreamId = inputStreamId;
+        }
+    }
 }
\ No newline at end of file
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 4af212a..cfa4b0c 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -32,7 +32,7 @@
 
     <dependencies>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-core</artifactId>
             <exclusions>
                 <exclusion>
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
index cbe0ff9..655b79b 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
@@ -18,11 +18,11 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.query.output.callback.QueryCallback;
+import io.siddhi.core.stream.input.InputHandler;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,7 +56,7 @@
 
         String query = buildSiddhiAggQuery();
         SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(query);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(query);
 
         input = runtime.getInputHandler("s");
 
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
index d12ff87..bf23459 100644
--- a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
@@ -18,11 +18,11 @@
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.query.output.callback.QueryCallback;
+import io.siddhi.core.stream.input.InputHandler;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -36,7 +36,7 @@
             " @info(name='query') " +
             " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
         SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(ql);
 
         InputHandler input = runtime.getInputHandler("s");
 
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
index 6d03c70..fa60649 100644
--- a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
@@ -20,11 +20,11 @@
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.query.output.callback.QueryCallback;
+import io.siddhi.core.stream.input.InputHandler;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -39,7 +39,7 @@
             " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
         System.out.println("query: " + ql);
         SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+        SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(ql);
 
         InputHandler input = runtime.getInputHandler("s");
 
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/pom.xml b/eagle-core/eagle-metadata/eagle-metadata-jdbc/pom.xml
index c02b22e..ab5f8eb 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/pom.xml
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/pom.xml
@@ -49,6 +49,12 @@
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
index 78fa010..4f188f9 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
@@ -51,7 +51,7 @@
 
     /**
      * add datapoint which has a list of values for different aggregate functions
-     * for example, sum(numHosts), count(*), avg(timespan) etc
+     * for example, sum(numHosts), count(), avg(timespan) etc
      * @param timestamp
      * @param values
      */
diff --git a/eagle-jpm/eagle-jpm-service/pom.xml b/eagle-jpm/eagle-jpm-service/pom.xml
index bf7ed01..5b6a6b3 100644
--- a/eagle-jpm/eagle-jpm-service/pom.xml
+++ b/eagle-jpm/eagle-jpm-service/pom.xml
@@ -37,6 +37,12 @@
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-service-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java
index 0585663..e57486e 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java
@@ -17,7 +17,7 @@
 package org.apache.eagle.security.auditlog;
 
 /**
- * @see org.wso2.siddhi.query.api.definition.Attribute.Type
+ * @see io.siddhi.query.api.definition.Attribute.Type
  */
 public enum AttributeType {
 	STRING,
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
index e06143c..e683ecc 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -24,11 +24,11 @@
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.query.output.callback.QueryCallback;
+import io.siddhi.core.stream.input.InputHandler;
 
 import java.util.*;
 
@@ -156,14 +156,14 @@
         }
 
         LOG.info("patterns: " + sb.toString());
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(sb.toString());
+        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(sb.toString());
 
         for(HdfsUserCommandPatternEntity rule : list){
-            executionPlanRuntime.addCallback(rule.getTags().get("userCommand"), new GenericQueryCallback(rule.getFieldSelector(), rule.getFieldModifier()));
+            siddhiAppRuntime.addCallback(rule.getTags().get("userCommand"), new GenericQueryCallback(rule.getFieldSelector(), rule.getFieldModifier()));
         }
 
-        inputHandler = executionPlanRuntime.getInputHandler(streamName);
-        executionPlanRuntime.start();
+        inputHandler = siddhiAppRuntime.getInputHandler(streamName);
+        siddhiAppRuntime.start();
     }
 
     public void flatMap(List<Object> input, OutputCollector collector) {
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
index 8fe6e09..bfb2388 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
@@ -20,12 +20,12 @@
 
 import org.apache.eagle.common.DateTimeUtil;
 import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.query.output.callback.QueryCallback;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.util.EventPrinter;
 
 public class TestSiddhiPattern {
     @Test
@@ -40,17 +40,17 @@
                 "select a.user as user, b.cmd as cmd, a.src as src " +
                 "insert into outputStreams";
 
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cseEventStream + query);
 
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+        siddhiAppRuntime.addCallback("query1", new QueryCallback() {
             @Override
             public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                 EventPrinter.print(timeStamp, inEvents, removeEvents);
             }
         });
 
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
+        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("eventStream");
+        siddhiAppRuntime.start();
         long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
         System.out.println("curTime : " + curTime);
         inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "getfileinfo"});
@@ -62,7 +62,7 @@
         Thread.sleep(100);
         inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "getfileinfo"});
         Thread.sleep(100);
-        executionPlanRuntime.shutdown();
+        siddhiAppRuntime.shutdown();
 
     }
 
@@ -83,23 +83,23 @@
                 "select a.user as user, b.cmd as cmd, a.src as src " +
                 "insert into outputStream";
 
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query1+query2);
+        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cseEventStream + query1+query2);
 
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+        siddhiAppRuntime.addCallback("query1", new QueryCallback() {
             @Override
             public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                 EventPrinter.print(timeStamp, inEvents, removeEvents);
             }
         });
-        executionPlanRuntime.addCallback("query2", new QueryCallback() {
+        siddhiAppRuntime.addCallback("query2", new QueryCallback() {
             @Override
             public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                 EventPrinter.print(timeStamp, inEvents, removeEvents);
             }
         });
 
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
+        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("eventStream");
+        siddhiAppRuntime.start();
         long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
         System.out.println("curTime : " + curTime);
         inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "getfileinfo"});
@@ -111,7 +111,7 @@
         Thread.sleep(100);
         inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "getfileinfo"});
         Thread.sleep(100);
-        executionPlanRuntime.shutdown();
+        siddhiAppRuntime.shutdown();
 
     }
 }
diff --git a/pom.xml b/pom.xml
index a6b614c..7d479c3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -281,7 +281,9 @@
         <javax.mail.version>1.4</javax.mail.version>
         <!--<extcos4.version>0.4b</extcos4.version>-->
         <!--<extcos3.version>0.3b</extcos3.version>-->
-        <siddhi.version>3.1.1</siddhi.version>
+        <siddhi.version>5.1.4</siddhi.version>
+        <siddhi-execution-regex.version>5.0.5</siddhi-execution-regex.version>
+        <siddhi-execution-string.version>5.0.5</siddhi-execution-string.version>
 
         <!-- Testing -->
         <junit.version>4.12</junit.version>
@@ -703,14 +705,14 @@
                 <version>${velocity.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
+                <groupId>io.siddhi</groupId>
                 <artifactId>siddhi-core</artifactId>
                 <version>${siddhi.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.wso2.siddhi</groupId>
-                <artifactId>siddhi-extension-string</artifactId>
-                <version>${siddhi.version}</version>
+                <groupId>io.siddhi.extension.execution.string</groupId>
+                <artifactId>siddhi-execution-string</artifactId>
+                <version>${siddhi-execution-string.version}</version>
             </dependency>
 
             <!-- Testing Dependencies -->
@@ -1276,7 +1278,6 @@
                                 <exclude>**/velocity.log*</exclude>
                                 <!-- all json files should be excluded -->
                                 <exclude>**/*.json</exclude>
-                                <exclude>**/resources/eagle.siddhiext</exclude>
                                 <exclude>**/test/resources/securityAuditLog</exclude>
                                 <exclude>**/resources/**/ml-policyDef-UserProfile.txt</exclude>
                                 <exclude>**/test/resources/onelinehiveauditlog.txt</exclude>