blob: 9b385399959879c83e261881021a83c8f057fc64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.siddhi;
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
import org.apache.flink.streaming.siddhi.source.Event;
import org.apache.flink.streaming.siddhi.source.RandomEventSource;
import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
import org.apache.flink.streaming.siddhi.source.RandomWordSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.junit.Assert.assertEquals;
/**
* Flink-siddhi library integration test cases
*/
public class SiddhiCEPITCase extends AbstractTestBase implements Serializable {
@Rule
public transient TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testSimpleWriteAndRead() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
Event.of(1, "start", 1.0),
Event.of(2, "middle", 2.0),
Event.of(3, "end", 3.0),
Event.of(4, "start", 4.0),
Event.of(5, "middle", 5.0),
Event.of(6, "end", 6.0)
);
String path = tempFolder.newFile().toURI().toString();
input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
@Override
public Event map(Event event) throws Exception {
return event;
}
})).writeAsText(path);
env.execute();
Assert.assertEquals(6, getLineCount(path));
}
@Test
public void testSimplePojoStreamAndReturnPojo() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
Event.of(1, "start", 1.0),
Event.of(2, "middle", 2.0),
Event.of(3, "end", 3.0),
Event.of(4, "start", 4.0),
Event.of(5, "middle", 5.0),
Event.of(6, "end", 6.0)
);
DataStream<Event> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price")
.cql("from inputStream insert into outputStream")
.returns("outputStream", Event.class);
String path = tempFolder.newFile().toURI().toString();
output.print();
env.execute();
}
@Test
public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.addSource(new RandomEventSource(5));
DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");
DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
@Override
public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
return value.f1;
}
});
String resultPath = tempFolder.newFile().toURI().toString();
following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test
public void testUnboundedTupleSourceAndReturnTuple() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple4<Integer, String, Double, Long>> input = env
.addSource(new RandomTupleSource(5).closeDelay(1500)).keyBy(1);
DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test
public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.addSource(new RandomWordSource(5).closeDelay(1500));
DataStream<Tuple1<String>> output = SiddhiCEP
.define("wordStream", input, "words")
.cql("from wordStream select words insert into outputStream")
.returns("outputStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test(expected = InvalidTypesException.class)
public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.addSource(new RandomEventSource(5).closeDelay(1500));
DataStream<Tuple5<Long, Integer, String, Double, Long>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");
DataStream<Long> following = output.map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() {
@Override
public Long map(Tuple5<Long, Integer, String, Double, Long> value) throws Exception {
return value.f0;
}
});
String resultPath = tempFolder.newFile().toURI().toString();
following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
env.execute();
}
@Test
public void testUnboundedPojoStreamAndReturnMap() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Event> input = env.addSource(new RandomEventSource(5));
DataStream<Map<String, Object>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returnAsMap("outputStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test
public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.addSource(new RandomEventSource(5));
input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override
public long extractAscendingTimestamp(Event element) {
return element.getTimestamp();
}
});
DataStream<Event> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream", Event.class);
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test
public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input1 = env.addSource(new RandomEventSource(2), "input1");
DataStream<Event> input2 = env.addSource(new RandomEventSource(2), "input2");
DataStream<Event> input3 = env.addSource(new RandomEventSource(2), "input2");
DataStream<Event> output = SiddhiCEP
.define("inputStream1", input1, "id", "name", "price", "timestamp")
.union("inputStream2", input2, "id", "name", "price", "timestamp")
.union("inputStream3", input3, "id", "name", "price", "timestamp")
.cql(
"from inputStream1 select timestamp, id, name, price insert into outputStream;"
+ "from inputStream2 select timestamp, id, name, price insert into outputStream;"
+ "from inputStream3 select timestamp, id, name, price insert into outputStream;"
)
.returns("outputStream", Event.class);
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(6, getLineCount(resultPath));
}
/**
* @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Joins</a>
*/
@Test
public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
DataStream<? extends Map<?,?>> output = SiddhiCEP
.define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp")
.union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp")
.cql(
"from inputStream1#window.length(5) as s1 "
+ "join inputStream2#window.time(500) as s2 "
+ "on s1.id == s2.id "
+ "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+ "insert into JoinStream;"
)
.returnAsMap("JoinStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
/**
* @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Patterns</a>
*/
@Test
public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2");
DataStream<Map<String, Object>> output = SiddhiCEP
.define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
.union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp")
.cql(
"from every s1 = inputStream1[id == 2] "
+ " -> s2 = inputStream2[id == 3] "
+ "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ "insert into outputStream"
)
.returnAsMap("outputStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(1, getLineCount(resultPath));
compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", resultPath);
}
/**
* @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Sequences</a>
*/
@Test
public void testUnboundedPojoStreamSimpleSequences() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
DataStream<Map<String, Object>> output = SiddhiCEP
.define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
.union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp")
.cql(
"from every s1 = inputStream1[id == 2]+ , "
+ "s2 = inputStream2[id == 3]? "
+ "within 1000 second "
+ "select s1[0].name as n1, s2.name as n2 "
+ "insert into outputStream"
)
.returnAsMap("outputStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(1, getLineCount(resultPath));
}
private static int getLineCount(String resPath) throws IOException {
List<String> result = new LinkedList<>();
readAllResultLines(result, resPath);
return result.size();
}
@Test
public void testCustomizeSiddhiFunctionExtension() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.addSource(new RandomEventSource(5));
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
DataStream<Map<String, Object>> output = cep
.from("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into outputStream")
.returnAsMap("outputStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test
public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
cep.registerStream("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp");
DataStream<Tuple4<Long, String, Double, Double>> output = cep
.from("inputStream1").union("inputStream2")
.cql(
"from inputStream1#window.length(5) as s1 "
+ "join inputStream2#window.time(500) as s2 "
+ "on s1.id == s2.id "
+ "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+ "insert into JoinStream;"
)
.returns("JoinStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
@Test(expected = UndefinedStreamException.class)
public void testTriggerUndefinedStreamException() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
DataStream<Map<String, Object>> output = cep
.from("inputStream1").union("inputStream2")
.cql(
"from inputStream1#window.length(5) as s1 "
+ "join inputStream2#window.time(500) as s2 "
+ "on s1.id == s2.id "
+ "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+ "insert into JoinStream;"
)
.returnAsMap("JoinStream");
String resultPath = tempFolder.newFile().toURI().toString();
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
}
}