blob: b7f1177977450914842ead0e95ed7769e37241bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
/**
* Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps.
*/
@SuppressWarnings("unchecked")
public class SameElementITCase extends TestLogger {
@Test
public void testEagerZeroOrMoreSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent2 = new Event(42, "a", 3.0);
Event middleEvent3 = new Event(43, "a", 4.0);
Event end1 = new Event(44, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent2, 4));
inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
inputEvents.add(new StreamRecord<>(middleEvent3, 6));
inputEvents.add(new StreamRecord<>(middleEvent3, 6));
inputEvents.add(new StreamRecord<>(end1, 7));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedBy("middle").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
Lists.newArrayList(startEvent, middleEvent1, end1),
Lists.newArrayList(startEvent, end1)
));
}
@Test
public void testClearingBuffer() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event a1 = new Event(40, "a", 1.0);
Event b1 = new Event(41, "b", 2.0);
Event c1 = new Event(41, "c", 2.0);
Event d = new Event(41, "d", 2.0);
inputEvents.add(new StreamRecord<>(a1, 1));
inputEvents.add(new StreamRecord<>(b1, 2));
inputEvents.add(new StreamRecord<>(c1, 2));
inputEvents.add(new StreamRecord<>(d, 2));
Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).followedBy("b").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
}).followedBy("c").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedBy("d").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("d");
}
});
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, b1, c1, d)
));
assertEquals(1, nfaState.getPartialMatches().size());
assertEquals("a", nfaState.getPartialMatches().peek().getCurrentStateName());
}
@Test
public void testClearingBufferWithUntilAtTheEnd() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event a1 = new Event(40, "a", 1.0);
Event d1 = new Event(41, "d", 2.0);
Event d2 = new Event(41, "d", 2.0);
Event d3 = new Event(41, "d", 2.0);
Event d4 = new Event(41, "d", 2.0);
inputEvents.add(new StreamRecord<>(a1, 1));
inputEvents.add(new StreamRecord<>(d1, 2));
inputEvents.add(new StreamRecord<>(d2, 2));
inputEvents.add(new StreamRecord<>(d3, 2));
inputEvents.add(new StreamRecord<>(d4, 4));
Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).followedBy("d").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("d");
}
}).oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
return Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
}
});
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, d1, d2, d3),
Lists.newArrayList(a1, d1, d2),
Lists.newArrayList(a1, d1)
));
assertEquals(1, nfaState.getPartialMatches().size());
assertEquals("a", nfaState.getPartialMatches().peek().getCurrentStateName());
}
@Test
public void testZeroOrMoreSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent1a = new Event(41, "a", 2.0);
Event middleEvent2 = new Event(42, "a", 3.0);
Event middleEvent3 = new Event(43, "a", 4.0);
Event middleEvent3a = new Event(43, "a", 4.0);
Event end1 = new Event(44, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
inputEvents.add(new StreamRecord<>(middleEvent2, 4));
inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
inputEvents.add(new StreamRecord<>(middleEvent3, 6));
inputEvents.add(new StreamRecord<>(middleEvent3a, 6));
inputEvents.add(new StreamRecord<>(end1, 7));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedByAny("middle").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1),
Lists.newArrayList(startEvent, middleEvent1, end1),
Lists.newArrayList(startEvent, middleEvent1a, end1),
Lists.newArrayList(startEvent, middleEvent2, end1),
Lists.newArrayList(startEvent, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent3a, end1),
Lists.newArrayList(startEvent, end1)
));
}
@Test
public void testSimplePatternWSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event end1 = new Event(44, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(end1, 7));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedByAny("middle").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).followedBy("end1").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, end1),
Lists.newArrayList(startEvent, middleEvent1, end1)
));
}
@Test
public void testIterativeConditionWSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent1a = new Event(41, "a", 2.0);
Event middleEvent1b = new Event(41, "a", 2.0);
final Event end = new Event(44, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
inputEvents.add(new StreamRecord<>(end, 7));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedByAny("middle").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition<Event>() {
private static final long serialVersionUID = -5566639743229703237L;
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
double sum = 0.0;
for (Event event: ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 4.0) == 0;
}
});
NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end)
));
}
@Test
public void testEndWLoopingWSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent1a = new Event(41, "a", 2.0);
Event middleEvent1b = new Event(41, "a", 2.0);
final Event end = new Event(44, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
inputEvents.add(new StreamRecord<>(end, 7));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedByAny("middle").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().optional();
NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent),
Lists.newArrayList(startEvent, middleEvent1),
Lists.newArrayList(startEvent, middleEvent1a),
Lists.newArrayList(startEvent, middleEvent1b),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a),
Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b),
Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b)
));
}
@Test
public void testRepeatingPatternWSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middle1Event1 = new Event(40, "a", 2.0);
Event middle1Event2 = new Event(40, "a", 3.0);
Event middle1Event3 = new Event(40, "a", 4.0);
Event middle2Event1 = new Event(40, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middle1Event1, 3));
inputEvents.add(new StreamRecord<>(middle1Event1, 3));
inputEvents.add(new StreamRecord<>(middle1Event2, 3));
inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5));
inputEvents.add(new StreamRecord<>(middle2Event1, 6));
inputEvents.add(new StreamRecord<>(middle1Event3, 7));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedBy("middle1").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
}).optional().followedBy("end").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
});
NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middle1Event1),
Lists.newArrayList(startEvent, middle1Event1, middle1Event1),
Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2),
Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3),
Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
));
}
}