[BAHIR-144] Add flink-library-siddhi
This closes #22
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
new file mode 100644
index 0000000..91c6797
--- /dev/null
+++ b/flink-library-siddhi/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink-parent_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-library-siddhi_2.11</artifactId>
+ <name>flink-library-siddhi</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <siddhi.version>4.0.0-M120</siddhi.version>
+ </properties>
+
+ <dependencies>
+ <!-- core dependencies -->
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-core</artifactId>
+ <version>${siddhi.version}</version>
+ <exclusions>
+ <exclusion> <!-- declare the exclusion here -->
+ <groupId>org.apache.directory.jdbm</groupId>
+ <artifactId>apacheds-jdbm1</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-query-api</artifactId>
+ <version>${siddhi.version}</version>
+ <exclusions>
+ <exclusion> <!-- declare the exclusion here -->
+ <groupId>org.apache.directory.jdbm</groupId>
+ <artifactId>apacheds-jdbm1</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Core streaming API -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <repositories>
+ <repository>
+ <id>wso2-maven2-repository</id>
+ <name>WSO2 Maven2 Repository</name>
+ <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
+ </repository>
+ </repositories>
+</project>
\ No newline at end of file
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
new file mode 100644
index 0000000..a63dbf6
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Siddhi CEP Environment, provides utility methods to
+ *
+ * <ul>
+ * <li>Initialize SiddhiCEP environment based on {@link StreamExecutionEnvironment}</li>
+ * <li>Register {@link SiddhiStream} with field-based StreamSchema and bind with physical source {@link DataStream}</li>
+ * <li>Define rich-featured Siddhi CEP execution plan with SQL-Like query for SiddhiStreamOperator</li>
+ * <li>Transform and connect source DataStream to SiddhiStreamOperator</li>
+ * <li>Register customizable siddhi plugins to extend built-in CEP functions</li>
+ * </ul>
+ * </p>
+ *
+ * @see SiddhiStream
+ * @see StreamSchema
+ * @see SiddhiStreamOperator
+ */
+@PublicEvolving
+public class SiddhiCEP {
+ private final StreamExecutionEnvironment executionEnvironment;
+ private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+ private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+ private final Map<String, Class<?>> extensions = new HashMap<>();
+
+ /**
+ * @param streamExecutionEnvironment Stream Execution Environment
+ */
+ private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+ this.executionEnvironment = streamExecutionEnvironment;
+ }
+
+ /**
+ * @see DataStream
+ * @return Siddhi streamId and source DataStream mapping.
+ */
+ public Map<String, DataStream<?>> getDataStreams() {
+ return this.dataStreams;
+ }
+
+ /**
+ * @see SiddhiStreamSchema
+ * @return Siddhi streamId and stream schema mapping.
+ */
+ public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+ return this.dataStreamSchemas;
+ }
+
+ /**
+ * @param streamId Siddhi streamId to check.
+ * @return whether the given streamId is defined in current SiddhiCEP environment.
+ */
+ public boolean isStreamDefined(String streamId) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ return dataStreams.containsKey(streamId);
+ }
+
+ /**
+ * @return Registered siddhi extensions.
+ */
+ public Map<String, Class<?>> getExtensions() {
+ return this.extensions;
+ }
+
+ /**
+ * Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException}
+ * @param streamId Siddhi streamId to check.
+ * @throws UndefinedStreamException throws if given streamId is not defined
+ */
+ public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+ Preconditions.checkNotNull(streamId,"streamId");
+ if (!isStreamDefined(streamId)) {
+ throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+ }
+ }
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema,
+ * and select as initial source stream to connect to siddhi operator.
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ *
+ * @see #registerStream(String, DataStream, String...)
+ * @see #from(String)
+ */
+ public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
+ return environment.from(streamId, dataStream, fieldNames);
+ }
+
+ /**
+ * Register stream with unique <code>streaId</code>, source <code>dataStream</code> and schema fields,
+ * and select the registered stream as initial stream to connect to Siddhi Runtime.
+ *
+ * @see #registerStream(String, DataStream, String...)
+ * @see #from(String)
+ */
+ public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ this.registerStream(streamId, dataStream, fieldNames);
+ return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+ }
+
+ /**
+ * Select stream by streamId as initial stream to connect to Siddhi Runtime.
+ *
+ * @param streamId Siddhi Stream Name
+ * @param <T> Stream Generic Type
+ */
+ public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+ }
+
+ /**
+ * Select one stream and union other streams by streamId to connect to Siddhi Stream Operator.
+ *
+ * @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
+ * @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
+ *
+ * @return The UnionSiddhiStream Builder
+ */
+ public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
+ Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+ Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+ return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
+ }
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema.
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ */
+ public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ if (isStreamDefined(streamId)) {
+ throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
+ }
+ dataStreams.put(streamId, dataStream);
+ SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+ schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+ dataStreamSchemas.put(streamId, schema);
+ }
+
+ /**
+ * @return Current StreamExecutionEnvironment.
+ */
+ public StreamExecutionEnvironment getExecutionEnvironment() {
+ return executionEnvironment;
+ }
+
+ /**
+ * Register Siddhi CEP Extensions
+ *
+ * @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
+ * @param extensionName Unique siddhi extension name
+ * @param extensionClass Siddhi Extension class
+ */
+ public void registerExtension(String extensionName, Class<?> extensionClass) {
+ if (extensions.containsKey(extensionName)) {
+ throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
+ }
+ extensions.put(extensionName, extensionClass);
+ }
+
+ /**
+ * Get registered source DataStream with Siddhi streamId.
+ *
+ * @param streamId Siddhi streamId
+ * @return The source DataStream registered with Siddhi streamId
+ */
+ public <T> DataStream<T> getDataStream(String streamId) {
+ if (this.dataStreams.containsKey(streamId)) {
+ return (DataStream<T>) this.dataStreams.get(streamId);
+ } else {
+ throw new UndefinedStreamException("Undefined stream " + streamId);
+ }
+ }
+
+ /**
+ * Create new SiddhiCEP instance.
+ *
+ * @param streamExecutionEnvironment StreamExecutionEnvironment
+ * @return New SiddhiCEP instance.
+ */
+ public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
+ return new SiddhiCEP(streamExecutionEnvironment);
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
new file mode 100644
index 0000000..43d7436
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Stream API
+ */
+@PublicEvolving
+public abstract class SiddhiStream {
+ private final SiddhiCEP cepEnvironment;
+
+ /**
+ * @param cepEnvironment SiddhiCEP cepEnvironment.
+ */
+ public SiddhiStream(SiddhiCEP cepEnvironment) {
+ Preconditions.checkNotNull(cepEnvironment,"SiddhiCEP cepEnvironment is null");
+ this.cepEnvironment = cepEnvironment;
+ }
+
+ /**
+ * @return current SiddhiCEP cepEnvironment.
+ */
+ protected SiddhiCEP getCepEnvironment() {
+ return this.cepEnvironment;
+ }
+
+ /**
+ * @return Transform SiddhiStream to physical DataStream
+ */
+ protected abstract DataStream<Tuple2<String, Object>> toDataStream();
+
+ /**
+ * Convert DataStream<T> to DataStream<Tuple2<String,T>>.
+ * If it's KeyedStream. pass through original keySelector
+ */
+ protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
+ final String streamIdInClosure = streamId;
+ DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() {
+ @Override
+ public Tuple2<String, Object> map(T value) throws Exception {
+ return Tuple2.of(streamIdInClosure, (Object) value);
+ }
+ });
+ if (dataStream instanceof KeyedStream) {
+ final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
+ final KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() {
+ @Override
+ public Object getKey(Tuple2<String, Object> value) throws Exception {
+ return keySelector.getKey((T) value.f1);
+ }
+ };
+ return resultStream.keyBy(keySelectorInClosure);
+ } else {
+ return resultStream;
+ }
+ }
+
+ /**
+ * ExecutableStream context to define execution logic, i.e. SiddhiCEP execution plan.
+ */
+ public abstract static class ExecutableStream extends SiddhiStream {
+ public ExecutableStream(SiddhiCEP environment) {
+ super(environment);
+ }
+
+ /**
+ * Siddhi Continuous Query Language (CQL)
+ *
+ * @param executionPlan Siddhi SQL-Like execution plan query
+ * @return ExecutionSiddhiStream context
+ */
+ public ExecutionSiddhiStream cql(String executionPlan) {
+ Preconditions.checkNotNull(executionPlan,"executionPlan");
+ return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment());
+ }
+ }
+
+ /**
+ * Initial Single Siddhi Stream Context
+ */
+ public static class SingleSiddhiStream<T> extends ExecutableStream {
+ private final String streamId;
+
+ public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
+ super(environment);
+ environment.checkStreamDefined(streamId);
+ this.streamId = streamId;
+ }
+
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and as the first stream of {@link UnionSiddhiStream}
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ *
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+ return union(streamId);
+ }
+
+ /**
+ * @param streamIds Defined siddhi streamIds to union
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String... streamIds) {
+ Preconditions.checkNotNull(streamIds,"streamIds");
+ return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment());
+ }
+
+ @Override
+ protected DataStream<Tuple2<String, Object>> toDataStream() {
+ return convertDataStream(getCepEnvironment().getDataStream(this.streamId), this.streamId);
+ }
+ }
+
+ public static class UnionSiddhiStream<T> extends ExecutableStream {
+ private String firstStreamId;
+ private List<String> unionStreamIds;
+
+ public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
+ super(environment);
+ Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+ Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+ environment.checkStreamDefined(firstStreamId);
+ for (String unionStreamId : unionStreamIds) {
+ environment.checkStreamDefined(unionStreamId);
+ }
+ this.firstStreamId = firstStreamId;
+ this.unionStreamIds = unionStreamIds;
+ }
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and continue to union it with current stream.
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ *
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+ return union(streamId);
+ }
+
+ /**
+ * @param streamId another defined streamId to union with.
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String... streamId) {
+ List<String> newUnionStreamIds = new LinkedList<>();
+ newUnionStreamIds.addAll(unionStreamIds);
+ newUnionStreamIds.addAll(Arrays.asList(streamId));
+ return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
+ }
+
+ @Override
+ protected DataStream<Tuple2<String, Object>> toDataStream() {
+ final String localFirstStreamId = firstStreamId;
+ final List<String> localUnionStreamIds = this.unionStreamIds;
+ DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getCepEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId);
+ for (String unionStreamId : localUnionStreamIds) {
+ dataStream = dataStream.union(convertDataStream(getCepEnvironment().<T>getDataStream(unionStreamId), unionStreamId));
+ }
+ return dataStream;
+ }
+ }
+
+ public static class ExecutionSiddhiStream {
+ private final DataStream<Tuple2<String, Object>> dataStream;
+ private final SiddhiCEP environment;
+ private final String executionPlan;
+
+ public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
+ this.executionPlan = executionPlan;
+ this.dataStream = dataStream;
+ this.environment = environment;
+ }
+
+ /**
+ * @param outStreamId The <code>streamId</code> to return as data stream.
+ * @param <T> Type information should match with stream definition.
+ * During execution phase, it will automatically build type information based on stream definition.
+ * @return Return output stream as Tuple
+ * @see SiddhiTypeFactory
+ */
+ public <T extends Tuple> DataStream<T> returns(String outStreamId) {
+ SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+ siddhiContext.setExecutionPlan(executionPlan);
+ siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+ siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+ siddhiContext.setOutputStreamId(outStreamId);
+ siddhiContext.setExtensions(environment.getExtensions());
+ siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+ TypeInformation<T> typeInformation =
+ SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
+ siddhiContext.setOutputStreamType(typeInformation);
+ return returnsInternal(siddhiContext);
+ }
+
+ /**
+ * @return Return output stream as <code>DataStream<Map<String,Object>></code>,
+ * out type is <code>LinkedHashMap<String,Object></code> and guarantee field order
+ * as defined in siddhi execution plan
+ * @see java.util.LinkedHashMap
+ */
+ public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
+ return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
+ }
+
+ /**
+ * @param outStreamId OutStreamId
+ * @param outType Output type class
+ * @param <T> Output type
+ * @return Return output stream as POJO class.
+ */
+ public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
+ TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType);
+ return returnsInternal(outStreamId, typeInformation);
+ }
+
+ private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) {
+ SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+ siddhiContext.setExecutionPlan(executionPlan);
+ siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+ siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+ siddhiContext.setOutputStreamId(outStreamId);
+ siddhiContext.setOutputStreamType(typeInformation);
+ siddhiContext.setExtensions(environment.getExtensions());
+ siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+ return returnsInternal(siddhiContext);
+ }
+
+ private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
+ return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+ }
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
new file mode 100644
index 0000000..f65cc81
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class DuplicatedStreamException extends RuntimeException {
+ public DuplicatedStreamException(String message) {
+ super(message);
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
new file mode 100644
index 0000000..26254c2
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class UndefinedStreamException extends RuntimeException {
+ public UndefinedStreamException(String message) {
+ super(message);
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
new file mode 100755
index 0000000..8cb6d67
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+/**
+ * <h1>Siddhi Runtime Operator</h1>
+ *
+ * A flink Stream Operator to integrate with native siddhi execution runtime, extension and type schema mechanism/
+ *
+ * <ul>
+ * <li>
+ * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
+ * </li>
+ * <li>
+ * Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId
+ * </li>
+ * <li>
+ * Convert native {@link StreamRecord} to Siddhi {@link org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
+ * </li>
+ * <li>
+ * Listen output callback event and convert as expected output type according to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then output as typed DataStream.
+ * </li>
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ *
+ * @param <IN> Input Element Type
+ * @param <OUT> Output Element Type
+ */
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<IN, OUT> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+ private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+ private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
+ private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
+
+ private final SiddhiOperatorContext siddhiPlan;
+ private final String executionExpression;
+ private final boolean isProcessingTime;
+ private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
+
+ private transient SiddhiManager siddhiManager;
+ private transient SiddhiAppRuntime siddhiRuntime;
+ private transient Map<String, InputHandler> inputStreamHandlers;
+
+ // queue to buffer out of order stream records
+ private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+ private transient ListState<byte[]> siddhiRuntimeState;
+ private transient ListState<byte[]> queuedRecordsState;
+
+ /**
+ * @param siddhiPlan Siddhi CEP Execution Plan
+ */
+ public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+ validate(siddhiPlan);
+ this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+ this.siddhiPlan = siddhiPlan;
+ this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+ this.streamRecordSerializers = new HashMap<>();
+
+ registerStreamRecordSerializers();
+ }
+
+ /**
+ * Register StreamRecordSerializer based on {@link StreamSchema}
+ */
+ private void registerStreamRecordSerializers() {
+ for (String streamId : this.siddhiPlan.getInputStreams()) {
+ streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
+ }
+ }
+
+ protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);
+
+ protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) {
+ if (streamRecordSerializers.containsKey(streamId)) {
+ return streamRecordSerializers.get(streamId);
+ } else {
+ throw new UndefinedStreamException("Stream " + streamId + " not defined");
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ String streamId = getStreamId(element.getValue());
+ StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+
+ if (isProcessingTime) {
+ processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
+ this.checkpointSiddhiRuntimeState();
+ } else {
+ PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+ // event time processing
+ // we have to buffer the elements until we receive the proper watermark
+ if (getExecutionConfig().isObjectReuseEnabled()) {
+ // copy the StreamRecord so that it cannot be changed
+ priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
+ } else {
+ priorityQueue.offer(element);
+ }
+ this.checkpointRecordQueueState();
+ }
+ }
+
+ protected abstract void processEvent(String streamId, StreamSchema<IN> schema, IN value, long timestamp) throws Exception;
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+ StreamRecord<IN> streamRecord = priorityQueue.poll();
+ String streamId = getStreamId(streamRecord.getValue());
+ long timestamp = streamRecord.getTimestamp();
+ StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+ processEvent(streamId, schema, streamRecord.getValue(), timestamp);
+ }
+ output.emitWatermark(mark);
+ }
+
+ public abstract String getStreamId(IN record);
+
+ public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
+ return priorityQueue;
+ }
+
+ protected SiddhiAppRuntime getSiddhiRuntime() {
+ return this.siddhiRuntime;
+ }
+
+ public InputHandler getSiddhiInputHandler(String streamId) {
+ return inputStreamHandlers.get(streamId);
+ }
+
+ protected SiddhiOperatorContext getSiddhiPlan() {
+ return this.siddhiPlan;
+ }
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+ super.setup(containingTask, config, output);
+ if (priorityQueue == null) {
+ priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+ }
+ startSiddhiRuntime();
+ }
+
+ /**
+ * Send input data to siddhi runtime
+ */
+ protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
+ this.getSiddhiInputHandler(streamId).send(timestamp, data);
+ }
+
+ /**
+ * Validate execution plan during building DAG before submitting to execution environment and fail-fast.
+ */
+ private static void validate(final SiddhiOperatorContext siddhiPlan) {
+ SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
+ try {
+ siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan());
+ } finally {
+ siddhiManager.shutdown();
+ }
+ }
+
+ /**
+ * Create and start execution runtime
+ */
+ private void startSiddhiRuntime() {
+ if (this.siddhiRuntime == null) {
+ this.siddhiManager = this.siddhiPlan.createSiddhiManager();
+ for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
+ this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
+ }
+ this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
+ this.siddhiRuntime.start();
+ registerInputAndOutput(this.siddhiRuntime);
+ LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
+ } else {
+ throw new IllegalStateException("Siddhi has already been initialized");
+ }
+ }
+
+
+ private void shutdownSiddhiRuntime() {
+ if (this.siddhiRuntime != null) {
+ this.siddhiRuntime.shutdown();
+ LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName());
+ this.siddhiRuntime = null;
+ this.siddhiManager.shutdown();
+ this.siddhiManager = null;
+ this.inputStreamHandlers = null;
+ } else {
+ throw new IllegalStateException("Siddhi has already shutdown");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void registerInputAndOutput(SiddhiAppRuntime runtime) {
+ AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
+ runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output));
+ inputStreamHandlers = new HashMap<>();
+ for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
+ inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
+ }
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ shutdownSiddhiRuntime();
+ this.siddhiRuntimeState.clear();
+ super.dispose();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ checkpointSiddhiRuntimeState();
+ checkpointRecordQueueState();
+ }
+
+ private void restoreState() throws Exception {
+ LOGGER.info("Restore siddhi state");
+ final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator();
+ if (siddhiState.hasNext()) {
+ this.siddhiRuntime.restore(siddhiState.next());
+ }
+
+ LOGGER.info("Restore queued records state");
+ final Iterator<byte[]> queueState = queuedRecordsState.get().iterator();
+ if (queueState.hasNext()) {
+ final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next());
+ final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream);
+ try {
+ this.priorityQueue = restoreQueuerState(dataInputView);
+ } finally {
+ dataInputView.close();
+ byteArrayInputStream.close();
+ }
+ }
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ if (siddhiRuntimeState == null) {
+ siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME,
+ new BytePrimitiveArraySerializer()));
+ }
+ if (queuedRecordsState == null) {
+ queuedRecordsState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
+ }
+ if (context.isRestored()) {
+ restoreState();
+ }
+ }
+
+
+ private void checkpointSiddhiRuntimeState() throws Exception {
+ this.siddhiRuntimeState.clear();
+ this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
+ this.queuedRecordsState.clear();
+ }
+
+ private void checkpointRecordQueueState() throws Exception {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ final DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
+ try {
+ snapshotQueueState(this.priorityQueue, dataOutputView);
+ this.queuedRecordsState.clear();
+ this.queuedRecordsState.add(byteArrayOutputStream.toByteArray());
+ } finally {
+ dataOutputView.close();
+ byteArrayOutputStream.close();
+ }
+ }
+
+ protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException;
+
+ protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException;
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
new file mode 100644
index 0000000..f760938
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SiddhiCEP Operator Context Metadata including input/output stream (streamId, TypeInformation) as well execution plan query,
+ * and execution environment context like TimeCharacteristic and ExecutionConfig.
+ */
+public class SiddhiOperatorContext implements Serializable {
+ private ExecutionConfig executionConfig;
+ private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas;
+ private final Map<String, Class<?>> siddhiExtensions;
+ private String outputStreamId;
+ private TypeInformation outputStreamType;
+ private TimeCharacteristic timeCharacteristic;
+ private String name;
+ private String executionPlan;
+
+ public SiddhiOperatorContext() {
+ inputStreamSchemas = new HashMap<>();
+ siddhiExtensions = new HashMap<>();
+ }
+
+ /**
+ * @param extensions siddhi extensions to register
+ */
+ public void setExtensions(Map<String, Class<?>> extensions) {
+ Preconditions.checkNotNull(extensions,"extensions");
+ siddhiExtensions.putAll(extensions);
+ }
+
+ /**
+ * @return registered siddhi extensions
+ */
+ public Map<String, Class<?>> getExtensions() {
+ return siddhiExtensions;
+ }
+
+ /**
+ * @return Siddhi Stream Operator Name in format of "Siddhi: execution query ... (query length)"
+ */
+ public String getName() {
+ if (this.name == null) {
+ if (executionPlan.length() > 100) {
+ return String.format("Siddhi: %s ... (%s)", executionPlan.substring(0, 100), executionPlan.length() - 100);
+ } else {
+ return String.format("Siddhi: %s", executionPlan);
+ }
+ } else {
+ return this.name;
+ }
+ }
+
+ /**
+ * @return Source siddhi stream IDs
+ */
+ public List<String> getInputStreams() {
+ Object[] keys = this.inputStreamSchemas.keySet().toArray();
+ List<String> result = new ArrayList<>(keys.length);
+ for (Object key : keys) {
+ result.add((String) key);
+ }
+ return result;
+ }
+
+ /**
+ * @return Siddhi CEP cql-like execution plan
+ */
+ public String getExecutionPlan() {
+ return executionPlan;
+ }
+
+ /**
+ * Stream definition + execution expression
+ */
+ public String getFinalExecutionPlan() {
+ Preconditions.checkNotNull(executionPlan, "Execution plan is not set");
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) {
+ sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey()));
+ }
+ sb.append(this.getExecutionPlan());
+ return sb.toString();
+ }
+
+ /**
+ * @return Siddhi Stream Operator output type information
+ */
+ public TypeInformation getOutputStreamType() {
+ return outputStreamType;
+ }
+
+ /**
+ * @return Siddhi output streamId for callback
+ */
+ public String getOutputStreamId() {
+ return outputStreamId;
+ }
+
+ /**
+ * @param inputStreamId Siddhi streamId
+ * @return StreamSchema for given siddhi streamId
+ *
+ * @throws UndefinedStreamException throws if stream is not defined
+ */
+ @SuppressWarnings("unchecked")
+ public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) {
+ Preconditions.checkNotNull(inputStreamId,"inputStreamId");
+
+ if (!inputStreamSchemas.containsKey(inputStreamId)) {
+ throw new UndefinedStreamException("Input stream: " + inputStreamId + " is not found");
+ }
+ return (StreamSchema<IN>) inputStreamSchemas.get(inputStreamId);
+ }
+
+ /**
+ * @param outputStreamId Siddhi output streamId, which must exist in siddhi execution plan
+ */
+ public void setOutputStreamId(String outputStreamId) {
+ Preconditions.checkNotNull(outputStreamId,"outputStreamId");
+ this.outputStreamId = outputStreamId;
+ }
+
+ /**
+ * @param outputStreamType Output stream TypeInformation
+ */
+ public void setOutputStreamType(TypeInformation outputStreamType) {
+ Preconditions.checkNotNull(outputStreamType,"outputStreamType");
+ this.outputStreamType = outputStreamType;
+ }
+
+ /**
+ * @return Returns execution environment TimeCharacteristic
+ */
+ public TimeCharacteristic getTimeCharacteristic() {
+ return timeCharacteristic;
+ }
+
+ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+ Preconditions.checkNotNull(timeCharacteristic,"timeCharacteristic");
+ this.timeCharacteristic = timeCharacteristic;
+ }
+
+ /**
+ * @param executionPlan Siddhi SQL-Like exeuction plan query
+ */
+ public void setExecutionPlan(String executionPlan) {
+ Preconditions.checkNotNull(executionPlan,"executionPlan");
+ this.executionPlan = executionPlan;
+ }
+
+ /**
+ * @return Returns input stream ID and schema mapping
+ */
+ public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() {
+ return inputStreamSchemas;
+ }
+
+ /**
+ * @param inputStreamSchemas input stream ID and schema mapping
+ */
+ public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) {
+ Preconditions.checkNotNull(inputStreamSchemas,"inputStreamSchemas");
+ this.inputStreamSchemas = inputStreamSchemas;
+ }
+
+ public void setName(String name) {
+ Preconditions.checkNotNull(name,"name");
+ this.name = name;
+ }
+
+ /**
+ * @return Created new SiddhiManager instance with registered siddhi extensions
+ */
+ public SiddhiManager createSiddhiManager() {
+ SiddhiManager siddhiManager = new SiddhiManager();
+ for (Map.Entry<String, Class<?>> entry : getExtensions().entrySet()) {
+ siddhiManager.setExtension(entry.getKey(), entry.getValue());
+ }
+ return siddhiManager;
+ }
+
+ /**
+ * @return StreamExecutionEnvironment ExecutionConfig
+ */
+ public ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+
+ /**
+ * @param executionConfig StreamExecutionEnvironment ExecutionConfig
+ */
+ public void setExecutionConfig(ExecutionConfig executionConfig) {
+ Preconditions.checkNotNull(executionConfig,"executionConfig");
+ this.executionConfig = executionConfig;
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
new file mode 100755
index 0000000..5c54ad8
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN>
+ */
+public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
+
+ public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
+ super(siddhiPlan);
+ }
+
+ @Override
+ protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
+ TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation<IN>) streamSchema.getTypeInfo());
+ return new StreamElementSerializer<>(tuple2TypeInformation.createSerializer(executionConfig));
+ }
+
+ @Override
+ protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
+ send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
+ }
+
+ @Override
+ public String getStreamId(Tuple2<String, IN> record) {
+ return record.f0;
+ }
+
+ @Override
+ protected void snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
+ dataOutputView.writeInt(queue.size());
+ for (StreamRecord<Tuple2<String, IN>> record : queue) {
+ String streamId = record.getValue().f0;
+ dataOutputView.writeUTF(streamId);
+ this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
+ }
+ }
+
+ @Override
+ protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
+ int sizeOfQueue = dataInputView.readInt();
+ PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
+ for (int i = 0; i < sizeOfQueue; i++) {
+ String streamId = dataInputView.readUTF();
+ StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
+ priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
+ }
+ return priorityQueue;
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
new file mode 100755
index 0000000..7af37ce
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
+ * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition}
+ */
+public class StreamInMemOutputHandler<R> extends StreamCallback {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamInMemOutputHandler.class);
+
+ private final AbstractDefinition definition;
+ private final TypeInformation<R> typeInfo;
+ private final ObjectMapper objectMapper;
+
+
+ private final LinkedList<StreamRecord<R>> collectedRecords;
+
+ public StreamInMemOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition) {
+ this.typeInfo = typeInfo;
+ this.definition = definition;
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ this.collectedRecords = new LinkedList<>();
+ }
+
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+ collectedRecords.add(new StreamRecord<R>((R) toMap(event), event.getTimestamp()));
+ } else if (typeInfo.isTupleType()) {
+ Tuple tuple = this.toTuple(event);
+ collectedRecords.add(new StreamRecord<R>((R) tuple, event.getTimestamp()));
+ } else if (typeInfo instanceof PojoTypeInfo) {
+ R obj;
+ try {
+ obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+ } catch (IllegalArgumentException ex) {
+ LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+ throw ex;
+ }
+ collectedRecords.add(new StreamRecord<R>(obj, event.getTimestamp()));
+ } else {
+ throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+ }
+ }
+ }
+
+
+ @Override
+ public synchronized void stopProcessing() {
+ super.stopProcessing();
+ this.collectedRecords.clear();
+ }
+
+ private Map<String, Object> toMap(Event event) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+ map.put(definition.getAttributeNameArray()[i], event.getData(i));
+ }
+ return map;
+ }
+
+ private <T extends Tuple> T toTuple(Event event) {
+ return SiddhiTupleFactory.newTuple(event.getData());
+ }
+
+ public LinkedList<StreamRecord<R>> getCollectedRecords() {
+ return collectedRecords;
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
new file mode 100755
index 0000000..8840dac
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
+ * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition}
+ */
+public class StreamOutputHandler<R> extends StreamCallback {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamOutputHandler.class);
+
+ private final AbstractDefinition definition;
+ private final Output<StreamRecord<R>> output;
+ private final TypeInformation<R> typeInfo;
+ private final ObjectMapper objectMapper;
+
+ public StreamOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition, Output<StreamRecord<R>> output) {
+ this.typeInfo = typeInfo;
+ this.definition = definition;
+ this.output = output;
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ @Override
+ public void receive(Event[] events) {
+ StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L);
+ for (Event event : events) {
+ if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+ reusableRecord.replace(toMap(event), event.getTimestamp());
+ output.collect(reusableRecord);
+ } else if (typeInfo.isTupleType()) {
+ Tuple tuple = this.toTuple(event);
+ reusableRecord.replace(tuple, event.getTimestamp());
+ output.collect(reusableRecord);
+ } else if (typeInfo instanceof PojoTypeInfo) {
+ R obj;
+ try {
+ obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+ } catch (IllegalArgumentException ex) {
+ LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+ throw ex;
+ }
+ reusableRecord.replace(obj, event.getTimestamp());
+ output.collect(reusableRecord);
+ } else {
+ throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+ }
+ }
+ }
+
+
+ @Override
+ public synchronized void stopProcessing() {
+ super.stopProcessing();
+ }
+
+ private Map<String, Object> toMap(Event event) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+ map.put(definition.getAttributeNameArray()[i], event.getData(i));
+ }
+ return map;
+ }
+
+ private <T extends Tuple> T toTuple(Event event) {
+ return SiddhiTupleFactory.newTuple(event.getData());
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..049681c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Stream Record Timestamp Comparator
+ */
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+ private static final long serialVersionUID = 1581054988433915305L;
+
+ @Override
+ public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+ if (o1.getTimestamp() < o2.getTimestamp()) {
+ return -1;
+ } else if (o1.getTimestamp() > o2.getTimestamp()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
new file mode 100644
index 0000000..6b1ceae
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h1> Features </h1>
+ * <ul>
+ * <li>
+ * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
+ * <ul>
+ * <li>Filter</li>
+ * <li>Join</li>
+ * <li>Aggregation</li>
+ * <li>Group by</li>
+ * <li>Having</li>
+ * <li>Window</li>
+ * <li>Conditions and Expressions</li>
+ * <li>Pattern processing</li>
+ * <li>Sequence processing</li>
+ * <li>Event Tables</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
+ * <ul>
+ * <li>Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.</li>
+ * <li>Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan</li>
+ * <li>Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ * <p/>
+ * <h1>Example</h1>
+ * <pre>
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ *
+ * cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
+ *
+ * cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
+ * cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
+ *
+ * DataStream<Tuple4<Integer,String,Integer,String>> output = cep
+ * .from("inputStream1").union("inputStream2")
+ * .cql(
+ * "from every s1 = inputStream1[id == 2] "
+ * + " -> s2 = inputStream2[id == 3] "
+ * + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ * + "insert into outputStream"
+ * )
+ * .returns("outputStream");
+ *
+ * env.execute();
+ * </pre>
+ *
+ * @see <a href="https://github.com/wso2/siddhi">https://github.com/wso2/siddhi</a>
+ */
+package org.apache.flink.streaming.siddhi;
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
new file mode 100644
index 0000000..2a3a04c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Siddhi specific Stream Schema.
+ *
+ * @param <T> Siddhi stream element type
+ */
+public class SiddhiStreamSchema<T> extends StreamSchema<T> {
+ private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);";
+
+ public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+ super(typeInfo, fieldNames);
+ }
+
+ public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+ super(typeInfo, fieldIndexes, fieldNames);
+ }
+
+ public StreamDefinition getStreamDefinition(String streamId) {
+ StreamDefinition streamDefinition = StreamDefinition.id(streamId);
+ for (int i = 0; i < getFieldNames().length; i++) {
+ streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i]));
+ }
+ return streamDefinition;
+ }
+
+ public String getStreamDefinitionExpression(StreamDefinition streamDefinition) {
+ List<String> columns = new ArrayList<>();
+ Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+ for (Attribute attribute : streamDefinition.getAttributeList()) {
+ columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+ }
+ return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+ }
+
+ public String getStreamDefinitionExpression(String streamId) {
+ StreamDefinition streamDefinition = getStreamDefinition(streamId);
+ List<String> columns = new ArrayList<>();
+ Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+ for (Attribute attribute : streamDefinition.getAttributeList()) {
+ columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+ }
+ return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
new file mode 100644
index 0000000..c851631
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Generic Field-based Stream Schema
+ *
+ * @param <T> Stream element type
+ */
+public class StreamSchema<T> implements Serializable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamSchema.class);
+ private final TypeInformation<T> typeInfo;
+ private final int[] fieldIndexes;
+ private final String[] fieldNames;
+ private TypeInformation[] fieldTypes;
+ private final StreamSerializer<T> streamSerializer;
+ private TypeSerializer<T> typeSerializer;
+
+ public StreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+ Preconditions.checkNotNull(fieldNames, "Field name is required");
+ this.typeInfo = typeInfo;
+ this.fieldNames = fieldNames;
+ this.fieldIndexes = getFieldIndexes(typeInfo, fieldNames);
+ this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames);
+ this.streamSerializer = new StreamSerializer<>(this);
+ }
+
+ public StreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+ this.typeInfo = typeInfo;
+ this.fieldIndexes = fieldIndexes;
+ this.fieldNames = fieldNames;
+ this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames);
+ this.streamSerializer = new StreamSerializer<>(this);
+ }
+
+ public boolean isAtomicType() {
+ return typeInfo instanceof AtomicType;
+ }
+
+ public boolean isTupleType() {
+ return typeInfo instanceof TupleTypeInfo;
+ }
+
+ public boolean isPojoType() {
+ return typeInfo instanceof PojoTypeInfo;
+ }
+
+ public boolean isCaseClassType() {
+ return typeInfo instanceof CaseClassTypeInfo;
+ }
+
+ public boolean isCompositeType() {
+ return typeInfo instanceof CompositeType;
+ }
+
+ private <E> int[] getFieldIndexes(TypeInformation<E> typeInfo, String... fieldNames) {
+ int[] result;
+ if (isAtomicType()) {
+ result = new int[]{0};
+ } else if (isTupleType()) {
+ result = new int[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ result[i] = i;
+ }
+ } else if (isPojoType()) {
+ result = new int[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ int index = ((PojoTypeInfo) typeInfo).getFieldIndex(fieldNames[i]);
+ if (index < 0) {
+ throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo);
+ }
+ result[i] = index;
+ }
+ } else if (isCaseClassType()) {
+ result = new int[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ int index = ((CaseClassTypeInfo) typeInfo).getFieldIndex(fieldNames[i]);
+ if (index < 0) {
+ throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo);
+ }
+ result[i] = index;
+ }
+ } else {
+ throw new IllegalArgumentException("Failed to get field index from " + typeInfo);
+ }
+ return result;
+ }
+
+
+ private <E> TypeInformation[] getFieldTypes(TypeInformation<E> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+ TypeInformation[] fieldTypes;
+ if (isCompositeType()) {
+ CompositeType cType = (CompositeType) typeInfo;
+ if (fieldNames.length != cType.getArity()) {
+ // throw new IllegalArgumentException("Arity of type (" + cType.getFieldNames().length+ ") " +
+ // "not equal to number of field names " + fieldNames.length + ".");
+ LOGGER.warn("Arity of type (" + cType.getFieldNames().length + ") " +
+ "not equal to number of field names " + fieldNames.length + ".");
+ }
+ fieldTypes = new TypeInformation[fieldIndexes.length];
+ for (int i = 0; i < fieldIndexes.length; i++) {
+ fieldTypes[i] = cType.getTypeAt(fieldIndexes[i]);
+ }
+ } else if (isAtomicType()) {
+ if (fieldIndexes.length != 1 || fieldIndexes[0] != 0) {
+ throw new IllegalArgumentException(
+ "Non-composite input type may have only a single field and its index must be 0.");
+ }
+ fieldTypes = new TypeInformation[]{typeInfo};
+ } else {
+ throw new IllegalArgumentException(
+ "Illegal input type info"
+ );
+ }
+ return fieldTypes;
+ }
+
+ public TypeInformation<T> getTypeInfo() {
+ return typeInfo;
+ }
+
+ public int[] getFieldIndexes() {
+ return fieldIndexes;
+ }
+
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ public TypeInformation[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ public StreamSerializer<T> getStreamSerializer() {
+ return streamSerializer;
+ }
+
+ public TypeSerializer<T> getTypeSerializer() {
+ return typeSerializer;
+ }
+
+ public void setTypeSerializer(TypeSerializer<T> typeSerializer) {
+ this.typeSerializer = typeSerializer;
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
new file mode 100644
index 0000000..760afbe
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+/**
+ * Stream Serialization and Field Extraction Methods.
+ */
+public class StreamSerializer<T> implements Serializable {
+ private final StreamSchema<T> schema;
+
+ public StreamSerializer(StreamSchema<T> schema) {
+ this.schema = schema;
+ }
+
+ public Object[] getRow(T input) {
+ Preconditions.checkArgument(input.getClass() == schema.getTypeInfo().getTypeClass(),
+ "Invalid input type: " + input + ", expected: " + schema.getTypeInfo());
+
+ Object[] data;
+ if (schema.isAtomicType()) {
+ data = new Object[]{input};
+ } else if (schema.isTupleType()) {
+ Tuple tuple = (Tuple) input;
+ data = new Object[schema.getFieldIndexes().length];
+ for (int i = 0; i < schema.getFieldIndexes().length; i++) {
+ data[i] = tuple.getField(schema.getFieldIndexes()[i]);
+ }
+ } else if (schema.isPojoType() || schema.isCaseClassType()) {
+ data = new Object[schema.getFieldIndexes().length];
+ for (int i = 0; i < schema.getFieldNames().length; i++) {
+ data[i] = getFieldValue(schema.getFieldNames()[i], input);
+ }
+ } else {
+ throw new IllegalArgumentException("Failed to get field values from " + schema.getTypeInfo());
+ }
+ return data;
+ }
+
+ private Object getFieldValue(String fieldName, T input) {
+ // TODO: Cache Field Accessor
+ Field field = TypeExtractor.getDeclaredField(schema.getTypeInfo().getTypeClass(), fieldName);
+ if (field == null) {
+ throw new IllegalArgumentException(fieldName + " is not found in " + schema.getTypeInfo());
+ }
+ if (!field.isAccessible()) {
+ field.setAccessible(true);
+ }
+ try {
+ return field.get(input);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
new file mode 100644
index 0000000..20ca535
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Convert SiddhiCEPExecutionPlan to SiddhiCEP Operator and build output DataStream
+ */
+public class SiddhiStreamFactory {
+ @SuppressWarnings("unchecked")
+ public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) {
+ return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context));
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
new file mode 100644
index 0000000..88c15eb
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Siddhi Tuple Utility methods
+ */
+public class SiddhiTupleFactory {
+ /**
+ * Convert object array to type of Tuple{N} where N is between 0 to 25.
+ *
+ * @throws IllegalArgumentException if rows's length > 25
+ */
+ public static <T extends Tuple> T newTuple(Object[] row) {
+ Preconditions.checkNotNull(row, "Tuple row is null");
+ switch (row.length) {
+ case 0:
+ return setTupleValue(new Tuple0(), row);
+ case 1:
+ return setTupleValue(new Tuple1(), row);
+ case 2:
+ return setTupleValue(new Tuple2(), row);
+ case 3:
+ return setTupleValue(new Tuple3(), row);
+ case 4:
+ return setTupleValue(new Tuple4(), row);
+ case 5:
+ return setTupleValue(new Tuple5(), row);
+ case 6:
+ return setTupleValue(new Tuple6(), row);
+ case 7:
+ return setTupleValue(new Tuple7(), row);
+ case 8:
+ return setTupleValue(new Tuple8(), row);
+ case 9:
+ return setTupleValue(new Tuple9(), row);
+ case 10:
+ return setTupleValue(new Tuple10(), row);
+ case 11:
+ return setTupleValue(new Tuple11(), row);
+ case 12:
+ return setTupleValue(new Tuple12(), row);
+ case 13:
+ return setTupleValue(new Tuple13(), row);
+ case 14:
+ return setTupleValue(new Tuple14(), row);
+ case 15:
+ return setTupleValue(new Tuple15(), row);
+ case 16:
+ return setTupleValue(new Tuple16(), row);
+ case 17:
+ return setTupleValue(new Tuple17(), row);
+ case 18:
+ return setTupleValue(new Tuple18(), row);
+ case 19:
+ return setTupleValue(new Tuple19(), row);
+ case 20:
+ return setTupleValue(new Tuple20(), row);
+ case 21:
+ return setTupleValue(new Tuple21(), row);
+ case 22:
+ return setTupleValue(new Tuple22(), row);
+ case 23:
+ return setTupleValue(new Tuple23(), row);
+ case 24:
+ return setTupleValue(new Tuple24(), row);
+ case 25:
+ return setTupleValue(new Tuple25(), row);
+ default:
+ throw new IllegalArgumentException("Too long row: " + row.length + ", unable to convert to Tuple");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Tuple> T setTupleValue(Tuple tuple, Object[] row) {
+ if (row.length != tuple.getArity()) {
+ throw new IllegalArgumentException("Row length" + row.length + " is not equal with tuple's arity: " + tuple.getArity());
+ }
+ for (int i = 0; i < row.length; i++) {
+ tuple.setField(row[i], i);
+ }
+ return (T) tuple;
+ }
+}
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
new file mode 100644
index 0000000..22405c9
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi Type Utils for conversion between Java Type, Siddhi Field Type, Stream Definition, and Flink Type Information.
+ */
+public class SiddhiTypeFactory {
+ private static final Map<Class<?>, Attribute.Type> JAVA_TO_SIDDHI_TYPE = new HashMap<>();
+ private static final Map<Attribute.Type, Class<?>> SIDDHI_TO_JAVA_TYPE = new HashMap<>();
+
+ static {
+ registerType(String.class, Attribute.Type.STRING);
+ registerType(Integer.class, Attribute.Type.INT);
+ registerType(int.class, Attribute.Type.INT);
+ registerType(Long.class, Attribute.Type.LONG);
+ registerType(long.class, Attribute.Type.LONG);
+ registerType(Float.class, Attribute.Type.FLOAT);
+ registerType(float.class, Attribute.Type.FLOAT);
+ registerType(Double.class, Attribute.Type.DOUBLE);
+ registerType(double.class, Attribute.Type.DOUBLE);
+ registerType(Boolean.class, Attribute.Type.BOOL);
+ registerType(boolean.class, Attribute.Type.BOOL);
+ }
+
+ public static void registerType(Class<?> javaType, Attribute.Type siddhiType) {
+ if (JAVA_TO_SIDDHI_TYPE.containsKey(javaType)) {
+ throw new IllegalArgumentException("Java type: " + javaType + " or siddhi type: " + siddhiType + " were already registered");
+ }
+ JAVA_TO_SIDDHI_TYPE.put(javaType, siddhiType);
+ SIDDHI_TO_JAVA_TYPE.put(siddhiType, javaType);
+ }
+
+ public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) {
+ SiddhiManager siddhiManager = null;
+ SiddhiAppRuntime runtime = null;
+ try {
+ siddhiManager = new SiddhiManager();
+ runtime = siddhiManager.createSiddhiAppRuntime(executionPlan);
+ Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap();
+ if (definitionMap.containsKey(streamId)) {
+ return definitionMap.get(streamId);
+ } else {
+ throw new IllegalArgumentException("Unknown stream id" + streamId);
+ }
+ } finally {
+ if (runtime != null) {
+ runtime.shutdown();
+ }
+ if (siddhiManager != null) {
+ siddhiManager.shutdown();
+ }
+ }
+ }
+
+ public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) {
+ int tupleSize = definition.getAttributeList().size();
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("Tuple").append(tupleSize);
+ stringBuilder.append("<");
+ List<String> attributeTypes = new ArrayList<>();
+ for (Attribute attribute : definition.getAttributeList()) {
+ attributeTypes.add(getJavaType(attribute.getType()).getName());
+ }
+ stringBuilder.append(StringUtils.join(attributeTypes, ","));
+ stringBuilder.append(">");
+ try {
+ return TypeInfoParser.parse(stringBuilder.toString());
+ } catch (IllegalArgumentException ex) {
+ throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex);
+ }
+ }
+
+ public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(String executionPlan, String streamId) {
+ return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final TypeInformation<?> MAP_TYPE_INFORMATION = TypeExtractor.createTypeInfo(new HashMap<String, Object>().getClass());
+
+ public static TypeInformation<Map<String, Object>> getMapTypeInformation() {
+ return (TypeInformation<Map<String, Object>>) MAP_TYPE_INFORMATION;
+ }
+
+ public static <F> Attribute.Type getAttributeType(TypeInformation<F> fieldType) {
+ if (JAVA_TO_SIDDHI_TYPE.containsKey(fieldType.getTypeClass())) {
+ return JAVA_TO_SIDDHI_TYPE.get(fieldType.getTypeClass());
+ } else {
+ return Attribute.Type.OBJECT
+ ;
+ }
+ }
+
+ public static Class<?> getJavaType(Attribute.Type attributeType) {
+ if (!SIDDHI_TO_JAVA_TYPE.containsKey(attributeType)) {
+ throw new IllegalArgumentException("Unable to get java type for siddhi attribute type: " + attributeType);
+ }
+ return SIDDHI_TO_JAVA_TYPE.get(attributeType);
+ }
+
+ public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) {
+ return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">");
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
new file mode 100755
index 0000000..5c16c71
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Flink-siddhi library integration test cases
+ */
+public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
+
+ @Rule
+ public transient TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testSimpleWriteAndRead() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ String path = tempFolder.newFile().toURI().toString();
+ input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
+ @Override
+ public Event map(Event event) throws Exception {
+ return event;
+ }
+ })).writeAsText(path);
+ env.execute();
+ Assert.assertEquals(6, getLineCount(path));
+ }
+
+ @Test
+ public void testSimplePojoStreamAndReturnPojo() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ DataStream<Event> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price")
+ .cql("from inputStream insert into outputStream")
+ .returns("outputStream", Event.class);
+ String path = tempFolder.newFile().toURI().toString();
+ output.print();
+ env.execute();
+ }
+
+ @Test
+ public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+ DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price", "timestamp")
+ .cql("from inputStream select timestamp, id, name, price insert into outputStream")
+ .returns("outputStream");
+
+ DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
+ @Override
+ public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
+ return value.f1;
+ }
+ });
+ String resultPath = tempFolder.newFile().toURI().toString();
+ following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ @Test
+ public void testUnboundedTupleSourceAndReturnTuple() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Tuple4<Integer, String, Double, Long>> input = env
+ .addSource(new RandomTupleSource(5).closeDelay(1500)).keyBy(1);
+
+ DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price", "timestamp")
+ .cql("from inputStream select timestamp, id, name, price insert into outputStream")
+ .returns("outputStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ @Test
+ public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<String> input = env.addSource(new RandomWordSource(5).closeDelay(1500));
+
+ DataStream<Tuple1<String>> output = SiddhiCEP
+ .define("wordStream", input, "words")
+ .cql("from wordStream select words insert into outputStream")
+ .returns("outputStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ @Test(expected = InvalidTypesException.class)
+ public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.addSource(new RandomEventSource(5).closeDelay(1500));
+
+ DataStream<Tuple5<Long, Integer, String, Double, Long>> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price", "timestamp")
+ .cql("from inputStream select timestamp, id, name, price insert into outputStream")
+ .returns("outputStream");
+
+ DataStream<Long> following = output.map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() {
+ @Override
+ public Long map(Tuple5<Long, Integer, String, Double, Long> value) throws Exception {
+ return value.f0;
+ }
+ });
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ env.execute();
+ }
+
+ @Test
+ public void testUnboundedPojoStreamAndReturnMap() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+ DataStream<Map<String, Object>> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price", "timestamp")
+ .cql("from inputStream select timestamp, id, name, price insert into outputStream")
+ .returnAsMap("outputStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ @Test
+ public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.addSource(new RandomEventSource(5));
+ input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
+ @Override
+ public long extractAscendingTimestamp(Event element) {
+ return element.getTimestamp();
+ }
+ });
+
+ DataStream<Event> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price", "timestamp")
+ .cql("from inputStream select timestamp, id, name, price insert into outputStream")
+ .returns("outputStream", Event.class);
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+
+ @Test
+ public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input1 = env.addSource(new RandomEventSource(2), "input1");
+ DataStream<Event> input2 = env.addSource(new RandomEventSource(2), "input2");
+ DataStream<Event> input3 = env.addSource(new RandomEventSource(2), "input2");
+ DataStream<Event> output = SiddhiCEP
+ .define("inputStream1", input1, "id", "name", "price", "timestamp")
+ .union("inputStream2", input2, "id", "name", "price", "timestamp")
+ .union("inputStream3", input3, "id", "name", "price", "timestamp")
+ .cql(
+ "from inputStream1 select timestamp, id, name, price insert into outputStream;"
+ + "from inputStream2 select timestamp, id, name, price insert into outputStream;"
+ + "from inputStream3 select timestamp, id, name, price insert into outputStream;"
+ )
+ .returns("outputStream", Event.class);
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(6, getLineCount(resultPath));
+ }
+
+ /**
+ * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Joins</a>
+ */
+ @Test
+ public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+ DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
+
+ DataStream<? extends Map> output = SiddhiCEP
+ .define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp")
+ .union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp")
+ .cql(
+ "from inputStream1#window.length(5) as s1 "
+ + "join inputStream2#window.time(500) as s2 "
+ + "on s1.id == s2.id "
+ + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+ + "insert into JoinStream;"
+ )
+ .returnAsMap("JoinStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ /**
+ * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Patterns</a>
+ */
+ @Test
+ public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
+ DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2");
+
+ DataStream<Map<String, Object>> output = SiddhiCEP
+ .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
+ .union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp")
+ .cql(
+ "from every s1 = inputStream1[id == 2] "
+ + " -> s2 = inputStream2[id == 3] "
+ + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ + "insert into outputStream"
+ )
+ .returnAsMap("outputStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(1, getLineCount(resultPath));
+ compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", resultPath);
+ }
+
+ /**
+ * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Sequences</a>
+ */
+ @Test
+ public void testUnboundedPojoStreamSimpleSequences() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
+ DataStream<Map<String, Object>> output = SiddhiCEP
+ .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
+ .union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp")
+ .cql(
+ "from every s1 = inputStream1[id == 2]+ , "
+ + "s2 = inputStream2[id == 3]? "
+ + "within 1000 second "
+ + "select s1[0].name as n1, s2.name as n2 "
+ + "insert into outputStream"
+ )
+ .returnAsMap("outputStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(1, getLineCount(resultPath));
+ }
+
+ private static int getLineCount(String resPath) throws IOException {
+ List<String> result = new LinkedList<>();
+ readAllResultLines(result, resPath);
+ return result.size();
+ }
+
+ @Test
+ public void testCustomizeSiddhiFunctionExtension() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+ SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+ DataStream<Map<String, Object>> output = cep
+ .from("inputStream", input, "id", "name", "price", "timestamp")
+ .cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into outputStream")
+ .returnAsMap("outputStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ @Test
+ public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+ DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
+
+ SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+ cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
+ cep.registerStream("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp");
+
+ DataStream<Tuple4<Long, String, Double, Double>> output = cep
+ .from("inputStream1").union("inputStream2")
+ .cql(
+ "from inputStream1#window.length(5) as s1 "
+ + "join inputStream2#window.time(500) as s2 "
+ + "on s1.id == s2.id "
+ + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+ + "insert into JoinStream;"
+ )
+ .returns("JoinStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ assertEquals(5, getLineCount(resultPath));
+ }
+
+ @Test(expected = UndefinedStreamException.class)
+ public void testTriggerUndefinedStreamException() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+
+ SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
+
+ DataStream<Map<String, Object>> output = cep
+ .from("inputStream1").union("inputStream2")
+ .cql(
+ "from inputStream1#window.length(5) as s1 "
+ + "join inputStream2#window.time(500) as s2 "
+ + "on s1.id == s2.id "
+ + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+ + "insert into JoinStream;"
+ )
+ .returnAsMap("JoinStream");
+
+ String resultPath = tempFolder.newFile().toURI().toString();
+ output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ env.execute();
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
new file mode 100644
index 0000000..582f1cd
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.extension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.wso2.siddhi.core.config.SiddhiAppContext;
+import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.core.util.config.ConfigReader;
+import org.wso2.siddhi.query.api.definition.Attribute;
+
+public class CustomPlusFunctionExtension extends FunctionExecutor {
+ private Attribute.Type returnType;
+
+ /**
+ * The initialization method for FunctionExecutor, this method will be called before the other methods
+ */
+ @Override
+ protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
+ for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
+ Attribute.Type attributeType = expressionExecutor.getReturnType();
+ if (attributeType == Attribute.Type.DOUBLE) {
+ returnType = attributeType;
+
+ } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) {
+ throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool");
+ } else {
+ returnType = Attribute.Type.LONG;
+ }
+ }
+ }
+
+ /**
+ * The main execution method which will be called upon event arrival
+ * when there are more then one function parameter
+ *
+ * @param data the runtime values of function parameters
+ * @return the function result
+ */
+ @Override
+ protected Object execute(Object[] data) {
+ if (returnType == Attribute.Type.DOUBLE) {
+ double total = 0;
+ for (Object aObj : data) {
+ total += Double.parseDouble(String.valueOf(aObj));
+ }
+
+ return total;
+ } else {
+ long total = 0;
+ for (Object aObj : data) {
+ total += Long.parseLong(String.valueOf(aObj));
+ }
+ return total;
+ }
+ }
+
+ /**
+ * The main execution method which will be called upon event arrival
+ * when there are zero or one function parameter
+ *
+ * @param data null if the function parameter count is zero or
+ * runtime data value of the function parameter
+ * @return the function result
+ */
+ @Override
+ protected Object execute(Object data) {
+ if (returnType == Attribute.Type.DOUBLE) {
+ return Double.parseDouble(String.valueOf(data));
+ } else {
+ return Long.parseLong(String.valueOf(data));
+ }
+ }
+
+ @Override
+ public Attribute.Type getReturnType() {
+ return returnType;
+ }
+
+ @Override
+ public Map<String, Object> currentState() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public void restoreState(Map<String, Object> map) {
+
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
new file mode 100644
index 0000000..d271c89
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SiddhiSyntaxTest {
+
+ private SiddhiManager siddhiManager;
+
+ @Before
+ public void setUp() {
+ siddhiManager = new SiddhiManager();
+ }
+
+ @After
+ public void after() {
+ siddhiManager = new SiddhiManager();
+ }
+
+ @Test
+ public void testSimplePlan() throws InterruptedException {
+ SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
+ "define stream inStream (name string, value double);"
+ + "from inStream insert into outStream");
+ runtime.start();
+
+ final List<Object[]> received = new ArrayList<>(3);
+ InputHandler inputHandler = runtime.getInputHandler("inStream");
+ Assert.assertNotNull(inputHandler);
+
+ try {
+ runtime.getInputHandler("unknownStream");
+ Assert.fail("Should throw exception for getting input handler for unknown streamId.");
+ } catch (Exception ex) {
+ // Expected exception for getting input handler for illegal streamId.
+ }
+
+ runtime.addCallback("outStream", new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ received.add(event.getData());
+ }
+ }
+ });
+
+ inputHandler.send(new Object[]{"a", 1.1});
+ inputHandler.send(new Object[]{"b", 1.2});
+ inputHandler.send(new Object[]{"c", 1.3});
+ Thread.sleep(100);
+ Assert.assertEquals(3, received.size());
+ Assert.assertArrayEquals(received.get(0), new Object[]{"a", 1.1});
+ Assert.assertArrayEquals(received.get(1), new Object[]{"b", 1.2});
+ Assert.assertArrayEquals(received.get(2), new Object[]{"c", 1.3});
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
new file mode 100644
index 0000000..db05e9d
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Test;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import static org.junit.Assert.*;
+
+public class SiddhiExecutionPlanSchemaTest {
+ @Test
+ public void testStreamSchemaWithPojo() {
+ TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+ assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+
+ SiddhiStreamSchema<Event> schema = new SiddhiStreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+ assertEquals(4, schema.getFieldIndexes().length);
+
+ StreamDefinition streamDefinition = schema.getStreamDefinition("test_stream");
+ assertArrayEquals(new String[]{"id", "timestamp", "name", "price"}, streamDefinition.getAttributeNameArray());
+
+ assertEquals(Attribute.Type.INT, streamDefinition.getAttributeType("id"));
+ assertEquals(Attribute.Type.LONG, streamDefinition.getAttributeType("timestamp"));
+ assertEquals(Attribute.Type.STRING, streamDefinition.getAttributeType("name"));
+ assertEquals(Attribute.Type.DOUBLE, streamDefinition.getAttributeType("price"));
+
+ assertEquals("define stream test_stream (id int,timestamp long,name string,price double);", schema.getStreamDefinitionExpression("test_stream"));
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
new file mode 100644
index 0000000..f876b2b
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StreamSchemaTest {
+ @Test
+ public void testStreamSchemaWithPojo() {
+ TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+ assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+ StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+ assertEquals(4, schema.getFieldIndexes().length);
+ assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
+ }
+
+ @Test
+ public void testStreamSchemaWithTuple() {
+ TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+ StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+ assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+ assertEquals(4, schema.getFieldIndexes().length);
+ assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+ }
+
+ @Test
+ public void testStreamSchemaWithPrimitive() {
+ TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+ StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
+ assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+ assertEquals(1, schema.getFieldIndexes().length);
+ assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testStreamSchemaWithPojoAndUnknownField() {
+ TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+ new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price", "unknown");
+ }
+
+ @Test
+ public void testStreamTupleSerializerWithPojo() {
+ TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+ assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+ StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+ assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
+
+ TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+ assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString());
+ }
+
+ @Test
+ public void testStreamTupleSerializerWithTuple() {
+ TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+ StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+ assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+ TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+ assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString());
+ }
+
+ @Test
+ public void testStreamTupleSerializerWithPrimitive() {
+ TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+ StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
+ assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+ TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+ assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString());
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
new file mode 100644
index 0000000..190208c
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamSerializerTest {
+ private static final long CURRENT = System.currentTimeMillis();
+
+ @Test
+ public void testSimplePojoRead() {
+ Event event = new Event();
+ event.setId(1);
+ event.setName("test");
+ event.setPrice(56.7);
+ event.setTimestamp(CURRENT);
+
+ StreamSchema<Event> schema = new StreamSchema<>(TypeExtractor.createTypeInfo(Event.class), "id", "name", "price", "timestamp");
+ StreamSerializer<Event> reader = new StreamSerializer<>(schema);
+ Assert.assertArrayEquals(new Object[]{1, "test", 56.7, CURRENT}, reader.getRow(event));
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
new file mode 100644
index 0000000..357e1d2
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.Objects;
+
+public class Event {
+ private long timestamp;
+ private String name;
+ private double price;
+ private int id;
+
+ public double getPrice() {
+ return price;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return "Event(" + id + ", " + name + ", " + price + ", " + timestamp + ")";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Event) {
+ Event other = (Event) obj;
+
+ return name.equals(other.name) && price == other.price && id == other.id && timestamp == other.timestamp;
+ } else {
+ return false;
+ }
+ }
+
+ public static Event of(int id, String name, double price) {
+ Event event = new Event();
+ event.setId(id);
+ event.setName(name);
+ event.setPrice(price);
+ event.setTimestamp(System.currentTimeMillis());
+ return event;
+ }
+
+ public static Event of(int id, String name, double price, long timestamp) {
+ Event event = new Event();
+ event.setId(id);
+ event.setName(name);
+ event.setPrice(price);
+ event.setTimestamp(timestamp);
+ return event;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, price, id);
+ }
+
+ public static TypeSerializer<Event> createTypeSerializer() {
+ TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+
+ return typeInformation.createSerializer(new ExecutionConfig());
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public void setPrice(double price) {
+ this.price = price;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
new file mode 100644
index 0000000..bb95fdd
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomEventSource implements SourceFunction<Event> {
+ private final int count;
+ private final Random random;
+ private final long initialTimestamp;
+
+ private volatile boolean isRunning = true;
+ private volatile int number = 0;
+ private volatile long closeDelayTimestamp = 1000;
+
+ public RandomEventSource(int count, long initialTimestamp) {
+ this.count = count;
+ this.random = new Random();
+ this.initialTimestamp = initialTimestamp;
+ }
+
+ public RandomEventSource() {
+ this(Integer.MAX_VALUE, System.currentTimeMillis());
+ }
+
+ public RandomEventSource(int count) {
+ this(count, System.currentTimeMillis());
+ }
+
+ public RandomEventSource closeDelay(long delayTimestamp) {
+ this.closeDelayTimestamp = delayTimestamp;
+ return this;
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ while (isRunning) {
+ ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number));
+ number++;
+ if (number >= this.count) {
+ cancel();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ try {
+ Thread.sleep(closeDelayTimestamp);
+ } catch (InterruptedException e) {
+ // ignored
+ }
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
new file mode 100644
index 0000000..35121f7
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> {
+ private final int count;
+ private final Random random;
+ private final long initialTimestamp;
+
+ private volatile boolean isRunning = true;
+ private volatile int number = 0;
+ private long closeDelayTimestamp;
+
+ public RandomTupleSource(int count, long initialTimestamp) {
+ this.count = count;
+ this.random = new Random();
+ this.initialTimestamp = initialTimestamp;
+ }
+
+ public RandomTupleSource() {
+ this(Integer.MAX_VALUE, System.currentTimeMillis());
+ }
+
+ public RandomTupleSource(int count) {
+ this(count, System.currentTimeMillis());
+ }
+
+
+ public RandomTupleSource closeDelay(long delayTimestamp) {
+ this.closeDelayTimestamp = delayTimestamp;
+ return this;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception {
+ while (isRunning) {
+ ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number));
+ number++;
+ if (number >= this.count) {
+ cancel();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ try {
+ Thread.sleep(this.closeDelayTimestamp);
+ } catch (InterruptedException e) {
+ // ignored
+ }
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
new file mode 100644
index 0000000..19d904f
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomWordSource implements SourceFunction<String> {
+ private static final String[] WORDS = new String[] {
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."
+ };
+
+ private final int count;
+ private final Random random;
+ private final long initialTimestamp;
+
+ private volatile boolean isRunning = true;
+ private volatile int number = 0;
+ private long closeDelayTimestamp;
+
+ public RandomWordSource(int count, long initialTimestamp) {
+ this.count = count;
+ this.random = new Random();
+ this.initialTimestamp = initialTimestamp;
+ }
+
+ public RandomWordSource() {
+ this(Integer.MAX_VALUE, System.currentTimeMillis());
+ }
+
+ public RandomWordSource(int count) {
+ this(count, System.currentTimeMillis());
+ }
+
+
+ public RandomWordSource closeDelay(long delayTimestamp) {
+ this.closeDelayTimestamp = delayTimestamp;
+ return this;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ while (isRunning) {
+ ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number);
+ number++;
+ if (number >= this.count) {
+ cancel();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ try {
+ Thread.sleep(this.closeDelayTimestamp);
+ } catch (InterruptedException e) {
+ // ignored
+ }
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
new file mode 100644
index 0000000..4753a3f
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class SiddhiTupleFactoryTest {
+ @Test
+ public void testConvertObjectArrayToTuple() {
+ Object[] row = new Object[]{1, "message", 1234567L, true, new Object()};
+ Tuple5 tuple5 = SiddhiTupleFactory.newTuple(row);
+ assertEquals(5, tuple5.getArity());
+ assertArrayEquals(row, new Object[]{
+ tuple5.f0,
+ tuple5.f1,
+ tuple5.f2,
+ tuple5.f3,
+ tuple5.f4
+ });
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConvertTooLongObjectArrayToTuple() {
+ Object[] row = new Object[26];
+ SiddhiTupleFactory.newTuple(row);
+ }
+}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
new file mode 100644
index 0000000..c4a1e8c
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SiddhiTypeFactoryTest {
+ @Test
+ public void testTypeInfoParser() {
+ TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
+ Assert.assertNotNull(type1);
+ TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
+ Assert.assertNotNull(type2);
+ }
+
+ public static class InnerPojo {
+ }
+
+ @Test
+ public void testBuildTypeInformationForSiddhiStream() {
+ String query = "define stream inputStream (timestamp long, name string, value double);"
+ + "from inputStream select name, value insert into outputStream;";
+ TypeInformation<Tuple3<Long, String, Double>> inputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "inputStream");
+ TypeInformation<Tuple2<String, Double>> outputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "outputStream");
+
+ Assert.assertNotNull(inputStreamType);
+ Assert.assertNotNull(outputStreamType);
+ }
+}
diff --git a/flink-library-siddhi/src/test/resources/log4j-test.properties b/flink-library-siddhi/src/test/resources/log4j-test.properties
new file mode 100755
index 0000000..5b1e4ed
--- /dev/null
+++ b/flink-library-siddhi/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=INFO, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-library-siddhi/src/test/resources/logback-test.xml b/flink-library-siddhi/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..b7a5793
--- /dev/null
+++ b/flink-library-siddhi/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+ <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
diff --git a/pom.xml b/pom.xml
index 4070698..c2d36ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
<module>flink-connector-netty</module>
<module>flink-connector-akka</module>
<module>flink-connector-influxdb</module>
+ <module>flink-library-siddhi</module>
</modules>
<properties>