[BAHIR-144] Add flink-library-siddhi

This closes #22
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
new file mode 100644
index 0000000..91c6797
--- /dev/null
+++ b/flink-library-siddhi/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-library-siddhi_2.11</artifactId>
+    <name>flink-library-siddhi</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <siddhi.version>4.0.0-M120</siddhi.version>
+    </properties>
+
+    <dependencies>
+        <!-- core dependencies -->
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-core</artifactId>
+            <version>${siddhi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.directory.jdbm</groupId>
+                    <artifactId>apacheds-jdbm1</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-query-api</artifactId>
+            <version>${siddhi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.directory.jdbm</groupId>
+                    <artifactId>apacheds-jdbm1</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Core streaming API -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <repositories>
+        <repository>
+            <id>wso2-maven2-repository</id>
+            <name>WSO2 Maven2 Repository</name>
+            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
+        </repository>
+    </repositories>
+</project>
\ No newline at end of file
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
new file mode 100644
index 0000000..a63dbf6
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Siddhi CEP Environment, provides utility methods to
+ *
+ * <ul>
+ *     <li>Initialize SiddhiCEP environment based on {@link StreamExecutionEnvironment}</li>
+ *     <li>Register {@link SiddhiStream} with field-based StreamSchema and bind with physical source {@link DataStream}</li>
+ *     <li>Define rich-featured Siddhi CEP execution plan with SQL-Like query for SiddhiStreamOperator</li>
+ *     <li>Transform and connect source DataStream to SiddhiStreamOperator</li>
+ *     <li>Register customizable siddhi plugins to extend built-in CEP functions</li>
+ * </ul>
+ * </p>
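+ *
+ * <p>A minimal usage sketch (stream, field and query names are illustrative):</p>
+ * <pre>{@code
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * DataStream<Tuple3<Integer, String, Double>> input = env.fromElements(
+ *     Tuple3.of(1, "event-a", 120.0));
+ * DataStream<Map<String, Object>> output = SiddhiCEP
+ *     .define("inputStream", input, "id", "name", "price")
+ *     .cql("from inputStream[price > 100] select id, name, price insert into outputStream")
+ *     .returnAsMap("outputStream");
+ * }</pre>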
+ *
+ * @see SiddhiStream
+ * @see StreamSchema
+ * @see SiddhiStreamOperator
+ */
+@PublicEvolving
+public class SiddhiCEP {
+    private final StreamExecutionEnvironment executionEnvironment;
+    private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+    private final Map<String, Class<?>> extensions = new HashMap<>();
+
+    /**
+     * @param streamExecutionEnvironment Stream Execution Environment
+     */
+    private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+        this.executionEnvironment = streamExecutionEnvironment;
+    }
+
+    /**
+     * @see DataStream
+     * @return Siddhi streamId and source DataStream mapping.
+     */
+    public Map<String, DataStream<?>> getDataStreams() {
+        return this.dataStreams;
+    }
+
+    /**
+     * @see SiddhiStreamSchema
+     * @return Siddhi streamId and stream schema mapping.
+     */
+    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+        return this.dataStreamSchemas;
+    }
+
+    /**
+     * @param streamId Siddhi streamId to check.
+     * @return whether the given streamId is defined in current SiddhiCEP environment.
+     */
+    public boolean isStreamDefined(String streamId) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        return dataStreams.containsKey(streamId);
+    }
+
+    /**
+     * @return Registered siddhi extensions.
+     */
+    public Map<String, Class<?>> getExtensions() {
+        return this.extensions;
+    }
+
+    /**
+     * Check whether the given streamId has been defined; if not, throw {@link UndefinedStreamException}.
+     *
+     * @param streamId Siddhi streamId to check.
+     * @throws UndefinedStreamException if the given streamId is not defined
+     */
+    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+        Preconditions.checkNotNull(streamId,"streamId");
+        if (!isStreamDefined(streamId)) {
+            throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+        }
+    }
+
+    /**
+     * Define a siddhi stream with the given streamId, source <code>DataStream</code> and stream schema,
+     * and select it as the initial source stream to connect to the siddhi operator.
+     *
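+     * <p>For example, given a {@code DataStream<Tuple3<Integer, String, Double>>} named
+     * {@code dataStream} (illustrative names):</p>
+     * <pre>{@code
+     * SiddhiCEP.define("inputStream", dataStream, "id", "name", "price");
+     * }</pre>
+     *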
+     * @param streamId Unique siddhi streamId
+     * @param dataStream DataStream to bind to the siddhi stream.
+     * @param fieldNames Siddhi stream schema field names
+     *
+     * @see #registerStream(String, DataStream, String...)
+     * @see #from(String)
+     */
+    public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        Preconditions.checkNotNull(dataStream,"dataStream");
+        Preconditions.checkNotNull(fieldNames,"fieldNames");
+        SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
+        return environment.from(streamId, dataStream, fieldNames);
+    }
+
+    /**
+     * Register a stream with a unique <code>streamId</code>, source <code>dataStream</code> and schema fields,
+     * and select the registered stream as the initial stream to connect to the Siddhi Runtime.
+     *
+     * @see #registerStream(String, DataStream, String...)
+     * @see #from(String)
+     */
+    public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        Preconditions.checkNotNull(dataStream,"dataStream");
+        Preconditions.checkNotNull(fieldNames,"fieldNames");
+        this.registerStream(streamId, dataStream, fieldNames);
+        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+    }
+
+    /**
+     * Select the stream with the given streamId as the initial stream to connect to the Siddhi Runtime.
+     *
+     * @param streamId Siddhi Stream Name
+     * @param <T> Stream Generic Type
+     */
+    public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+    }
+
+    /**
+     * Select the first stream and union it with other predefined streams by streamId to connect to the Siddhi Stream Operator.
+     *
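+     * <p>For example, given a {@code SiddhiCEP} instance {@code cep} with both
+     * streams already registered (illustrative names):</p>
+     * <pre>{@code
+     * cep.union("inputStream1", "inputStream2")
+     *     .cql("from inputStream1 select id, name insert into outputStream")
+     *     .returnAsMap("outputStream");
+     * }</pre>
+     *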
+     * @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
+     * @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
+     *
+     * @return The UnionSiddhiStream Builder
+     */
+    public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
+        Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+        Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+        return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
+    }
+
+    /**
+     * Define a siddhi stream with the given streamId, source <code>DataStream</code> and stream schema.
+     *
+     * @param streamId Unique siddhi streamId
+     * @param dataStream DataStream to bind to the siddhi stream.
+     * @param fieldNames Siddhi stream schema field names
+     */
+    public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        Preconditions.checkNotNull(dataStream,"dataStream");
+        Preconditions.checkNotNull(fieldNames,"fieldNames");
+        if (isStreamDefined(streamId)) {
+            throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
+        }
+        dataStreams.put(streamId, dataStream);
+        SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+        schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+        dataStreamSchemas.put(streamId, schema);
+    }
+
+    /**
+     * @return Current StreamExecutionEnvironment.
+     */
+    public StreamExecutionEnvironment getExecutionEnvironment() {
+        return executionEnvironment;
+    }
+
+    /**
+     * Register Siddhi CEP Extensions
+     *
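+     * <p>For example (with a hypothetical extension class):</p>
+     * <pre>{@code
+     * cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+     * }</pre>
+     *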
+     * @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
+     * @param extensionName Unique siddhi extension name
+     * @param extensionClass Siddhi Extension class
+     */
+    public void registerExtension(String extensionName, Class<?> extensionClass) {
+        if (extensions.containsKey(extensionName)) {
+            throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
+        }
+        extensions.put(extensionName, extensionClass);
+    }
+
+    /**
+     * Get registered source DataStream with Siddhi streamId.
+     *
+     * @param streamId Siddhi streamId
+     * @return The source DataStream registered with Siddhi streamId
+     */
+    @SuppressWarnings("unchecked")
+    public <T> DataStream<T> getDataStream(String streamId) {
+        if (this.dataStreams.containsKey(streamId)) {
+            return (DataStream<T>) this.dataStreams.get(streamId);
+        } else {
+            throw new UndefinedStreamException("Undefined stream " + streamId);
+        }
+    }
+
+    /**
+     * Create new SiddhiCEP instance.
+     *
+     * @param streamExecutionEnvironment StreamExecutionEnvironment
+     * @return New SiddhiCEP instance.
+     */
+    public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
+        return new SiddhiCEP(streamExecutionEnvironment);
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
new file mode 100644
index 0000000..43d7436
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Stream API
+ */
+@PublicEvolving
+public abstract class SiddhiStream {
+    private final SiddhiCEP cepEnvironment;
+
+    /**
+     * @param cepEnvironment SiddhiCEP cepEnvironment.
+     */
+    public SiddhiStream(SiddhiCEP cepEnvironment) {
+        Preconditions.checkNotNull(cepEnvironment,"SiddhiCEP cepEnvironment is null");
+        this.cepEnvironment = cepEnvironment;
+    }
+
+    /**
+     * @return current SiddhiCEP cepEnvironment.
+     */
+    protected SiddhiCEP getCepEnvironment() {
+        return this.cepEnvironment;
+    }
+
+    /**
+     * @return Transform SiddhiStream to physical DataStream
+     */
+    protected abstract DataStream<Tuple2<String, Object>> toDataStream();
+
+    /**
+     * Convert DataStream&lt;T&gt; to DataStream&lt;Tuple2&lt;String,T&gt;&gt;.
+     * If it is a KeyedStream, pass through the original keySelector.
+     */
+    protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
+        final String streamIdInClosure = streamId;
+        DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() {
+            @Override
+            public Tuple2<String, Object> map(T value) throws Exception {
+                return Tuple2.of(streamIdInClosure, (Object) value);
+            }
+        });
+        if (dataStream instanceof KeyedStream) {
+            final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
+            final KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() {
+                @Override
+                public Object getKey(Tuple2<String, Object> value) throws Exception {
+                    return keySelector.getKey((T) value.f1);
+                }
+            };
+            return resultStream.keyBy(keySelectorInClosure);
+        } else {
+            return resultStream;
+        }
+    }
+
+    /**
+     * ExecutableStream context to define execution logic, i.e. SiddhiCEP execution plan.
+     */
+    public abstract static class ExecutableStream extends SiddhiStream {
+        public ExecutableStream(SiddhiCEP environment) {
+            super(environment);
+        }
+
+        /**
+         * Siddhi Continuous Query Language (CQL)
+         *
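+         * <p>For example (an illustrative windowed aggregation):</p>
+         * <pre>{@code
+         * stream.cql("from inputStream#window.time(5 sec) "
+         *     + "select name, sum(price) as total group by name "
+         *     + "insert into outputStream");
+         * }</pre>
+         *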
+         * @param executionPlan Siddhi SQL-Like execution plan query
+         * @return ExecutionSiddhiStream context
+         */
+        public ExecutionSiddhiStream cql(String executionPlan) {
+            Preconditions.checkNotNull(executionPlan,"executionPlan");
+            return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment());
+        }
+    }
+
+    /**
+     * Initial Single Siddhi Stream Context
+     */
+    public static class SingleSiddhiStream<T> extends ExecutableStream {
+        private final String streamId;
+
+        public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
+            super(environment);
+            environment.checkStreamDefined(streamId);
+            this.streamId = streamId;
+        }
+
+        /**
+         * Define a siddhi stream with the given streamId, source <code>DataStream</code> and stream schema,
+         * and use it as the first stream of a {@link UnionSiddhiStream}.
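+         *
+         * <p>For example (illustrative; {@code ds2} is another source DataStream):</p>
+         * <pre>{@code
+         * cep.from("inputStream1")
+         *     .union("inputStream2", ds2, "id", "name")
+         *     .cql("from inputStream2 select id, name insert into outputStream")
+         *     .returnAsMap("outputStream");
+         * }</pre>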
+         *
+         * @param streamId Unique siddhi streamId
+         * @param dataStream DataStream to bind to the siddhi stream.
+         * @param fieldNames Siddhi stream schema field names
+         *
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+            getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+            return union(streamId);
+        }
+
+        /**
+         * @param streamIds Defined siddhi streamIds to union
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String... streamIds) {
+            Preconditions.checkNotNull(streamIds,"streamIds");
+            return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment());
+        }
+
+        @Override
+        protected DataStream<Tuple2<String, Object>> toDataStream() {
+            return convertDataStream(getCepEnvironment().getDataStream(this.streamId), this.streamId);
+        }
+    }
+
+    public static class UnionSiddhiStream<T> extends ExecutableStream {
+        private String firstStreamId;
+        private List<String> unionStreamIds;
+
+        public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
+            super(environment);
+            Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+            Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+            environment.checkStreamDefined(firstStreamId);
+            for (String unionStreamId : unionStreamIds) {
+                environment.checkStreamDefined(unionStreamId);
+            }
+            this.firstStreamId = firstStreamId;
+            this.unionStreamIds = unionStreamIds;
+        }
+
+        /**
+         * Define a siddhi stream with the given streamId, source <code>DataStream</code> and stream schema,
+         * and union it with the current stream.
+         *
+         * @param streamId Unique siddhi streamId
+         * @param dataStream DataStream to bind to the siddhi stream.
+         * @param fieldNames Siddhi stream schema field names
+         *
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+            Preconditions.checkNotNull(streamId,"streamId");
+            Preconditions.checkNotNull(dataStream,"dataStream");
+            Preconditions.checkNotNull(fieldNames,"fieldNames");
+            getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+            return union(streamId);
+        }
+
+        /**
+         * @param streamId one or more additional defined streamIds to union with.
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String... streamId) {
+            List<String> newUnionStreamIds = new LinkedList<>();
+            newUnionStreamIds.addAll(unionStreamIds);
+            newUnionStreamIds.addAll(Arrays.asList(streamId));
+            return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
+        }
+
+        @Override
+        protected DataStream<Tuple2<String, Object>> toDataStream() {
+            final String localFirstStreamId = firstStreamId;
+            final List<String> localUnionStreamIds = this.unionStreamIds;
+            DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getCepEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId);
+            for (String unionStreamId : localUnionStreamIds) {
+                dataStream = dataStream.union(convertDataStream(getCepEnvironment().<T>getDataStream(unionStreamId), unionStreamId));
+            }
+            return dataStream;
+        }
+    }
+
+    public static class ExecutionSiddhiStream {
+        private final DataStream<Tuple2<String, Object>> dataStream;
+        private final SiddhiCEP environment;
+        private final String executionPlan;
+
+        public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
+            this.executionPlan = executionPlan;
+            this.dataStream = dataStream;
+            this.environment = environment;
+        }
+
+        /**
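+         * Return the named output stream as a typed Tuple DataStream.
+         *
+         * <p>For example (illustrative names; the tuple arity must match the fields
+         * selected into {@code outputStream}):</p>
+         * <pre>{@code
+         * DataStream<Tuple2<String, Double>> out = cep
+         *     .from("inputStream")
+         *     .cql("from inputStream select name, price insert into outputStream")
+         *     .returns("outputStream");
+         * }</pre>
+         *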
+         * @param outStreamId The <code>streamId</code> to return as data stream.
+         * @param <T>         Type information should match with stream definition.
+         *                    During execution phase, it will automatically build type information based on stream definition.
+         * @return Return output stream as Tuple
+         * @see SiddhiTypeFactory
+         */
+        public <T extends Tuple> DataStream<T> returns(String outStreamId) {
+            SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+            siddhiContext.setExecutionPlan(executionPlan);
+            siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+            siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+            siddhiContext.setOutputStreamId(outStreamId);
+            siddhiContext.setExtensions(environment.getExtensions());
+            siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+            TypeInformation<T> typeInformation =
+                SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
+            siddhiContext.setOutputStreamType(typeInformation);
+            return returnsInternal(siddhiContext);
+        }
+
+        /**
+         * @param outStreamId The <code>streamId</code> to return as a map-typed data stream.
+         * @return Return output stream as <code>DataStream&lt;Map&lt;String,Object&gt;&gt;</code>;
+         * the concrete type is <code>LinkedHashMap&lt;String,Object&gt;</code>, which guarantees field order
+         * as defined in the siddhi execution plan.
+         * @see java.util.LinkedHashMap
+         */
+        public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
+            return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
+        }
+
+        /**
+         * @param outStreamId OutStreamId
+         * @param outType     Output type class
+         * @param <T>         Output type
+         * @return Return output stream as POJO class.
+         */
+        public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
+            TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType);
+            return returnsInternal(outStreamId, typeInformation);
+        }
+
+        private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) {
+            SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+            siddhiContext.setExecutionPlan(executionPlan);
+            siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+            siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+            siddhiContext.setOutputStreamId(outStreamId);
+            siddhiContext.setOutputStreamType(typeInformation);
+            siddhiContext.setExtensions(environment.getExtensions());
+            siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+            return returnsInternal(siddhiContext);
+        }
+
+        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
+            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+        }
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
new file mode 100644
index 0000000..f65cc81
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class DuplicatedStreamException extends RuntimeException {
+    public DuplicatedStreamException(String message) {
+        super(message);
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
new file mode 100644
index 0000000..26254c2
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class UndefinedStreamException extends RuntimeException {
+    public UndefinedStreamException(String message) {
+        super(message);
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
new file mode 100755
index 0000000..8cb6d67
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+/**
+ * <h1>Siddhi Runtime Operator</h1>
+ *
+ * A Flink stream operator that integrates with the native siddhi execution runtime and its extension and type schema mechanisms.
+ *
+ * <ul>
+ * <li>
+ * Create a Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according to the predefined execution plan and integrate it with the Flink stream operator lifecycle.
+ * </li>
+ * <li>
+ * Connect Flink DataStreams with predefined Siddhi streams according to unique streamIds.
+ * </li>
+ * <li>
+ * Convert native {@link StreamRecord}s to Siddhi {@link org.wso2.siddhi.core.event.Event}s according to {@link StreamSchema}, and send them to the Siddhi Runtime.
+ * </li>
+ * <li>
+ * Listen to output callback events and convert them to the expected output type according to the output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then emit them as a typed DataStream.
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ *
+ * @param <IN>  Input Element Type
+ * @param <OUT> Output Element Type
+ */
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT>
+    implements OneInputStreamOperator<IN, OUT> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+    private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
+    private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
+
+    private final SiddhiOperatorContext siddhiPlan;
+    private final String executionExpression;
+    private final boolean isProcessingTime;
+    private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
+
+    private transient SiddhiManager siddhiManager;
+    private transient SiddhiAppRuntime siddhiRuntime;
+    private transient Map<String, InputHandler> inputStreamHandlers;
+
+    // queue to buffer out of order stream records
+    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+    private transient ListState<byte[]> siddhiRuntimeState;
+    private transient ListState<byte[]> queuedRecordsState;
+
+    /**
+     * @param siddhiPlan Siddhi CEP execution plan
+     */
+    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+        validate(siddhiPlan);
+        this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+        this.siddhiPlan = siddhiPlan;
+        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+        this.streamRecordSerializers = new HashMap<>();
+
+        registerStreamRecordSerializers();
+    }
+
+    /**
+     * Register StreamRecordSerializer based on {@link StreamSchema}
+     */
+    private void registerStreamRecordSerializers() {
+        for (String streamId : this.siddhiPlan.getInputStreams()) {
+            streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
+        }
+    }
+
+    protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);
+
+    protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) {
+        if (streamRecordSerializers.containsKey(streamId)) {
+            return streamRecordSerializers.get(streamId);
+        } else {
+            throw new UndefinedStreamException("Stream " + streamId + " not defined");
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        String streamId = getStreamId(element.getValue());
+        StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+
+        if (isProcessingTime) {
+            processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
+            this.checkpointSiddhiRuntimeState();
+        } else {
+            PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+            // event time processing
+            // we have to buffer the elements until we receive the proper watermark
+            if (getExecutionConfig().isObjectReuseEnabled()) {
+                // copy the StreamRecord so that it cannot be changed
+                priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
+            } else {
+                priorityQueue.offer(element);
+            }
+            this.checkpointRecordQueueState();
+        }
+    }
+
+    protected abstract void processEvent(String streamId, StreamSchema<IN> schema, IN value, long timestamp) throws Exception;
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+            StreamRecord<IN> streamRecord = priorityQueue.poll();
+            String streamId = getStreamId(streamRecord.getValue());
+            long timestamp = streamRecord.getTimestamp();
+            StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+            processEvent(streamId, schema, streamRecord.getValue(), timestamp);
+        }
+        output.emitWatermark(mark);
+    }
+
+    public abstract String getStreamId(IN record);
+
+    public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
+        return priorityQueue;
+    }
+
+    protected SiddhiAppRuntime getSiddhiRuntime() {
+        return this.siddhiRuntime;
+    }
+
+    public InputHandler getSiddhiInputHandler(String streamId) {
+        return inputStreamHandlers.get(streamId);
+    }
+
+    protected SiddhiOperatorContext getSiddhiPlan() {
+        return this.siddhiPlan;
+    }
+
+    @Override
+    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+        if (priorityQueue == null) {
+            priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+        }
+        startSiddhiRuntime();
+    }
+
+    /**
+     * Send input data to siddhi runtime
+     */
+    protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
+        this.getSiddhiInputHandler(streamId).send(timestamp, data);
+    }
+
+    /**
+     * Validate the execution plan while building the DAG, before submitting to the execution environment, so that invalid plans fail fast.
+     */
+    private static void validate(final SiddhiOperatorContext siddhiPlan) {
+        SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
+        try {
+            siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan());
+        } finally {
+            siddhiManager.shutdown();
+        }
+    }
+
+    /**
+     * Create and start execution runtime
+     */
+    private void startSiddhiRuntime() {
+        if (this.siddhiRuntime == null) {
+            this.siddhiManager = this.siddhiPlan.createSiddhiManager();
+            for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
+                this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
+            }
+            this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
+            this.siddhiRuntime.start();
+            registerInputAndOutput(this.siddhiRuntime);
+            LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
+        } else {
+            throw new IllegalStateException("Siddhi has already been initialized");
+        }
+    }
+
+    private void shutdownSiddhiRuntime() {
+        if (this.siddhiRuntime != null) {
+            this.siddhiRuntime.shutdown();
+            LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName());
+            this.siddhiRuntime = null;
+            this.siddhiManager.shutdown();
+            this.siddhiManager = null;
+            this.inputStreamHandlers = null;
+        } else {
+            throw new IllegalStateException("Siddhi has already been shut down");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void registerInputAndOutput(SiddhiAppRuntime runtime) {
+        AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
+        runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output));
+        inputStreamHandlers = new HashMap<>();
+        for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
+            inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
+        }
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        shutdownSiddhiRuntime();
+        this.siddhiRuntimeState.clear();
+        super.dispose();
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        checkpointSiddhiRuntimeState();
+        checkpointRecordQueueState();
+    }
+
+    private void restoreState() throws Exception {
+        LOGGER.info("Restore siddhi state");
+        final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator();
+        if (siddhiState.hasNext()) {
+            this.siddhiRuntime.restore(siddhiState.next());
+        }
+
+        LOGGER.info("Restore queued records state");
+        final Iterator<byte[]> queueState = queuedRecordsState.get().iterator();
+        if (queueState.hasNext()) {
+            final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next());
+            final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream);
+            try {
+                this.priorityQueue = restoreQueueState(dataInputView);
+            } finally {
+                dataInputView.close();
+                byteArrayInputStream.close();
+            }
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        if (siddhiRuntimeState == null) {
+            siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME,
+                    new BytePrimitiveArraySerializer()));
+        }
+        if (queuedRecordsState == null) {
+            queuedRecordsState = context.getOperatorStateStore().getListState(
+                new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
+        }
+        if (context.isRestored()) {
+            restoreState();
+        }
+    }
+
+    private void checkpointSiddhiRuntimeState() throws Exception {
+        this.siddhiRuntimeState.clear();
+        this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
+        this.queuedRecordsState.clear();
+    }
+
+    private void checkpointRecordQueueState() throws Exception {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        final DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
+        try {
+            snapshotQueueState(this.priorityQueue, dataOutputView);
+            this.queuedRecordsState.clear();
+            this.queuedRecordsState.add(byteArrayOutputStream.toByteArray());
+        } finally {
+            dataOutputView.close();
+            byteArrayOutputStream.close();
+        }
+    }
+
+    protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException;
+
+    protected abstract PriorityQueue<StreamRecord<IN>> restoreQueueState(DataInputView dataInputView) throws IOException;
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
new file mode 100644
index 0000000..f760938
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SiddhiCEP operator context metadata, including input/output streams (streamId, TypeInformation) as well as the execution plan query,
+ * and execution environment context such as TimeCharacteristic and ExecutionConfig.
+ */
+public class SiddhiOperatorContext implements Serializable {
+    private ExecutionConfig executionConfig;
+    private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas;
+    private final Map<String, Class<?>> siddhiExtensions;
+    private String outputStreamId;
+    private TypeInformation outputStreamType;
+    private TimeCharacteristic timeCharacteristic;
+    private String name;
+    private String executionPlan;
+
+    public SiddhiOperatorContext() {
+        inputStreamSchemas = new HashMap<>();
+        siddhiExtensions = new HashMap<>();
+    }
+
+    /**
+     * @param extensions siddhi extensions to register
+     */
+    public void setExtensions(Map<String, Class<?>> extensions) {
+        Preconditions.checkNotNull(extensions,"extensions");
+        siddhiExtensions.putAll(extensions);
+    }
+
+    /**
+     * @return registered siddhi extensions
+     */
+    public Map<String, Class<?>> getExtensions() {
+        return siddhiExtensions;
+    }
+
+    /**
+     * @return Siddhi stream operator name in the format of "Siddhi: truncated query ... (remaining length)"
+     */
+    public String getName() {
+        if (this.name == null) {
+            if (executionPlan.length() > 100) {
+                return String.format("Siddhi: %s ... (%s)", executionPlan.substring(0, 100), executionPlan.length() - 100);
+            } else {
+                return String.format("Siddhi: %s", executionPlan);
+            }
+        } else {
+            return this.name;
+        }
+    }
+
+    /**
+     * @return Source siddhi stream IDs
+     */
+    public List<String> getInputStreams() {
+        return new ArrayList<>(this.inputStreamSchemas.keySet());
+    }
+
+    /**
+     * @return Siddhi CEP cql-like execution plan
+     */
+    public String getExecutionPlan() {
+        return executionPlan;
+    }
+
+    /**
+     * @return The final execution plan: generated stream definition expressions prepended to the execution expression.
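+     *
+     * <p>For example, with one registered input stream the combined plan may look
+     * like (illustrative):</p>
+     * <pre>{@code
+     * define stream inputStream (id int, name string, price double);
+     * from inputStream[price > 100] select id, name, price insert into outputStream;
+     * }</pre>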
+     */
+    public String getFinalExecutionPlan() {
+        Preconditions.checkNotNull(executionPlan, "Execution plan is not set");
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) {
+            sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey()));
+        }
+        sb.append(this.getExecutionPlan());
+        return sb.toString();
+    }
+
+    /**
+     * @return Siddhi Stream Operator output type information
+     */
+    public TypeInformation getOutputStreamType() {
+        return outputStreamType;
+    }
+
+    /**
+     * @return Siddhi output streamId for callback
+     */
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    /**
+     * @param inputStreamId Siddhi streamId
+     * @return StreamSchema for given siddhi streamId
+     *
+     * @throws UndefinedStreamException if the stream is not defined
+     */
+    @SuppressWarnings("unchecked")
+    public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) {
+        Preconditions.checkNotNull(inputStreamId,"inputStreamId");
+
+        if (!inputStreamSchemas.containsKey(inputStreamId)) {
+            throw new UndefinedStreamException("Input stream: " + inputStreamId + " is not found");
+        }
+        return (StreamSchema<IN>) inputStreamSchemas.get(inputStreamId);
+    }
+
+    /**
+     * @param outputStreamId Siddhi output streamId, which must exist in siddhi execution plan
+     */
+    public void setOutputStreamId(String outputStreamId) {
+        Preconditions.checkNotNull(outputStreamId,"outputStreamId");
+        this.outputStreamId = outputStreamId;
+    }
+
+    /**
+     * @param outputStreamType Output stream TypeInformation
+     */
+    public void setOutputStreamType(TypeInformation outputStreamType) {
+        Preconditions.checkNotNull(outputStreamType,"outputStreamType");
+        this.outputStreamType = outputStreamType;
+    }
+
+    /**
+     * @return Returns execution environment TimeCharacteristic
+     */
+    public TimeCharacteristic getTimeCharacteristic() {
+        return timeCharacteristic;
+    }
+
+    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+        Preconditions.checkNotNull(timeCharacteristic,"timeCharacteristic");
+        this.timeCharacteristic = timeCharacteristic;
+    }
+
+    /**
+     * @param executionPlan Siddhi SQL-like execution plan query
+     */
+    public void setExecutionPlan(String executionPlan) {
+        Preconditions.checkNotNull(executionPlan,"executionPlan");
+        this.executionPlan = executionPlan;
+    }
+
+    /**
+     * @return Returns input stream ID and schema mapping
+     */
+    public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() {
+        return inputStreamSchemas;
+    }
+
+    /**
+     * @param inputStreamSchemas input stream ID and schema mapping
+     */
+    public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) {
+        Preconditions.checkNotNull(inputStreamSchemas,"inputStreamSchemas");
+        this.inputStreamSchemas = inputStreamSchemas;
+    }
+
+    public void setName(String name) {
+        Preconditions.checkNotNull(name,"name");
+        this.name = name;
+    }
+
+    /**
+     * @return Created new SiddhiManager instance with registered siddhi extensions
+     */
+    public SiddhiManager createSiddhiManager() {
+        SiddhiManager siddhiManager = new SiddhiManager();
+        for (Map.Entry<String, Class<?>> entry : getExtensions().entrySet()) {
+            siddhiManager.setExtension(entry.getKey(), entry.getValue());
+        }
+        return siddhiManager;
+    }
+
+    /**
+     * @return StreamExecutionEnvironment ExecutionConfig
+     */
+    public ExecutionConfig getExecutionConfig() {
+        return executionConfig;
+    }
+
+    /**
+     * @param executionConfig StreamExecutionEnvironment ExecutionConfig
+     */
+    public void setExecutionConfig(ExecutionConfig executionConfig) {
+        Preconditions.checkNotNull(executionConfig,"executionConfig");
+        this.executionConfig = executionConfig;
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
new file mode 100755
index 0000000..5c54ad8
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Wrap input events of generic type <code>IN</code> as <code>Tuple2&lt;String, IN&gt;</code>.
+ */
+public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
+
+    public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
+        super(siddhiPlan);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
+        TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation<IN>) streamSchema.getTypeInfo());
+        return new StreamElementSerializer<>(tuple2TypeInformation.createSerializer(executionConfig));
+    }
+
+    @Override
+    protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
+        send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
+    }
+
+    @Override
+    public String getStreamId(Tuple2<String, IN> record) {
+        return record.f0;
+    }
+
+    @Override
+    protected void snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
+        dataOutputView.writeInt(queue.size());
+        for (StreamRecord<Tuple2<String, IN>> record : queue) {
+            String streamId = record.getValue().f0;
+            dataOutputView.writeUTF(streamId);
+            this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
+        }
+    }
+
+    @Override
+    protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
+        int sizeOfQueue = dataInputView.readInt();
+        PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
+        for (int i = 0; i < sizeOfQueue; i++) {
+            String streamId = dataInputView.readUTF();
+            StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
+            priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
+        }
+        return priorityQueue;
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
new file mode 100755
index 0000000..7af37ce
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Siddhi stream output callback handler that converts Siddhi {@link Event}s to the required
+ * output type, according to the output {@link TypeInformation} and Siddhi schema {@link AbstractDefinition}.
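+ *
+ * <p>Output records are buffered in memory; a usage sketch (the {@code MyEvent} element type
+ * is illustrative):
+ * <pre>
+ * StreamInMemOutputHandler&lt;MyEvent&gt; handler = new StreamInMemOutputHandler&lt;&gt;(typeInfo, definition);
+ * // ... the Siddhi runtime delivers output events to handler.receive(...) ...
+ * List&lt;StreamRecord&lt;MyEvent&gt;&gt; records = handler.getCollectedRecords();
+ * </pre>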
+ */
+public class StreamInMemOutputHandler<R> extends StreamCallback {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamInMemOutputHandler.class);
+
+    private final AbstractDefinition definition;
+    private final TypeInformation<R> typeInfo;
+    private final ObjectMapper objectMapper;
+
+    private final LinkedList<StreamRecord<R>> collectedRecords;
+
+    public StreamInMemOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition) {
+        this.typeInfo = typeInfo;
+        this.definition = definition;
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.collectedRecords = new LinkedList<>();
+    }
+
+    @Override
+    public void receive(Event[] events) {
+        for (Event event : events) {
+            if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+                collectedRecords.add(new StreamRecord<R>((R) toMap(event), event.getTimestamp()));
+            } else if (typeInfo.isTupleType()) {
+                Tuple tuple = this.toTuple(event);
+                collectedRecords.add(new StreamRecord<R>((R) tuple, event.getTimestamp()));
+            } else if (typeInfo instanceof PojoTypeInfo) {
+                R obj;
+                try {
+                    obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+                } catch (IllegalArgumentException ex) {
+                    LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+                    throw ex;
+                }
+                collectedRecords.add(new StreamRecord<R>(obj, event.getTimestamp()));
+            } else {
+                throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void stopProcessing() {
+        super.stopProcessing();
+        this.collectedRecords.clear();
+    }
+
+    private Map<String, Object> toMap(Event event) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+            map.put(definition.getAttributeNameArray()[i], event.getData(i));
+        }
+        return map;
+    }
+
+    private <T extends Tuple> T toTuple(Event event) {
+        return SiddhiTupleFactory.newTuple(event.getData());
+    }
+
+    public LinkedList<StreamRecord<R>> getCollectedRecords() {
+        return collectedRecords;
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
new file mode 100755
index 0000000..8840dac
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Siddhi stream output callback handler that converts Siddhi {@link Event}s to the required
+ * output type, according to the output {@link TypeInformation} and Siddhi schema {@link AbstractDefinition}.
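+ *
+ * <p>For a tuple-typed output, for example, a Siddhi event carrying data {@code {1, "start"}}
+ * is emitted roughly as follows (a sketch; the implementation below reuses a single record):
+ * <pre>
+ * output.collect(new StreamRecord&lt;&gt;(Tuple2.of(1, "start"), event.getTimestamp()));
+ * </pre>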
+ */
+public class StreamOutputHandler<R> extends StreamCallback {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamOutputHandler.class);
+
+    private final AbstractDefinition definition;
+    private final Output<StreamRecord<R>> output;
+    private final TypeInformation<R> typeInfo;
+    private final ObjectMapper objectMapper;
+
+    public StreamOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition, Output<StreamRecord<R>> output) {
+        this.typeInfo = typeInfo;
+        this.definition = definition;
+        this.output = output;
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @Override
+    public void receive(Event[] events) {
+        StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L);
+        for (Event event : events) {
+            if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+                reusableRecord.replace(toMap(event), event.getTimestamp());
+                output.collect(reusableRecord);
+            } else if (typeInfo.isTupleType()) {
+                Tuple tuple = this.toTuple(event);
+                reusableRecord.replace(tuple, event.getTimestamp());
+                output.collect(reusableRecord);
+            } else if (typeInfo instanceof PojoTypeInfo) {
+                R obj;
+                try {
+                    obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+                } catch (IllegalArgumentException ex) {
+                    LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+                    throw ex;
+                }
+                reusableRecord.replace(obj, event.getTimestamp());
+                output.collect(reusableRecord);
+            } else {
+                throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void stopProcessing() {
+        super.stopProcessing();
+    }
+
+    private Map<String, Object> toMap(Event event) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+            map.put(definition.getAttributeNameArray()[i], event.getData(i));
+        }
+        return map;
+    }
+
+    private <T extends Tuple> T toTuple(Event event) {
+        return SiddhiTupleFactory.newTuple(event.getData());
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..049681c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Stream Record Timestamp Comparator
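+ *
+ * <p>Illustrative use, keeping buffered records time-ordered (the {@code MyEvent} element
+ * type is arbitrary):
+ * <pre>
+ * PriorityQueue&lt;StreamRecord&lt;MyEvent&gt;&gt; queue =
+ *     new PriorityQueue&lt;&gt;(16, new StreamRecordComparator&lt;MyEvent&gt;());
+ * </pre>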
+ */
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+    private static final long serialVersionUID = 1581054988433915305L;
+
+    @Override
+    public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+        if (o1.getTimestamp() < o2.getTimestamp()) {
+            return -1;
+        } else if (o1.getTimestamp() > o2.getTimestamp()) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
new file mode 100644
index 0000000..6b1ceae
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h1> Features </h1>
+ * <ul>
+ * <li>
+ * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
+ * <ul>
+ * <li>Filter</li>
+ * <li>Join</li>
+ * <li>Aggregation</li>
+ * <li>Group by</li>
+ * <li>Having</li>
+ * <li>Window</li>
+ * <li>Conditions and Expressions</li>
+ * <li>Pattern processing</li>
+ * <li>Sequence processing</li>
+ * <li>Event Tables</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
+ * <ul>
+ * <li>Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO, Tuple, primitive types, etc.</li>
+ * <li>Connect single or multiple Flink DataStreams to a Siddhi CEP Execution Plan</li>
+ * <li>Return the output stream as a DataStream with its type inferred from the Siddhi Stream Schema</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Integrate Siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ * <p/>
+ * <h1>Example</h1>
+ * <pre>
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ *
+ * cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
+ *
+ * cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
+ * cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
+ *
+ * DataStream&lt;Tuple4&lt;Integer,String,Integer,String&gt;&gt; output = cep
+ *     .from("inputStream1").union("inputStream2")
+ *     .cql(
+ *         "from every s1 = inputStream1[id == 2] "
+ *          + " -> s2 = inputStream2[id == 3] "
+ *          + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ *          + "insert into outputStream"
+ *     )
+ *     .returns("outputStream");
+ *
+ * env.execute();
+ * </pre>
+ *
+ * @see <a href="https://github.com/wso2/siddhi">https://github.com/wso2/siddhi</a>
+ */
+package org.apache.flink.streaming.siddhi;
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
new file mode 100644
index 0000000..2a3a04c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Siddhi specific Stream Schema.
+ *
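+ * <p>For illustration, a schema over a type with fields {@code id} (int) and {@code name}
+ * (String) yields the Siddhi stream definition below (the template joins columns with a
+ * bare comma):
+ * <pre>
+ * define stream inputStream (id int,name string);
+ * </pre>
+ *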
+ * @param <T> Siddhi stream element type
+ */
+public class SiddhiStreamSchema<T> extends StreamSchema<T> {
+    private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);";
+
+    public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+        super(typeInfo, fieldNames);
+    }
+
+    public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+        super(typeInfo, fieldIndexes, fieldNames);
+    }
+
+    public StreamDefinition getStreamDefinition(String streamId) {
+        StreamDefinition streamDefinition = StreamDefinition.id(streamId);
+        for (int i = 0; i < getFieldNames().length; i++) {
+            streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i]));
+        }
+        return streamDefinition;
+    }
+
+    public String getStreamDefinitionExpression(StreamDefinition streamDefinition) {
+        List<String> columns = new ArrayList<>();
+        Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+        for (Attribute attribute : streamDefinition.getAttributeList()) {
+            columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+        }
+        return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+    }
+
+    public String getStreamDefinitionExpression(String streamId) {
+        return getStreamDefinitionExpression(getStreamDefinition(streamId));
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
new file mode 100644
index 0000000..c851631
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Generic Field-based Stream Schema
+ *
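+ * <p>A minimal construction sketch (the {@code MyEvent} type and field names are
+ * illustrative):
+ * <pre>
+ * TypeInformation&lt;MyEvent&gt; typeInfo = TypeInformation.of(MyEvent.class);
+ * StreamSchema&lt;MyEvent&gt; schema = new StreamSchema&lt;&gt;(typeInfo, "id", "name", "price");
+ * </pre>
+ *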
+ * @param <T> Stream element type
+ */
+public class StreamSchema<T> implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSchema.class);
+    private final TypeInformation<T> typeInfo;
+    private final int[] fieldIndexes;
+    private final String[] fieldNames;
+    private TypeInformation[] fieldTypes;
+    private final StreamSerializer<T> streamSerializer;
+    private TypeSerializer<T> typeSerializer;
+
+    public StreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+        Preconditions.checkNotNull(fieldNames, "Field name is required");
+        this.typeInfo = typeInfo;
+        this.fieldNames = fieldNames;
+        this.fieldIndexes = getFieldIndexes(typeInfo, fieldNames);
+        this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames);
+        this.streamSerializer = new StreamSerializer<>(this);
+    }
+
+    public StreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+        this.typeInfo = typeInfo;
+        this.fieldIndexes = fieldIndexes;
+        this.fieldNames = fieldNames;
+        this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames);
+        this.streamSerializer = new StreamSerializer<>(this);
+    }
+
+    public boolean isAtomicType() {
+        return typeInfo instanceof AtomicType;
+    }
+
+    public boolean isTupleType() {
+        return typeInfo instanceof TupleTypeInfo;
+    }
+
+    public boolean isPojoType() {
+        return typeInfo instanceof PojoTypeInfo;
+    }
+
+    public boolean isCaseClassType() {
+        return typeInfo instanceof CaseClassTypeInfo;
+    }
+
+    public boolean isCompositeType() {
+        return typeInfo instanceof CompositeType;
+    }
+
+    private <E> int[] getFieldIndexes(TypeInformation<E> typeInfo, String... fieldNames) {
+        int[] result;
+        if (isAtomicType()) {
+            result = new int[]{0};
+        } else if (isTupleType()) {
+            result = new int[fieldNames.length];
+            for (int i = 0; i < fieldNames.length; i++) {
+                result[i] = i;
+            }
+        } else if (isPojoType()) {
+            result = new int[fieldNames.length];
+            for (int i = 0; i < fieldNames.length; i++) {
+                int index = ((PojoTypeInfo) typeInfo).getFieldIndex(fieldNames[i]);
+                if (index < 0) {
+                    throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo);
+                }
+                result[i] = index;
+            }
+        } else if (isCaseClassType()) {
+            result = new int[fieldNames.length];
+            for (int i = 0; i < fieldNames.length; i++) {
+                int index = ((CaseClassTypeInfo) typeInfo).getFieldIndex(fieldNames[i]);
+                if (index < 0) {
+                    throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo);
+                }
+                result[i] = index;
+            }
+        } else {
+            throw new IllegalArgumentException("Failed to get field index from " + typeInfo);
+        }
+        return result;
+    }
+
+    private <E> TypeInformation[] getFieldTypes(TypeInformation<E> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+        TypeInformation[] fieldTypes;
+        if (isCompositeType()) {
+            CompositeType cType = (CompositeType) typeInfo;
+            if (fieldNames.length != cType.getArity()) {
+                LOGGER.warn("Arity of type (" + cType.getArity() + ") "
+                    + "is not equal to the number of field names (" + fieldNames.length + ").");
+            }
+            fieldTypes = new TypeInformation[fieldIndexes.length];
+            for (int i = 0; i < fieldIndexes.length; i++) {
+                fieldTypes[i] = cType.getTypeAt(fieldIndexes[i]);
+            }
+        } else if (isAtomicType()) {
+            if (fieldIndexes.length != 1 || fieldIndexes[0] != 0) {
+                throw new IllegalArgumentException(
+                    "Non-composite input type may have only a single field and its index must be 0.");
+            }
+            fieldTypes = new TypeInformation[]{typeInfo};
+        } else {
+            throw new IllegalArgumentException("Illegal input type info: " + typeInfo);
+        }
+        return fieldTypes;
+    }
+
+    public TypeInformation<T> getTypeInfo() {
+        return typeInfo;
+    }
+
+    public int[] getFieldIndexes() {
+        return fieldIndexes;
+    }
+
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    public TypeInformation[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    public StreamSerializer<T> getStreamSerializer() {
+        return streamSerializer;
+    }
+
+    public TypeSerializer<T> getTypeSerializer() {
+        return typeSerializer;
+    }
+
+    public void setTypeSerializer(TypeSerializer<T> typeSerializer) {
+        this.typeSerializer = typeSerializer;
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
new file mode 100644
index 0000000..760afbe
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+/**
+ * Stream Serialization and Field Extraction Methods.
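+ *
+ * <p>A sketch of how {@code getRow} flattens an element into a Siddhi row; the {@code MyEvent}
+ * type is illustrative and the field order follows the schema:
+ * <pre>
+ * Object[] row = schema.getStreamSerializer().getRow(new MyEvent(1, "start", 1.0));
+ * // row == {1, "start", 1.0}
+ * </pre>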
+ */
+public class StreamSerializer<T> implements Serializable {
+    private final StreamSchema<T> schema;
+
+    public StreamSerializer(StreamSchema<T> schema) {
+        this.schema = schema;
+    }
+
+    public Object[] getRow(T input) {
+        Preconditions.checkArgument(input.getClass() == schema.getTypeInfo().getTypeClass(),
+            "Invalid input type: " + input + ", expected: " + schema.getTypeInfo());
+
+        Object[] data;
+        if (schema.isAtomicType()) {
+            data = new Object[]{input};
+        } else if (schema.isTupleType()) {
+            Tuple tuple = (Tuple) input;
+            data = new Object[schema.getFieldIndexes().length];
+            for (int i = 0; i < schema.getFieldIndexes().length; i++) {
+                data[i] = tuple.getField(schema.getFieldIndexes()[i]);
+            }
+        } else if (schema.isPojoType() || schema.isCaseClassType()) {
+            data = new Object[schema.getFieldIndexes().length];
+            for (int i = 0; i < schema.getFieldNames().length; i++) {
+                data[i] = getFieldValue(schema.getFieldNames()[i], input);
+            }
+        } else {
+            throw new IllegalArgumentException("Failed to get field values from " + schema.getTypeInfo());
+        }
+        return data;
+    }
+
+    private Object getFieldValue(String fieldName, T input) {
+        // TODO: Cache Field Accessor
+        Field field = TypeExtractor.getDeclaredField(schema.getTypeInfo().getTypeClass(), fieldName);
+        if (field == null) {
+            throw new IllegalArgumentException(fieldName + " is not found in " + schema.getTypeInfo());
+        }
+        if (!field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        try {
+            return field.get(input);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
new file mode 100644
index 0000000..20ca535
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Converts a Siddhi CEP execution plan into a SiddhiStreamOperator and builds the output DataStream.
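+ *
+ * <p>Illustrative call (with a hypothetical {@code MyEvent} output type):
+ * <pre>
+ * DataStream&lt;MyEvent&gt; output = SiddhiStreamFactory.createDataStream(context, namedStream);
+ * </pre>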
+ */
+public class SiddhiStreamFactory {
+    @SuppressWarnings("unchecked")
+    public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) {
+        return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context));
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
new file mode 100644
index 0000000..88c15eb
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Siddhi Tuple Utility methods
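+ *
+ * <p>Illustrative usage:
+ * <pre>
+ * Tuple3&lt;Integer, String, Double&gt; tuple = SiddhiTupleFactory.newTuple(new Object[]{1, "start", 1.0});
+ * </pre>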
+ */
+public class SiddhiTupleFactory {
+    /**
+     * Convert an object array to a Tuple{N} instance where N is between 0 and 25.
+     *
+     * @throws IllegalArgumentException if the row's length is greater than 25
+     */
+    public static <T extends Tuple> T newTuple(Object[] row) {
+        Preconditions.checkNotNull(row, "Tuple row is null");
+        switch (row.length) {
+            case 0:
+                return setTupleValue(new Tuple0(), row);
+            case 1:
+                return setTupleValue(new Tuple1(), row);
+            case 2:
+                return setTupleValue(new Tuple2(), row);
+            case 3:
+                return setTupleValue(new Tuple3(), row);
+            case 4:
+                return setTupleValue(new Tuple4(), row);
+            case 5:
+                return setTupleValue(new Tuple5(), row);
+            case 6:
+                return setTupleValue(new Tuple6(), row);
+            case 7:
+                return setTupleValue(new Tuple7(), row);
+            case 8:
+                return setTupleValue(new Tuple8(), row);
+            case 9:
+                return setTupleValue(new Tuple9(), row);
+            case 10:
+                return setTupleValue(new Tuple10(), row);
+            case 11:
+                return setTupleValue(new Tuple11(), row);
+            case 12:
+                return setTupleValue(new Tuple12(), row);
+            case 13:
+                return setTupleValue(new Tuple13(), row);
+            case 14:
+                return setTupleValue(new Tuple14(), row);
+            case 15:
+                return setTupleValue(new Tuple15(), row);
+            case 16:
+                return setTupleValue(new Tuple16(), row);
+            case 17:
+                return setTupleValue(new Tuple17(), row);
+            case 18:
+                return setTupleValue(new Tuple18(), row);
+            case 19:
+                return setTupleValue(new Tuple19(), row);
+            case 20:
+                return setTupleValue(new Tuple20(), row);
+            case 21:
+                return setTupleValue(new Tuple21(), row);
+            case 22:
+                return setTupleValue(new Tuple22(), row);
+            case 23:
+                return setTupleValue(new Tuple23(), row);
+            case 24:
+                return setTupleValue(new Tuple24(), row);
+            case 25:
+                return setTupleValue(new Tuple25(), row);
+            default:
+                throw new IllegalArgumentException("Too long row: " + row.length + ", unable to convert to Tuple");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Tuple> T setTupleValue(Tuple tuple, Object[] row) {
+        if (row.length != tuple.getArity()) {
+            throw new IllegalArgumentException("Row length" + row.length + " is not equal with tuple's arity: " + tuple.getArity());
+        }
+        for (int i = 0; i < row.length; i++) {
+            tuple.setField(row[i], i);
+        }
+        return (T) tuple;
+    }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
new file mode 100644
index 0000000..22405c9
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi Type Utils for conversion between Java Type, Siddhi Field Type, Stream Definition, and Flink Type Information.
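+ *
+ * <p>For example (a sketch using Flink's built-in type information):
+ * <pre>
+ * Attribute.Type siddhiType = SiddhiTypeFactory.getAttributeType(BasicTypeInfo.STRING_TYPE_INFO); // STRING
+ * Class&lt;?&gt; javaType = SiddhiTypeFactory.getJavaType(Attribute.Type.LONG); // Long.class
+ * </pre>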
+ */
+public class SiddhiTypeFactory {
+    private static final Map<Class<?>, Attribute.Type> JAVA_TO_SIDDHI_TYPE = new HashMap<>();
+    private static final Map<Attribute.Type, Class<?>> SIDDHI_TO_JAVA_TYPE = new HashMap<>();
+
+    static {
+        registerType(String.class, Attribute.Type.STRING);
+        registerType(Integer.class, Attribute.Type.INT);
+        registerType(int.class, Attribute.Type.INT);
+        registerType(Long.class, Attribute.Type.LONG);
+        registerType(long.class, Attribute.Type.LONG);
+        registerType(Float.class, Attribute.Type.FLOAT);
+        registerType(float.class, Attribute.Type.FLOAT);
+        registerType(Double.class, Attribute.Type.DOUBLE);
+        registerType(double.class, Attribute.Type.DOUBLE);
+        registerType(Boolean.class, Attribute.Type.BOOL);
+        registerType(boolean.class, Attribute.Type.BOOL);
+    }
+
+    public static void registerType(Class<?> javaType, Attribute.Type siddhiType) {
+        if (JAVA_TO_SIDDHI_TYPE.containsKey(javaType)) {
+            throw new IllegalArgumentException("Java type: " + javaType + " or siddhi type: " + siddhiType + " were already registered");
+        }
+        JAVA_TO_SIDDHI_TYPE.put(javaType, siddhiType);
+        SIDDHI_TO_JAVA_TYPE.put(siddhiType, javaType);
+    }
+
+    public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) {
+        SiddhiManager siddhiManager = null;
+        SiddhiAppRuntime runtime = null;
+        try {
+            siddhiManager = new SiddhiManager();
+            runtime = siddhiManager.createSiddhiAppRuntime(executionPlan);
+            Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap();
+            if (definitionMap.containsKey(streamId)) {
+                return definitionMap.get(streamId);
+            } else {
+                throw new IllegalArgumentException("Unknown stream id" + streamId);
+            }
+        } finally {
+            if (runtime != null) {
+                runtime.shutdown();
+            }
+            if (siddhiManager != null) {
+                siddhiManager.shutdown();
+            }
+        }
+    }
+
+    public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) {
+        int tupleSize = definition.getAttributeList().size();
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("Tuple").append(tupleSize);
+        stringBuilder.append("<");
+        List<String> attributeTypes = new ArrayList<>();
+        for (Attribute attribute : definition.getAttributeList()) {
+            attributeTypes.add(getJavaType(attribute.getType()).getName());
+        }
+        stringBuilder.append(StringUtils.join(attributeTypes, ","));
+        stringBuilder.append(">");
+        try {
+            return TypeInfoParser.parse(stringBuilder.toString());
+        } catch (IllegalArgumentException ex) {
+            throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex);
+        }
+    }
+
+    public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(String executionPlan, String streamId) {
+        return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final TypeInformation<?> MAP_TYPE_INFORMATION = TypeExtractor.createTypeInfo(new HashMap<String, Object>().getClass());
+
+    public static TypeInformation<Map<String, Object>> getMapTypeInformation() {
+        return (TypeInformation<Map<String, Object>>) MAP_TYPE_INFORMATION;
+    }
+
+    public static <F> Attribute.Type getAttributeType(TypeInformation<F> fieldType) {
+        if (JAVA_TO_SIDDHI_TYPE.containsKey(fieldType.getTypeClass())) {
+            return JAVA_TO_SIDDHI_TYPE.get(fieldType.getTypeClass());
+        } else {
+            return Attribute.Type.OBJECT;
+        }
+    }
+
+    public static Class<?> getJavaType(Attribute.Type attributeType) {
+        if (!SIDDHI_TO_JAVA_TYPE.containsKey(attributeType)) {
+            throw new IllegalArgumentException("Unable to get java type for siddhi attribute type: " + attributeType);
+        }
+        return SIDDHI_TO_JAVA_TYPE.get(attributeType);
+    }
+
+    public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) {
+        return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">");
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
new file mode 100755
index 0000000..5c16c71
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Flink-siddhi library integration test cases
+ */
+public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
+
+    @Rule
+    public transient TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void testSimpleWriteAndRead() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.fromElements(
+            Event.of(1, "start", 1.0),
+            Event.of(2, "middle", 2.0),
+            Event.of(3, "end", 3.0),
+            Event.of(4, "start", 4.0),
+            Event.of(5, "middle", 5.0),
+            Event.of(6, "end", 6.0)
+        );
+
+        String path = tempFolder.newFile().toURI().toString();
+        input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
+            @Override
+            public Event map(Event event) throws Exception {
+                return event;
+            }
+        })).writeAsText(path);
+        env.execute();
+        Assert.assertEquals(6, getLineCount(path));
+    }
+
+    @Test
+    public void testSimplePojoStreamAndReturnPojo() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.fromElements(
+            Event.of(1, "start", 1.0),
+            Event.of(2, "middle", 2.0),
+            Event.of(3, "end", 3.0),
+            Event.of(4, "start", 4.0),
+            Event.of(5, "middle", 5.0),
+            Event.of(6, "end", 6.0)
+        );
+
+        DataStream<Event> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price")
+            .cql("from inputStream insert into  outputStream")
+            .returns("outputStream", Event.class);
+        output.print();
+        env.execute();
+    }
+
+    @Test
+    public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+        DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream");
+
+        DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
+            @Override
+            public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
+                return value.f1;
+            }
+        });
+        String resultPath = tempFolder.newFile().toURI().toString();
+        following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedTupleSourceAndReturnTuple() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Tuple4<Integer, String, Double, Long>> input = env
+            .addSource(new RandomTupleSource(5).closeDelay(1500)).keyBy(1);
+
+        DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<String> input = env.addSource(new RandomWordSource(5).closeDelay(1500));
+
+        DataStream<Tuple1<String>> output = SiddhiCEP
+            .define("wordStream", input, "words")
+            .cql("from wordStream select words insert into  outputStream")
+            .returns("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test(expected = InvalidTypesException.class)
+    public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5).closeDelay(1500));
+
+        DataStream<Tuple5<Long, Integer, String, Double, Long>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream");
+
+        DataStream<Long> following = output.map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() {
+            @Override
+            public Long map(Tuple5<Long, Integer, String, Double, Long> value) throws Exception {
+                return value.f0;
+            }
+        });
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedPojoStreamAndReturnMap() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+        input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
+            @Override
+            public long extractAscendingTimestamp(Event element) {
+                return element.getTimestamp();
+            }
+        });
+
+        DataStream<Event> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream", Event.class);
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(2), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(2), "input2");
+        DataStream<Event> input3 = env.addSource(new RandomEventSource(2), "input3");
+        DataStream<Event> output = SiddhiCEP
+            .define("inputStream1", input1, "id", "name", "price", "timestamp")
+            .union("inputStream2", input2, "id", "name", "price", "timestamp")
+            .union("inputStream3", input3, "id", "name", "price", "timestamp")
+            .cql(
+                "from inputStream1 select timestamp, id, name, price insert into outputStream;"
+                    + "from inputStream2 select timestamp, id, name, price insert into outputStream;"
+                    + "from inputStream3 select timestamp, id, name, price insert into outputStream;"
+            )
+            .returns("outputStream", Event.class);
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(6, getLineCount(resultPath));
+    }
+
+    /**
+     * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Joins</a>
+     */
+    @Test
+    public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
+
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp")
+            .union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp")
+            .cql(
+                "from inputStream1#window.length(5) as s1 "
+                    + "join inputStream2#window.time(500) as s2 "
+                    + "on s1.id == s2.id "
+                    + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+                    + "insert into JoinStream;"
+            )
+            .returnAsMap("JoinStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    /**
+     * @see <a href="https://docs.wso2.com/display/CEP300/Patterns">https://docs.wso2.com/display/CEP300/Patterns</a>
+     */
+    @Test
+    public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2");
+
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
+            .union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp")
+            .cql(
+                "from every s1 = inputStream1[id == 2] "
+                    + " -> s2 = inputStream2[id == 3] "
+                    + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+                    + "insert into outputStream"
+            )
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(1, getLineCount(resultPath));
+        compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", resultPath);
+    }
+
+    /**
+     * @see <a href="https://docs.wso2.com/display/CEP300/Sequences">https://docs.wso2.com/display/CEP300/Sequences</a>
+     */
+    @Test
+    public void testUnboundedPojoStreamSimpleSequences() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
+            .union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp")
+            .cql(
+                "from every s1 = inputStream1[id == 2]+ , "
+                    + "s2 = inputStream2[id == 3]? "
+                    + "within 1000 second "
+                    + "select s1[0].name as n1, s2.name as n2 "
+                    + "insert into outputStream"
+            )
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(1, getLineCount(resultPath));
+    }
+
+    private static int getLineCount(String resPath) throws IOException {
+        List<String> result = new LinkedList<>();
+        readAllResultLines(result, resPath);
+        return result.size();
+    }
+
+    @Test
+    public void testCustomizeSiddhiFunctionExtension() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+        cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+        DataStream<Map<String, Object>> output = cep
+            .from("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into  outputStream")
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
+
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+        cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+        cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
+        cep.registerStream("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp");
+
+        DataStream<Tuple4<Long, String, Double, Double>> output = cep
+            .from("inputStream1").union("inputStream2")
+            .cql(
+                "from inputStream1#window.length(5) as s1 "
+                    + "join inputStream2#window.time(500) as s2 "
+                    + "on s1.id == s2.id "
+                    + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+                    + "insert into JoinStream;"
+            )
+            .returns("JoinStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test(expected = UndefinedStreamException.class)
+    public void testTriggerUndefinedStreamException() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+        cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
+
+        DataStream<Map<String, Object>> output = cep
+            .from("inputStream1").union("inputStream2")
+            .cql(
+                "from inputStream1#window.length(5) as s1 "
+                    + "join inputStream2#window.time(500) as s2 "
+                    + "on s1.id == s2.id "
+                    + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+                    + "insert into JoinStream;"
+            )
+            .returnAsMap("JoinStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
new file mode 100644
index 0000000..582f1cd
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.extension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.wso2.siddhi.core.config.SiddhiAppContext;
+import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.core.util.config.ConfigReader;
+import org.wso2.siddhi.query.api.definition.Attribute;
+
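+/**
+ * Custom Siddhi function used by the tests. It is registered under the name
+ * "custom:plus" and invoked from CQL, e.g. {@code custom:plus(price, price)}.
+ */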
+public class CustomPlusFunctionExtension extends FunctionExecutor {
+    private Attribute.Type returnType;
+
+    /**
+     * The initialization method for FunctionExecutor; called once before any of the other callback methods
+     */
+    @Override
+    protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
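+        // Determine the result type from the parameter types: STRING and BOOL are
+        // rejected, DOUBLE keeps the result double, and any other numeric type
+        // yields LONG. Note that the last parameter examined decides the outcome.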
+        for (ExpressionExecutor expressionExecutor : expressionExecutors) {
+            Attribute.Type attributeType = expressionExecutor.getReturnType();
+            if (attributeType == Attribute.Type.DOUBLE) {
+                returnType = attributeType;
+            } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) {
+                throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool");
+            } else {
+                returnType = Attribute.Type.LONG;
+            }
+        }
+    }
+
+    /**
+     * The main execution method which will be called upon event arrival
+     * when there is more than one function parameter
+     *
+     * @param data the runtime values of function parameters
+     * @return the function result
+     */
+    @Override
+    protected Object execute(Object[] data) {
+        if (returnType == Attribute.Type.DOUBLE) {
+            double total = 0;
+            for (Object aObj : data) {
+                total += Double.parseDouble(String.valueOf(aObj));
+            }
+
+            return total;
+        } else {
+            long total = 0;
+            for (Object aObj : data) {
+                total += Long.parseLong(String.valueOf(aObj));
+            }
+            return total;
+        }
+    }
+
+    /**
+     * The main execution method which will be called upon event arrival
+     * when there are zero or one function parameter
+     *
+     * @param data null if the function parameter count is zero or
+     *             runtime data value of the function parameter
+     * @return the function result
+     */
+    @Override
+    protected Object execute(Object data) {
+        if (returnType == Attribute.Type.DOUBLE) {
+            return Double.parseDouble(String.valueOf(data));
+        } else {
+            return Long.parseLong(String.valueOf(data));
+        }
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+
+    @Override
+    public Map<String, Object> currentState() {
+        // Stateless function: nothing to snapshot.
+        return new HashMap<>();
+    }
+
+    @Override
+    public void restoreState(Map<String, Object> map) {
+        // Stateless function: nothing to restore.
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
new file mode 100644
index 0000000..d271c89
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.ArrayList;
+import java.util.List;
+
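+/**
+ * Verifies basic Siddhi runtime behaviour (app creation, input handlers,
+ * stream callbacks) independently of Flink.
+ */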
+public class SiddhiSyntaxTest {
+
+    private SiddhiManager siddhiManager;
+
+    @Before
+    public void setUp() {
+        siddhiManager = new SiddhiManager();
+    }
+
+    @After
+    public void after() {
+        siddhiManager.shutdown();
+    }
+
+    @Test
+    public void testSimplePlan() throws InterruptedException {
+        SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
+            "define stream inStream (name string, value double);"
+                + "from inStream insert into outStream");
+        runtime.start();
+
+        final List<Object[]> received = new ArrayList<>(3);
+        InputHandler inputHandler = runtime.getInputHandler("inStream");
+        Assert.assertNotNull(inputHandler);
+
+        try {
+            runtime.getInputHandler("unknownStream");
+            Assert.fail("Should throw exception for getting input handler for unknown streamId.");
+        } catch (Exception ex) {
+            // Expected exception for getting input handler for illegal streamId.
+        }
+
+        runtime.addCallback("outStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                for (Event event : events) {
+                    received.add(event.getData());
+                }
+            }
+        });
+
+        inputHandler.send(new Object[]{"a", 1.1});
+        inputHandler.send(new Object[]{"b", 1.2});
+        inputHandler.send(new Object[]{"c", 1.3});
+        Thread.sleep(100);
+        Assert.assertEquals(3, received.size());
+        Assert.assertArrayEquals(new Object[]{"a", 1.1}, received.get(0));
+        Assert.assertArrayEquals(new Object[]{"b", 1.2}, received.get(1));
+        Assert.assertArrayEquals(new Object[]{"c", 1.3}, received.get(2));
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
new file mode 100644
index 0000000..db05e9d
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Test;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import static org.junit.Assert.*;
+
+public class SiddhiExecutionPlanSchemaTest {
+    @Test
+    public void testStreamSchemaWithPojo() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+
+        SiddhiStreamSchema<Event> schema = new SiddhiStreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(4, schema.getFieldIndexes().length);
+
+        StreamDefinition streamDefinition = schema.getStreamDefinition("test_stream");
+        assertArrayEquals(new String[]{"id", "timestamp", "name", "price"}, streamDefinition.getAttributeNameArray());
+
+        assertEquals(Attribute.Type.INT, streamDefinition.getAttributeType("id"));
+        assertEquals(Attribute.Type.LONG, streamDefinition.getAttributeType("timestamp"));
+        assertEquals(Attribute.Type.STRING, streamDefinition.getAttributeType("name"));
+        assertEquals(Attribute.Type.DOUBLE, streamDefinition.getAttributeType("price"));
+
+        assertEquals("define stream test_stream (id int,timestamp long,name string,price double);", schema.getStreamDefinitionExpression("test_stream"));
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
new file mode 100644
index 0000000..f876b2b
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StreamSchemaTest {
+    @Test
+    public void testStreamSchemaWithPojo() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+        StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(4, schema.getFieldIndexes().length);
+        assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
+    }
+
+    @Test
+    public void testStreamSchemaWithTuple() {
+        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+        StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+        assertEquals(4, schema.getFieldIndexes().length);
+    }
+
+    @Test
+    public void testStreamSchemaWithPrimitive() {
+        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
+        assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+        assertEquals(1, schema.getFieldIndexes().length);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testStreamSchemaWithPojoAndUnknownField() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price", "unknown");
+    }
+
+    @Test
+    public void testStreamTupleSerializerWithPojo() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+        StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
+
+        TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString());
+    }
+
+    @Test
+    public void testStreamTupleSerializerWithTuple() {
+        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+        StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+        TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString());
+    }
+
+    @Test
+    public void testStreamTupleSerializerWithPrimitive() {
+        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
+        assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+        TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString());
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
new file mode 100644
index 0000000..190208c
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamSerializerTest {
+    private static final long CURRENT = System.currentTimeMillis();
+
+    @Test
+    public void testSimplePojoRead() {
+        Event event = new Event();
+        event.setId(1);
+        event.setName("test");
+        event.setPrice(56.7);
+        event.setTimestamp(CURRENT);
+
+        StreamSchema<Event> schema = new StreamSchema<>(TypeExtractor.createTypeInfo(Event.class), "id", "name", "price", "timestamp");
+        StreamSerializer<Event> reader = new StreamSerializer<>(schema);
+        Assert.assertArrayEquals(new Object[]{1, "test", 56.7, CURRENT}, reader.getRow(event));
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
new file mode 100644
index 0000000..357e1d2
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.Objects;
+
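+/**
+ * Test POJO used as the Siddhi event type. It must remain a valid Flink POJO
+ * (public no-arg constructor plus getters/setters) so that TypeExtractor
+ * produces a PojoTypeInfo for it, as the schema tests assert.
+ */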
+public class Event {
+    private long timestamp;
+    private String name;
+    private double price;
+    private int id;
+
+    public double getPrice() {
+        return price;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "Event(" + id + ", " + name + ", " + price + ", " + timestamp + ")";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Event) {
+            Event other = (Event) obj;
+
+            return name.equals(other.name) && price == other.price && id == other.id && timestamp == other.timestamp;
+        } else {
+            return false;
+        }
+    }
+
+    public static Event of(int id, String name, double price) {
+        Event event = new Event();
+        event.setId(id);
+        event.setName(name);
+        event.setPrice(price);
+        event.setTimestamp(System.currentTimeMillis());
+        return event;
+    }
+
+    public static Event of(int id, String name, double price, long timestamp) {
+        Event event = new Event();
+        event.setId(id);
+        event.setName(name);
+        event.setPrice(price);
+        event.setTimestamp(timestamp);
+        return event;
+    }
+
+    @Override
+    public int hashCode() {
+        // Include timestamp so hashCode stays consistent with equals().
+        return Objects.hash(name, price, id, timestamp);
+    }
+
+    public static TypeSerializer<Event> createTypeSerializer() {
+        TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
+
+        return typeInformation.createSerializer(new ExecutionConfig());
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public void setPrice(double price) {
+        this.price = price;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
new file mode 100644
index 0000000..bb95fdd
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
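+/**
+ * Bounded test source emitting {@code count} events whose timestamps advance
+ * by one second per record. {@link #closeDelay(long)} keeps the source alive
+ * after the last event so downstream operators can drain their results.
+ */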
+public class RandomEventSource implements SourceFunction<Event> {
+    private final int count;
+    private final Random random;
+    private final long initialTimestamp;
+
+    private volatile boolean isRunning = true;
+    private volatile int number = 0;
+    private volatile long closeDelayTimestamp = 1000;
+
+    public RandomEventSource(int count, long initialTimestamp) {
+        this.count = count;
+        this.random = new Random();
+        this.initialTimestamp = initialTimestamp;
+    }
+
+    public RandomEventSource() {
+        this(Integer.MAX_VALUE, System.currentTimeMillis());
+    }
+
+    public RandomEventSource(int count) {
+        this(count, System.currentTimeMillis());
+    }
+
+    public RandomEventSource closeDelay(long delayTimestamp) {
+        this.closeDelayTimestamp = delayTimestamp;
+        return this;
+    }
+
+    @Override
+    public void run(SourceContext<Event> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number));
+            number++;
+            if (number >= this.count) {
+                cancel();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
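+        // Delay shutdown so that results emitted near the end can still be processed downstream.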
+        try {
+            Thread.sleep(closeDelayTimestamp);
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
new file mode 100644
index 0000000..35121f7
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
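+/**
+ * Tuple4 counterpart of {@link RandomEventSource} for tuple-typed stream tests.
+ */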
+public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> {
+    private final int count;
+    private final Random random;
+    private final long initialTimestamp;
+
+    private volatile boolean isRunning = true;
+    private volatile int number = 0;
+    private long closeDelayTimestamp;
+
+    public RandomTupleSource(int count, long initialTimestamp) {
+        this.count = count;
+        this.random = new Random();
+        this.initialTimestamp = initialTimestamp;
+    }
+
+    public RandomTupleSource() {
+        this(Integer.MAX_VALUE, System.currentTimeMillis());
+    }
+
+    public RandomTupleSource(int count) {
+        this(count, System.currentTimeMillis());
+    }
+
+    public RandomTupleSource closeDelay(long delayTimestamp) {
+        this.closeDelayTimestamp = delayTimestamp;
+        return this;
+    }
+
+    @Override
+    public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number));
+            number++;
+            if (number >= this.count) {
+                cancel();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+        try {
+            Thread.sleep(this.closeDelayTimestamp);
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
new file mode 100644
index 0000000..19d904f
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
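+/**
+ * Test source that emits lines of Hamlet's soliloquy with event timestamps.
+ */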
+public class RandomWordSource implements SourceFunction<String> {
+    private static final String[] WORDS = new String[] {
+        "To be, or not to be,--that is the question:--",
+        "Whether 'tis nobler in the mind to suffer",
+        "The slings and arrows of outrageous fortune",
+        "Or to take arms against a sea of troubles,",
+        "And by opposing end them?--To die,--to sleep,--",
+        "No more; and by a sleep to say we end",
+        "The heartache, and the thousand natural shocks",
+        "That flesh is heir to,--'tis a consummation",
+        "Devoutly to be wish'd. To die,--to sleep;--",
+        "To sleep! perchance to dream:--ay, there's the rub;",
+        "For in that sleep of death what dreams may come,",
+        "When we have shuffled off this mortal coil,",
+        "Must give us pause: there's the respect",
+        "That makes calamity of so long life;",
+        "For who would bear the whips and scorns of time,",
+        "The oppressor's wrong, the proud man's contumely,",
+        "The pangs of despis'd love, the law's delay,",
+        "The insolence of office, and the spurns",
+        "That patient merit of the unworthy takes,",
+        "When he himself might his quietus make",
+        "With a bare bodkin? who would these fardels bear,",
+        "To grunt and sweat under a weary life,",
+        "But that the dread of something after death,--",
+        "The undiscover'd country, from whose bourn",
+        "No traveller returns,--puzzles the will,",
+        "And makes us rather bear those ills we have",
+        "Than fly to others that we know not of?",
+        "Thus conscience does make cowards of us all;",
+        "And thus the native hue of resolution",
+        "Is sicklied o'er with the pale cast of thought;",
+        "And enterprises of great pith and moment,",
+        "With this regard, their currents turn awry,",
+        "And lose the name of action.--Soft you now!",
+        "The fair Ophelia!--Nymph, in thy orisons",
+        "Be all my sins remember'd."
+    };
+
+    private final int count;
+    private final Random random;
+    private final long initialTimestamp;
+
+    private volatile boolean isRunning = true;
+    private volatile int number = 0;
+    private long closeDelayTimestamp;
+
+    public RandomWordSource(int count, long initialTimestamp) {
+        this.count = count;
+        this.random = new Random();
+        this.initialTimestamp = initialTimestamp;
+    }
+
+    public RandomWordSource() {
+        this(Integer.MAX_VALUE, System.currentTimeMillis());
+    }
+
+    public RandomWordSource(int count) {
+        this(count, System.currentTimeMillis());
+    }
+
+    public RandomWordSource closeDelay(long delayTimestamp) {
+        this.closeDelayTimestamp = delayTimestamp;
+        return this;
+    }
+
+    @Override
+    public void run(SourceContext<String> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number);
+            number++;
+            if (number >= this.count) {
+                cancel();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+        try {
+            Thread.sleep(this.closeDelayTimestamp);
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
new file mode 100644
index 0000000..4753a3f
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class SiddhiTupleFactoryTest {
+    @Test
+    public void testConvertObjectArrayToTuple() {
+        Object[] row = new Object[]{1, "message", 1234567L, true, new Object()};
+        Tuple5 tuple5 = SiddhiTupleFactory.newTuple(row);
+        assertEquals(5, tuple5.getArity());
+        assertArrayEquals(row, new Object[]{
+            tuple5.f0,
+            tuple5.f1,
+            tuple5.f2,
+            tuple5.f3,
+            tuple5.f4
+        });
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConvertTooLongObjectArrayToTuple() {
+        Object[] row = new Object[26];
+        SiddhiTupleFactory.newTuple(row);
+    }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
new file mode 100644
index 0000000..c4a1e8c
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SiddhiTypeFactoryTest {
+    @Test
+    public void testTypeInfoParser() {
+        TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
+        Assert.assertNotNull(type1);
+        TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
+        Assert.assertNotNull(type2);
+    }
+
+    public static class InnerPojo {
+    }
+
+    @Test
+    public void testBuildTypeInformationForSiddhiStream() {
+        String query = "define stream inputStream (timestamp long, name string, value double);"
+            + "from inputStream select name, value insert into outputStream;";
+        TypeInformation<Tuple3<Long, String, Double>> inputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "inputStream");
+        TypeInformation<Tuple2<String, Double>> outputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "outputStream");
+
+        Assert.assertNotNull(inputStreamType);
+        Assert.assertNotNull(outputStreamType);
+    }
+}
diff --git a/flink-library-siddhi/src/test/resources/log4j-test.properties b/flink-library-siddhi/src/test/resources/log4j-test.properties
new file mode 100755
index 0000000..5b1e4ed
--- /dev/null
+++ b/flink-library-siddhi/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The root logger level is set to INFO for these tests;
+# set it to OFF manually if it floods the build logs.
+log4j.rootLogger=INFO, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-library-siddhi/src/test/resources/logback-test.xml b/flink-library-siddhi/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..b7a5793
--- /dev/null
+++ b/flink-library-siddhi/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+		</encoder>
+	</appender>
+
+	<root level="WARN">
+		<appender-ref ref="STDOUT"/>
+	</root>
+
+	<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+	<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+	<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+	<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
diff --git a/pom.xml b/pom.xml
index 4070698..c2d36ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
     <module>flink-connector-netty</module>
     <module>flink-connector-akka</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-library-siddhi</module>
   </modules>
 
   <properties>