/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.nfa;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Stack;
import static org.apache.flink.cep.nfa.MigrationUtils.deserializeComputationStates;
/**
* Non-deterministic finite automaton implementation.
*
* <p>The {@link AbstractKeyedCEPPatternOperator CEP operator}
* keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones.
* When an event gets processed, it updates the NFA's internal state machine.
*
* <p>An event that belongs to a partially matched sequence is kept in an internal
* {@link SharedBuffer buffer}, a data structure memory-optimized exactly for
* this purpose. Events in the buffer are removed when all the matched sequences that
* contain them are:
* <ol>
* <li>emitted (success)</li>
* <li>discarded (patterns containing NOT)</li>
* <li>timed-out (windowed patterns)</li>
* </ol>
*
* <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
*
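* <p>A minimal usage sketch, as driven by the operator (the {@code nfa} and
* {@code sharedBuffer} instances and the {@code MyEvent} type are illustrative
* assumptions here; {@link AbstractKeyedCEPPatternOperator} wires these up internally):
* <pre>{@code
* NFAState nfaState = nfa.createInitialNFAState();
* try (SharedBufferAccessor<MyEvent> accessor = sharedBuffer.getAccessor()) {
*     Collection<Map<String, List<MyEvent>>> matches =
*         nfa.process(accessor, nfaState, event, timestamp);
*     Collection<Tuple2<Map<String, List<MyEvent>>, Long>> timedOut =
*         nfa.advanceTime(accessor, nfaState, timestamp);
* }
* }</pre>
*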
* @param <T> Type of the processed events
* @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
* https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
*/
public class NFA<T> {
/**
* A set of all the valid NFA states, as returned by the
* {@link NFACompiler NFACompiler}.
* These are directly derived from the user-specified pattern.
*/
private final Map<String, State<T>> states;
/**
* The length of a windowed pattern, as specified using the
* {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
* method.
*/
private final long windowTime;
/**
* A flag indicating if we want timed-out patterns (in case of windowed patterns)
* to be emitted ({@code true}), or silently discarded ({@code false}).
*/
private final boolean handleTimeout;
public NFA(
final Collection<State<T>> validStates,
final long windowTime,
final boolean handleTimeout) {
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
this.states = loadStates(validStates);
}
private Map<String, State<T>> loadStates(final Collection<State<T>> validStates) {
Map<String, State<T>> tmp = new HashMap<>(4);
for (State<T> state : validStates) {
tmp.put(state.getName(), state);
}
return Collections.unmodifiableMap(tmp);
}
@VisibleForTesting
public Collection<State<T>> getStates() {
return states.values();
}
public NFAState createInitialNFAState() {
Queue<ComputationState> startingStates = new LinkedList<>();
for (State<T> state : states.values()) {
if (state.isStart()) {
startingStates.add(ComputationState.createStartState(state.getName()));
}
}
return new NFAState(startingStates);
}
private State<T> getState(ComputationState state) {
return states.get(state.getCurrentStateName());
}
private boolean isStartState(ComputationState state) {
State<T> stateObject = getState(state);
if (stateObject == null) {
throw new FlinkRuntimeException("State " + state.getCurrentStateName() + " does not exist in the NFA. NFA has states "
+ states.values());
}
return stateObject.isStart();
}
private boolean isStopState(ComputationState state) {
State<T> stateObject = getState(state);
if (stateObject == null) {
throw new FlinkRuntimeException("State " + state.getCurrentStateName() + " does not exist in the NFA. NFA has states "
+ states.values());
}
return stateObject.isStop();
}
private boolean isFinalState(ComputationState state) {
State<T> stateObject = getState(state);
if (stateObject == null) {
throw new FlinkRuntimeException("State " + state.getCurrentStateName() + " does not exist in the NFA. NFA has states "
+ states.values());
}
return stateObject.isFinal();
}
/**
* Processes the next input event. If some of the computations reach a final state then the
* resulting event sequences are returned.
*
* <p>If computations reach a stop state, the path forward is discarded and the currently constructed path is returned
* with the element that resulted in the stop state.
*
* @param sharedBufferAccessor the accessor to the SharedBuffer object that we need to work on while processing
* @param nfaState The NFAState object that we need to affect while processing
* @param event The current event to be processed or null if only pruning shall be done
* @param timestamp The timestamp of the current event
* @return Collection of matched patterns (i.e. the results of computations which have
* reached a final state); timed-out patterns are emitted by
* {@link #advanceTime(SharedBufferAccessor, NFAState, long)} instead
* @throws Exception Thrown if the system cannot access the state.
*/
public Collection<Map<String, List<T>>> process(
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final T event,
final long timestamp) throws Exception {
return process(sharedBufferAccessor, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
}
/**
* Processes the next input event. If some of the computations reach a final state then the
* resulting event sequences are returned.
*
* <p>If computations reach a stop state, the path forward is discarded and the currently constructed path is returned
* with the element that resulted in the stop state.
*
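* <p>A sketch of a call with an explicit skip strategy (setup as in
* {@link #process(SharedBufferAccessor, NFAState, Object, long)}; the
* {@code nfa} and surrounding variables are assumed to exist):
* <pre>{@code
* Collection<Map<String, List<T>>> matches = nfa.process(
*     sharedBufferAccessor, nfaState, event, timestamp,
*     AfterMatchSkipStrategy.skipPastLastEvent());
* }</pre>
*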
* @param sharedBufferAccessor the accessor to the SharedBuffer object that we need to work on while processing
* @param nfaState The NFAState object that we need to affect while processing
* @param event The current event to be processed or null if only pruning shall be done
* @param timestamp The timestamp of the current event
* @param afterMatchSkipStrategy The skip strategy to apply after each match
* @return Collection of matched patterns (i.e. the results of computations which have
* reached a final state); timed-out patterns are emitted by
* {@link #advanceTime(SharedBufferAccessor, NFAState, long)} instead
* @throws Exception Thrown if the system cannot access the state.
*/
public Collection<Map<String, List<T>>> process(
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final T event,
final long timestamp,
final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
}
}
/**
* Prunes states, assuming there will be no events with timestamp <b>lower</b> than the given one.
* It clears the shared buffer and also emits all timed-out partial matches.
*
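* <p>A sketch of watermark-driven pruning (the {@code watermark} value is an
* assumption; any timestamp known to be a lower bound on future events works):
* <pre>{@code
* Collection<Tuple2<Map<String, List<T>>, Long>> timedOut =
*     nfa.advanceTime(sharedBufferAccessor, nfaState, watermark);
* }</pre>
*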
* @param sharedBufferAccessor the accessor to the SharedBuffer object that we need to work on while processing
* @param nfaState The NFAState object that we need to affect while processing
* @param timestamp timestamp that indicates that there will be no more events with a lower timestamp
* @return all timed-out partial matches
* @throws Exception Thrown if the system cannot access the state.
*/
public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final long timestamp) throws Exception {
final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
for (ComputationState computationState : nfaState.getPartialMatches()) {
if (isStateTimedOut(computationState, timestamp)) {
if (handleTimeout) {
// extract the timed out event pattern
Map<String, List<T>> timedOutPattern = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
sharedBufferAccessor,
computationState));
timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
}
sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
nfaState.setStateChanged();
} else {
newPartialMatches.add(computationState);
}
}
nfaState.setNewPartialMatches(newPartialMatches);
sharedBufferAccessor.advanceTime(timestamp);
return timeoutResult;
}
private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
return !isStartState(state) && windowTime > 0L && timestamp - state.getStartTimestamp() >= windowTime;
}
private Collection<Map<String, List<T>>> doProcess(
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final EventWrapper event,
final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
// iterate over all current computations
for (ComputationState computationState : nfaState.getPartialMatches()) {
final Collection<ComputationState> newComputationStates = computeNextStates(
sharedBufferAccessor,
computationState,
event,
event.getTimestamp());
if (newComputationStates.size() != 1) {
nfaState.setStateChanged();
} else if (!newComputationStates.iterator().next().equals(computationState)) {
nfaState.setStateChanged();
}
// delay adding new computation states in case a stop state is reached and we discard the path.
final Collection<ComputationState> statesToRetain = new ArrayList<>();
// whether a stop state has been reached in this path
boolean shouldDiscardPath = false;
for (final ComputationState newComputationState : newComputationStates) {
if (isFinalState(newComputationState)) {
potentialMatches.add(newComputationState);
} else if (isStopState(newComputationState)) {
// reached a stop state; release the entry for the stop state
shouldDiscardPath = true;
sharedBufferAccessor.releaseNode(newComputationState.getPreviousBufferEntry());
} else {
// add new computation state; it will be processed once the next event arrives
statesToRetain.add(newComputationState);
}
}
if (shouldDiscardPath) {
// a stop state was reached in this branch. release branch which results in removing previous event from
// the buffer
for (final ComputationState state : statesToRetain) {
sharedBufferAccessor.releaseNode(state.getPreviousBufferEntry());
}
} else {
newPartialMatches.addAll(statesToRetain);
}
}
if (!potentialMatches.isEmpty()) {
nfaState.setStateChanged();
}
List<Map<String, List<T>>> result = new ArrayList<>();
if (afterMatchSkipStrategy.isSkipStrategy()) {
processMatchesAccordingToSkipStrategy(sharedBufferAccessor,
nfaState,
afterMatchSkipStrategy,
potentialMatches,
newPartialMatches,
result);
} else {
for (ComputationState match : potentialMatches) {
Map<String, List<T>> materializedMatch =
sharedBufferAccessor.materializeMatch(
sharedBufferAccessor.extractPatterns(
match.getPreviousBufferEntry(),
match.getVersion()).get(0)
);
result.add(materializedMatch);
sharedBufferAccessor.releaseNode(match.getPreviousBufferEntry());
}
}
nfaState.setNewPartialMatches(newPartialMatches);
return result;
}
private void processMatchesAccordingToSkipStrategy(
SharedBufferAccessor<T> sharedBufferAccessor,
NFAState nfaState,
AfterMatchSkipStrategy afterMatchSkipStrategy,
PriorityQueue<ComputationState> potentialMatches,
PriorityQueue<ComputationState> partialMatches,
List<Map<String, List<T>>> result) throws Exception {
nfaState.getCompletedMatches().addAll(potentialMatches);
ComputationState earliestMatch = nfaState.getCompletedMatches().peek();
if (earliestMatch != null) {
ComputationState earliestPartialMatch;
while (earliestMatch != null && ((earliestPartialMatch = partialMatches.peek()) == null ||
isEarlier(earliestMatch, earliestPartialMatch))) {
nfaState.setStateChanged();
nfaState.getCompletedMatches().poll();
List<Map<String, List<EventId>>> matchedResult =
sharedBufferAccessor.extractPatterns(earliestMatch.getPreviousBufferEntry(), earliestMatch.getVersion());
afterMatchSkipStrategy.prune(
partialMatches,
matchedResult,
sharedBufferAccessor);
afterMatchSkipStrategy.prune(
nfaState.getCompletedMatches(),
matchedResult,
sharedBufferAccessor);
result.add(sharedBufferAccessor.materializeMatch(matchedResult.get(0)));
earliestMatch = nfaState.getCompletedMatches().peek();
}
nfaState.getPartialMatches().removeIf(pm -> pm.getStartEventID() != null && !partialMatches.contains(pm));
}
}
private boolean isEarlier(ComputationState earliestMatch, ComputationState earliestPartialMatch) {
return NFAState.COMPUTATION_STATE_COMPARATOR.compare(earliestMatch, earliestPartialMatch) <= 0;
}
private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
return s1.getName().equals(s2.getName());
}
/**
* Class for storing resolved transitions. At insert time it counts the number of
* branching transitions for both IGNORE and TAKE actions.
*/
private static class OutgoingEdges<T> {
private List<StateTransition<T>> edges = new ArrayList<>();
private final State<T> currentState;
private int totalTakeBranches = 0;
private int totalIgnoreBranches = 0;
OutgoingEdges(final State<T> currentState) {
this.currentState = currentState;
}
void add(StateTransition<T> edge) {
if (!isSelfIgnore(edge)) {
if (edge.getAction() == StateTransitionAction.IGNORE) {
totalIgnoreBranches++;
} else if (edge.getAction() == StateTransitionAction.TAKE) {
totalTakeBranches++;
}
}
edges.add(edge);
}
int getTotalIgnoreBranches() {
return totalIgnoreBranches;
}
int getTotalTakeBranches() {
return totalTakeBranches;
}
List<StateTransition<T>> getEdges() {
return edges;
}
private boolean isSelfIgnore(final StateTransition<T> edge) {
return isEquivalentState(edge.getTargetState(), currentState) &&
edge.getAction() == StateTransitionAction.IGNORE;
}
}
/**
* Helper class that ensures an event is registered only once throughout the life of this object and released on close
* of this object. This allows wrapping the whole processing of the event in a try-with-resources block.
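*
* <p>Sketch of the intended use, mirroring {@link NFA#process}:
* <pre>{@code
* try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
*     // getEventId() registers the event in the shared buffer at most once;
*     // close() releases that registration when processing is done
*     doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
* }
* }</pre>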
*/
private class EventWrapper implements AutoCloseable {
private final T event;
private long timestamp;
private final SharedBufferAccessor<T> sharedBufferAccessor;
private EventId eventId;
EventWrapper(T event, long timestamp, SharedBufferAccessor<T> sharedBufferAccessor) {
this.event = event;
this.timestamp = timestamp;
this.sharedBufferAccessor = sharedBufferAccessor;
}
EventId getEventId() throws Exception {
if (eventId == null) {
this.eventId = sharedBufferAccessor.registerEvent(event, timestamp);
}
return eventId;
}
T getEvent() {
return event;
}
public long getTimestamp() {
return timestamp;
}
@Override
public void close() throws Exception {
if (eventId != null) {
sharedBufferAccessor.releaseEvent(eventId);
}
}
}
/**
* Computes the next computation states based on the given computation state, the current event,
* its timestamp and the internal state machine. The algorithm is:
*<ol>
* <li>Decide on valid transitions and the number of branching paths. See {@link OutgoingEdges}</li>
* <li>Perform transitions:
* <ol>
* <li>IGNORE (links in {@link SharedBuffer} will still point to the previous event)
* <ul>
* <li>do not perform for the Start State - special case</li>
* <li>if it stays in the same state, increase the current stage for future use by the number of outgoing edges</li>
* <li>if it comes after a PROCEED, increase the current stage and add a new stage (as we change the state)</li>
* <li>lock the entry in {@link SharedBuffer} as it is needed in the created branch</li>
* </ul>
* </li>
* <li>TAKE (links in {@link SharedBuffer} will point to the current event)
* <ul>
* <li>add an entry to the shared buffer with the version of the current computation state</li>
* <li>add a stage and then increase it by the number of takes for the future computation states</li>
* <li>peek at the next state; if it has a PROCEED path to a Final State, create a Final
* ComputationState to emit results</li>
* </ul>
* </li>
* </ol>
* </li>
* <li>Handle the Start State, as it always has to remain</li>
* <li>Release the corresponding entries in {@link SharedBuffer}.</li>
*</ol>
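*
* <p>A sketch of how the {@link DeweyNumber} versions referred to above evolve
* (the concrete values are illustrative):
* <pre>{@code
* DeweyNumber v = new DeweyNumber(1);  // "1"
* DeweyNumber sibling = v.increase(1); // "2"   - a parallel branch
* DeweyNumber child = v.addStage();    // "1.0" - a new stage after a TAKE
* }</pre>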
*
* @param sharedBufferAccessor The accessor to shared buffer that we need to change
* @param computationState Current computation state
* @param event Current event which is processed
* @param timestamp Timestamp of the current event
* @return Collection of computation states which result from the current one
* @throws Exception Thrown if the system cannot access the state.
*/
private Collection<ComputationState> computeNextStates(
final SharedBufferAccessor<T> sharedBufferAccessor,
final ComputationState computationState,
final EventWrapper event,
final long timestamp) throws Exception {
final ConditionContext<T> context = new ConditionContext<>(this, sharedBufferAccessor, computationState);
final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
// Create the resulting computation states based on the previously computed edges.
// We need to defer the creation of computation states until we know how many edges start
// at this computation state, so that we can assign proper versions.
final List<StateTransition<T>> edges = outgoingEdges.getEdges();
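// The counters below hand out distinct Dewey versions to sibling branches;
// they are decremented as the corresponding TAKE / IGNORE edges are visited.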
int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
final List<ComputationState> resultingComputationStates = new ArrayList<>();
for (StateTransition<T> edge : edges) {
switch (edge.getAction()) {
case IGNORE: {
if (!isStartState(computationState)) {
final DeweyNumber version;
if (isEquivalentState(edge.getTargetState(), getState(computationState))) {
//Stay in the same state (it can be either looping one or singleton)
final int toIncrease = calculateIncreasingSelfState(
outgoingEdges.getTotalIgnoreBranches(),
outgoingEdges.getTotalTakeBranches());
version = computationState.getVersion().increase(toIncrease);
} else {
//IGNORE after PROCEED
version = computationState.getVersion()
.increase(totalTakeToSkip + ignoreBranchesToVisit)
.addStage();
ignoreBranchesToVisit--;
}
addComputationState(
sharedBufferAccessor,
resultingComputationStates,
edge.getTargetState(),
computationState.getPreviousBufferEntry(),
version,
computationState.getStartTimestamp(),
computationState.getStartEventID()
);
}
}
break;
case TAKE:
final State<T> nextState = edge.getTargetState();
final State<T> currentState = edge.getSourceState();
final NodeId previousEntry = computationState.getPreviousBufferEntry();
final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit);
final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
takeBranchesToVisit--;
final NodeId newEntry = sharedBufferAccessor.put(
currentState.getName(),
event.getEventId(),
previousEntry,
currentVersion);
final long startTimestamp;
final EventId startEventId;
if (isStartState(computationState)) {
startTimestamp = timestamp;
startEventId = event.getEventId();
} else {
startTimestamp = computationState.getStartTimestamp();
startEventId = computationState.getStartEventID();
}
addComputationState(
sharedBufferAccessor,
resultingComputationStates,
nextState,
newEntry,
nextVersion,
startTimestamp,
startEventId);
// check if the newly created state is optional (has a PROCEED path to a Final State)
final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
if (finalState != null) {
addComputationState(
sharedBufferAccessor,
resultingComputationStates,
finalState,
newEntry,
nextVersion,
startTimestamp,
startEventId);
}
break;
}
}
if (isStartState(computationState)) {
int totalBranches = calculateIncreasingSelfState(
outgoingEdges.getTotalIgnoreBranches(),
outgoingEdges.getTotalTakeBranches());
DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
ComputationState startState = ComputationState.createStartState(computationState.getCurrentStateName(), startVersion);
resultingComputationStates.add(startState);
}
if (computationState.getPreviousBufferEntry() != null) {
// release the shared entry referenced by the current computation state.
sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
}
return resultingComputationStates;
}
private void addComputationState(
SharedBufferAccessor<T> sharedBufferAccessor,
List<ComputationState> computationStates,
State<T> currentState,
NodeId previousEntry,
DeweyNumber version,
long startTimestamp,
EventId startEventId) throws Exception {
ComputationState computationState = ComputationState.createState(
currentState.getName(), previousEntry, version, startTimestamp, startEventId);
computationStates.add(computationState);
sharedBufferAccessor.lockNode(previousEntry);
}
private State<T> findFinalStateAfterProceed(
ConditionContext<T> context,
State<T> state,
T event) {
final Stack<State<T>> statesToCheck = new Stack<>();
statesToCheck.push(state);
try {
while (!statesToCheck.isEmpty()) {
final State<T> currentState = statesToCheck.pop();
for (StateTransition<T> transition : currentState.getStateTransitions()) {
if (transition.getAction() == StateTransitionAction.PROCEED &&
checkFilterCondition(context, transition.getCondition(), event)) {
if (transition.getTargetState().isFinal()) {
return transition.getTargetState();
} else {
statesToCheck.push(transition.getTargetState());
}
}
}
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failure happened in filter function.", e);
}
return null;
}
private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
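// With no outgoing branches the version stays untouched; otherwise the self
// branch is numbered past all ignore branches and at least one take branch.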
return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
}
private OutgoingEdges<T> createDecisionGraph(
ConditionContext<T> context,
ComputationState computationState,
T event) {
State<T> state = getState(computationState);
final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(state);
final Stack<State<T>> states = new Stack<>();
states.push(state);
// First create all outgoing edges, so as to be able to reason about the Dewey version
while (!states.isEmpty()) {
State<T> currentState = states.pop();
Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions();
// check all state transitions for each state
for (StateTransition<T> stateTransition : stateTransitions) {
try {
if (checkFilterCondition(context, stateTransition.getCondition(), event)) {
// filter condition is true
switch (stateTransition.getAction()) {
case PROCEED:
// simply advance the computation state, but apply the current event to it
// PROCEED is equivalent to an epsilon transition
states.push(stateTransition.getTargetState());
break;
case IGNORE:
case TAKE:
outgoingEdges.add(stateTransition);
break;
}
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failure happened in filter function.", e);
}
}
}
return outgoingEdges;
}
private boolean checkFilterCondition(
ConditionContext<T> context,
IterativeCondition<T> condition,
T event) throws Exception {
return condition == null || condition.filter(event, context);
}
/**
* Extracts all the sequences of events from the start to the given computation state. An event
* sequence is returned as a map which contains the events and the names of the states to which
* the events were mapped.
*
* @param sharedBufferAccessor The accessor to {@link SharedBuffer} from which to extract the matches
* @param computationState The end computation state of the extracted event sequences
* @return Collection of event sequences which end in the given computation state
* @throws Exception Thrown if the system cannot access the state.
*/
private Map<String, List<EventId>> extractCurrentMatches(
final SharedBufferAccessor<T> sharedBufferAccessor,
final ComputationState computationState) throws Exception {
if (computationState.getPreviousBufferEntry() == null) {
return new HashMap<>();
}
List<Map<String, List<EventId>>> paths = sharedBufferAccessor.extractPatterns(
computationState.getPreviousBufferEntry(),
computationState.getVersion());
if (paths.isEmpty()) {
return new HashMap<>();
}
// for a given computation state, we cannot have more than one matching pattern.
Preconditions.checkState(paths.size() == 1);
return paths.get(0);
}
/**
* The context used when evaluating this computation state.
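*
* <p>A condition typically consumes the context as below (a sketch; the
* {@code Event} type, its {@code getPrice()} accessor and the pattern name
* {@code "middle"} are illustrative assumptions):
* <pre>{@code
* new IterativeCondition<Event>() {
*     @Override
*     public boolean filter(Event value, Context<Event> ctx) throws Exception {
*         double sum = value.getPrice();
*         for (Event e : ctx.getEventsForPattern("middle")) {
*             sum += e.getPrice();
*         }
*         return sum < 5.0;
*     }
* };
* }</pre>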
*/
private static class ConditionContext<T> implements IterativeCondition.Context<T> {
/** The current computation state. */
private ComputationState computationState;
/**
* The matched pattern so far. A condition will be evaluated over this
* pattern. This is evaluated <b>only once</b>, as this is an expensive
* operation that traverses a path in the {@link SharedBuffer}.
*/
private Map<String, List<T>> matchedEvents;
private NFA<T> nfa;
private SharedBufferAccessor<T> sharedBufferAccessor;
ConditionContext(
final NFA<T> nfa,
final SharedBufferAccessor<T> sharedBufferAccessor,
final ComputationState computationState) {
this.computationState = computationState;
this.nfa = nfa;
this.sharedBufferAccessor = sharedBufferAccessor;
}
@Override
public Iterable<T> getEventsForPattern(final String key) throws Exception {
Preconditions.checkNotNull(key);
// The (partially) matched pattern is computed lazily when this method is called.
// This is to avoid any overhead when using a simple, non-iterative condition.
if (matchedEvents == null) {
this.matchedEvents = sharedBufferAccessor.materializeMatch(nfa.extractCurrentMatches(sharedBufferAccessor,
computationState));
}
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
List<T> elements = matchedEvents.get(key);
return elements == null
? Collections.<T>emptyList().iterator()
: elements.iterator();
}
};
}
}
//////////////////// DEPRECATED/MIGRATION UTILS
/**
* Wrapper for migrated state.
*/
public static class MigratedNFA<T> {
private final Queue<ComputationState> computationStates;
private final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer;
public org.apache.flink.cep.nfa.SharedBuffer<T> getSharedBuffer() {
return sharedBuffer;
}
public Queue<ComputationState> getComputationStates() {
return computationStates;
}
MigratedNFA(
final Queue<ComputationState> computationStates,
final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer) {
this.sharedBuffer = sharedBuffer;
this.computationStates = computationStates;
}
}
/**
* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
*/
@Deprecated
public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
private static final int VERSION = 1;
/** This empty constructor is required for deserializing the configuration. */
public NFASerializerConfigSnapshot() {}
public NFASerializerConfigSnapshot(
TypeSerializer<T> eventSerializer,
TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
super(eventSerializer, sharedBufferSerializer);
}
@Override
public int getVersion() {
return VERSION;
}
}
/**
* Only for backward compatibility with Flink versions <= 1.5.
*/
@Deprecated
public static class NFASerializer<T> extends TypeSerializer<MigratedNFA<T>> {
private static final long serialVersionUID = 2098282423980597010L;
private final TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer;
private final TypeSerializer<T> eventSerializer;
public NFASerializer(TypeSerializer<T> typeSerializer) {
this(typeSerializer,
new org.apache.flink.cep.nfa.SharedBuffer.SharedBufferSerializer<>(
StringSerializer.INSTANCE,
typeSerializer));
}
NFASerializer(
TypeSerializer<T> typeSerializer,
TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
this.eventSerializer = typeSerializer;
this.sharedBufferSerializer = sharedBufferSerializer;
}
@Override
public boolean isImmutableType() {
return false;
}
@Override
public NFASerializer<T> duplicate() {
return new NFASerializer<>(eventSerializer.duplicate());
}
@Override
public MigratedNFA<T> createInstance() {
return null;
}
@Override
public MigratedNFA<T> copy(MigratedNFA<T> from) {
throw new UnsupportedOperationException();
}
@Override
public MigratedNFA<T> copy(MigratedNFA<T> from, MigratedNFA<T> reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(MigratedNFA<T> record, DataOutputView target) {
throw new UnsupportedOperationException();
}
@Override
public MigratedNFA<T> deserialize(DataInputView source) throws IOException {
MigrationUtils.skipSerializedStates(source);
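// skip the legacy windowTime and handleTimeout fields; after migration these
// come from the freshly compiled NFA rather than from the restored state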
source.readLong();
source.readBoolean();
org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer = sharedBufferSerializer.deserialize(source);
Queue<ComputationState> computationStates = deserializeComputationStates(sharedBuffer, eventSerializer, source);
return new MigratedNFA<>(computationStates, sharedBuffer);
}
@Override
public MigratedNFA<T> deserialize(
MigratedNFA<T> reuse,
DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object obj) {
return obj == this ||
(obj != null && obj.getClass().equals(getClass()) &&
sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) &&
eventSerializer.equals(((NFASerializer) obj).eventSerializer));
}
@Override
public boolean canEqual(Object obj) {
return true;
}
@Override
public int hashCode() {
return 37 * sharedBufferSerializer.hashCode() + eventSerializer.hashCode();
}
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
return new NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer);
}
@Override
public CompatibilityResult<MigratedNFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof NFASerializerConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
CompatibilityResult<T> eventCompatResult = CompatibilityUtil.resolveCompatibilityResult(
serializersAndConfigs.get(0).f0,
UnloadableDummyTypeSerializer.class,
serializersAndConfigs.get(0).f1,
eventSerializer);
CompatibilityResult<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufCompatResult =
CompatibilityUtil.resolveCompatibilityResult(
serializersAndConfigs.get(1).f0,
UnloadableDummyTypeSerializer.class,
serializersAndConfigs.get(1).f1,
sharedBufferSerializer);
if (!sharedBufCompatResult.isRequiresMigration() && !eventCompatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else {
if (eventCompatResult.getConvertDeserializer() != null &&
sharedBufCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new NFASerializer<>(
new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
}
}
}
return CompatibilityResult.requiresMigration();
}
}
}