| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.cep.nfa.compiler; |
| |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; |
| import org.apache.flink.cep.nfa.NFA; |
| import org.apache.flink.cep.nfa.State; |
| import org.apache.flink.cep.nfa.StateTransition; |
| import org.apache.flink.cep.nfa.StateTransitionAction; |
| import org.apache.flink.cep.pattern.GroupPattern; |
| import org.apache.flink.cep.pattern.MalformedPatternException; |
| import org.apache.flink.cep.pattern.Pattern; |
| import org.apache.flink.cep.pattern.Quantifier; |
| import org.apache.flink.cep.pattern.Quantifier.Times; |
| import org.apache.flink.cep.pattern.conditions.BooleanConditions; |
| import org.apache.flink.cep.pattern.conditions.IterativeCondition; |
| import org.apache.flink.cep.pattern.conditions.RichAndCondition; |
| import org.apache.flink.cep.pattern.conditions.RichNotCondition; |
| import org.apache.flink.streaming.api.windowing.time.Time; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Stack; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a |
| * {@link NFAFactory}. |
| */ |
| public class NFACompiler { |
| |
| protected static final String ENDING_STATE_NAME = "$endState$"; |
| |
| /** |
| * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create |
| * multiple NFAs. |
| * |
| * @param pattern Definition of sequence pattern |
| * @param timeoutHandling True if the NFA shall return timed out event patterns |
| * @param <T> Type of the input events |
| * @return Factory for NFAs corresponding to the given pattern |
| */ |
| @SuppressWarnings("unchecked") |
| public static <T> NFAFactory<T> compileFactory( |
| final Pattern<T, ?> pattern, |
| boolean timeoutHandling) { |
| if (pattern == null) { |
| // return a factory for empty NFAs |
| return new NFAFactoryImpl<>(0, Collections.<State<T>>emptyList(), timeoutHandling); |
| } else { |
| final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern); |
| nfaFactoryCompiler.compileFactory(); |
| return new NFAFactoryImpl<>(nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling); |
| } |
| } |
| |
| /** |
| * Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly |
| * generate empty matches are: A*, A?, A* B? etc. |
| * |
| * @param pattern pattern to check |
| * @return true if empty match could potentially match the pattern, false otherwise |
| */ |
| public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) { |
| NFAFactoryCompiler<?> compiler = new NFAFactoryCompiler<>(checkNotNull(pattern)); |
| compiler.compileFactory(); |
| State<?> startState = compiler.getStates().stream().filter(State::isStart).findFirst().orElseThrow( |
| () -> new IllegalStateException("Compiler produced no start state. It is a bug. File a jira.")); |
| |
| Set<State<?>> visitedStates = new HashSet<>(); |
| final Stack<State<?>> statesToCheck = new Stack<>(); |
| statesToCheck.push(startState); |
| while (!statesToCheck.isEmpty()) { |
| final State<?> currentState = statesToCheck.pop(); |
| if (visitedStates.contains(currentState)) { |
| continue; |
| } else { |
| visitedStates.add(currentState); |
| } |
| |
| for (StateTransition<?> transition : currentState.getStateTransitions()) { |
| if (transition.getAction() == StateTransitionAction.PROCEED) { |
| if (transition.getTargetState().isFinal()) { |
| return true; |
| } else { |
| statesToCheck.push(transition.getTargetState()); |
| } |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Converts a {@link Pattern} into graph of {@link State}. It enables sharing of |
| * compilation state across methods. |
| * |
| * @param <T> |
| */ |
| static class NFAFactoryCompiler<T> { |
| |
| private final NFAStateNameHandler stateNameHandler = new NFAStateNameHandler(); |
| private final Map<String, State<T>> stopStates = new HashMap<>(); |
| private final List<State<T>> states = new ArrayList<>(); |
| |
| private long windowTime = 0; |
| private GroupPattern<T, ?> currentGroupPattern; |
| private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new HashMap<>(); |
| private Pattern<T, ?> currentPattern; |
| private Pattern<T, ?> followingPattern; |
| private final AfterMatchSkipStrategy afterMatchSkipStrategy; |
| private Map<String, State<T>> originalStateMap = new HashMap<>(); |
| |
/**
 * Creates a compiler that starts at the given pattern and uses that pattern's
 * after-match skip strategy.
 *
 * @param pattern the pattern to compile; it is dereferenced here, so it must not be {@code null}
 */
NFAFactoryCompiler(final Pattern<T, ?> pattern) {
    this.afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
    this.currentPattern = pattern;
}
| |
| /** |
| * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create |
| * multiple NFAs. |
| */ |
| void compileFactory() { |
| if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { |
| throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!"); |
| } |
| |
| checkPatternNameUniqueness(); |
| |
| checkPatternSkipStrategy(); |
| |
| // we're traversing the pattern from the end to the beginning --> the first state is the final state |
| State<T> sinkState = createEndingState(); |
| // add all the normal states |
| sinkState = createMiddleStates(sinkState); |
| // add the beginning state |
| createStartState(sinkState); |
| } |
| |
| AfterMatchSkipStrategy getAfterMatchSkipStrategy(){ |
| return afterMatchSkipStrategy; |
| } |
| |
| List<State<T>> getStates() { |
| return states; |
| } |
| |
| long getWindowTime() { |
| return windowTime; |
| } |
| |
| /** |
| * Check pattern after match skip strategy. |
| */ |
| private void checkPatternSkipStrategy() { |
| if (afterMatchSkipStrategy.getPatternName().isPresent()) { |
| String patternName = afterMatchSkipStrategy.getPatternName().get(); |
| Pattern<T, ?> pattern = currentPattern; |
| while (pattern.getPrevious() != null && !pattern.getName().equals(patternName)) { |
| pattern = pattern.getPrevious(); |
| } |
| |
| // pattern name match check. |
| if (!pattern.getName().equals(patternName)) { |
| throw new MalformedPatternException("The pattern name specified in AfterMatchSkipStrategy " + |
| "can not be found in the given Pattern"); |
| } |
| } |
| } |
| |
| /** |
| * Check if there are duplicate pattern names. If yes, it |
| * throws a {@link MalformedPatternException}. |
| */ |
| private void checkPatternNameUniqueness() { |
| // make sure there is no pattern with name "$endState$" |
| stateNameHandler.checkNameUniqueness(ENDING_STATE_NAME); |
| Pattern patternToCheck = currentPattern; |
| while (patternToCheck != null) { |
| checkPatternNameUniqueness(patternToCheck); |
| patternToCheck = patternToCheck.getPrevious(); |
| } |
| stateNameHandler.clear(); |
| } |
| |
| /** |
| * Check if the given pattern's name is already used or not. If yes, it |
| * throws a {@link MalformedPatternException}. |
| * |
| * @param pattern The pattern to be checked |
| */ |
| private void checkPatternNameUniqueness(final Pattern pattern) { |
| if (pattern instanceof GroupPattern) { |
| Pattern patternToCheck = ((GroupPattern) pattern).getRawPattern(); |
| while (patternToCheck != null) { |
| checkPatternNameUniqueness(patternToCheck); |
| patternToCheck = patternToCheck.getPrevious(); |
| } |
| } else { |
| stateNameHandler.checkNameUniqueness(pattern.getName()); |
| } |
| } |
| |
| /** |
| * Retrieves list of conditions resulting in Stop state and names of the corresponding NOT patterns. |
| * |
| * <p>A current not condition can be produced in two cases: |
| * <ol> |
| * <li>the previous pattern is a {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}</li> |
| * <li>exists a backward path of {@link Quantifier.QuantifierProperty#OPTIONAL} patterns to |
| * {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}</li> |
| * </ol> |
| * |
| * <p><b>WARNING:</b> for more info on the second case see: {@link NFAFactoryCompiler#copyWithoutTransitiveNots(State)} |
| * |
| * @return list of not conditions with corresponding names |
| */ |
| private List<Tuple2<IterativeCondition<T>, String>> getCurrentNotCondition() { |
| List<Tuple2<IterativeCondition<T>, String>> notConditions = new ArrayList<>(); |
| |
| Pattern<T, ? extends T> previousPattern = currentPattern; |
| while (previousPattern.getPrevious() != null && ( |
| previousPattern.getPrevious().getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL) || |
| previousPattern.getPrevious().getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW)) { |
| |
| previousPattern = previousPattern.getPrevious(); |
| |
| if (previousPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { |
| final IterativeCondition<T> notCondition = getTakeCondition(previousPattern); |
| notConditions.add(Tuple2.of(notCondition, previousPattern.getName())); |
| } |
| } |
| return notConditions; |
| } |
| |
| /** |
| * Creates the dummy Final {@link State} of the NFA graph. |
| * @return dummy Final state |
| */ |
| private State<T> createEndingState() { |
| State<T> endState = createState(ENDING_STATE_NAME, State.StateType.Final); |
| windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L; |
| return endState; |
| } |
| |
| /** |
| * Creates all the states between Start and Final state. |
| * |
| * @param sinkState the state that last state should point to (always the Final state) |
| * @return the next state after Start in the resulting graph |
| */ |
| private State<T> createMiddleStates(final State<T> sinkState) { |
| State<T> lastSink = sinkState; |
| while (currentPattern.getPrevious() != null) { |
| |
| if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { |
| //skip notFollow patterns, they are converted into edge conditions |
| } else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) { |
| final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal); |
| final IterativeCondition<T> notCondition = getTakeCondition(currentPattern); |
| final State<T> stopState = createStopState(notCondition, currentPattern.getName()); |
| |
| if (lastSink.isFinal()) { |
| //so that the proceed to final is not fired |
| notNext.addIgnore(lastSink, new RichNotCondition<>(notCondition)); |
| } else { |
| notNext.addProceed(lastSink, new RichNotCondition<>(notCondition)); |
| } |
| notNext.addProceed(stopState, notCondition); |
| lastSink = notNext; |
| } else { |
| lastSink = convertPattern(lastSink); |
| } |
| |
| // we traverse the pattern graph backwards |
| followingPattern = currentPattern; |
| currentPattern = currentPattern.getPrevious(); |
| |
| final Time currentWindowTime = currentPattern.getWindowTime(); |
| if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) { |
| // the window time is the global minimum of all window times of each state |
| windowTime = currentWindowTime.toMilliseconds(); |
| } |
| } |
| return lastSink; |
| } |
| |
| /** |
| * Creates the Start {@link State} of the resulting NFA graph. |
| * |
| * @param sinkState the state that Start state should point to (always first state of middle states) |
| * @return created state |
| */ |
| @SuppressWarnings("unchecked") |
| private State<T> createStartState(State<T> sinkState) { |
| final State<T> beginningState = convertPattern(sinkState); |
| beginningState.makeStart(); |
| return beginningState; |
| } |
| |
| private State<T> convertPattern(final State<T> sinkState) { |
| final State<T> lastSink; |
| |
| final Quantifier quantifier = currentPattern.getQuantifier(); |
| if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) { |
| |
| // if loop has started then all notPatterns previous to the optional states are no longer valid |
| setCurrentGroupPatternFirstOfLoop(false); |
| final State<T> sink = copyWithoutTransitiveNots(sinkState); |
| final State<T> looping = createLooping(sink); |
| |
| setCurrentGroupPatternFirstOfLoop(true); |
| lastSink = createTimesState(looping, sinkState, currentPattern.getTimes()); |
| } else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) { |
| lastSink = createTimesState(sinkState, sinkState, currentPattern.getTimes()); |
| } else { |
| lastSink = createSingletonState(sinkState); |
| } |
| addStopStates(lastSink); |
| |
| return lastSink; |
| } |
| |
| /** |
| * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states. |
| * Should be used instead of instantiating with new operator. |
| * |
| * @return the created state |
| */ |
| private State<T> createState(String name, State.StateType stateType) { |
| String stateName = stateNameHandler.getUniqueInternalName(name); |
| State<T> state = new State<>(stateName, stateType); |
| states.add(state); |
| return state; |
| } |
| |
| private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) { |
| // We should not duplicate the notStates. All states from which we can stop should point to the same one. |
| State<T> stopState = stopStates.get(name); |
| if (stopState == null) { |
| stopState = createState(name, State.StateType.Stop); |
| stopState.addTake(notCondition); |
| stopStates.put(name, stopState); |
| } |
| return stopState; |
| } |
| |
| /** |
| * This method creates an alternative state that is target for TAKE transition from an optional State. |
| * Accepting an event in optional State discards all not Patterns that were present before it. |
| * |
| * <p>E.g for a Pattern begin("a").notFollowedBy("b").followedByAny("c").optional().followedByAny("d") |
| * a sequence like : {a c b d} is a valid match, but {a b d} is not. |
| * |
| * <p><b>NOTICE:</b> This method creates copy only if it necessary. |
| * |
| * @param sinkState a state to create copy without transitive nots |
| * @return the copy of the state itself if no modifications were needed |
| */ |
| private State<T> copyWithoutTransitiveNots(final State<T> sinkState) { |
| final List<Tuple2<IterativeCondition<T>, String>> currentNotCondition = getCurrentNotCondition(); |
| |
| if (currentNotCondition.isEmpty() || |
| !currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { |
| //we do not create an alternative path if we are NOT in an OPTIONAL state or there is no NOTs prior to |
| //the optional state |
| return sinkState; |
| } |
| |
| final State<T> copyOfSink = createState(sinkState.getName(), sinkState.getStateType()); |
| |
| for (StateTransition<T> tStateTransition : sinkState.getStateTransitions()) { |
| |
| if (tStateTransition.getAction() == StateTransitionAction.PROCEED) { |
| State<T> targetState = tStateTransition.getTargetState(); |
| boolean remove = false; |
| if (targetState.isStop()) { |
| for (Tuple2<IterativeCondition<T>, String> notCondition : currentNotCondition) { |
| if (targetState.getName().equals(notCondition.f1)) { |
| remove = true; |
| } |
| } |
| } else { |
| targetState = copyWithoutTransitiveNots(tStateTransition.getTargetState()); |
| } |
| |
| if (!remove) { |
| copyOfSink.addStateTransition(tStateTransition.getAction(), targetState, tStateTransition.getCondition()); |
| } |
| } else { |
| copyOfSink.addStateTransition( |
| tStateTransition.getAction(), |
| tStateTransition.getTargetState().equals(tStateTransition.getSourceState()) |
| ? copyOfSink |
| : tStateTransition.getTargetState(), |
| tStateTransition.getCondition() |
| ); |
| } |
| |
| } |
| return copyOfSink; |
| } |
| |
| private State<T> copy(final State<T> state) { |
| final State<T> copyOfState = createState( |
| NFAStateNameHandler.getOriginalNameFromInternal(state.getName()), |
| state.getStateType()); |
| for (StateTransition<T> tStateTransition : state.getStateTransitions()) { |
| copyOfState.addStateTransition( |
| tStateTransition.getAction(), |
| tStateTransition.getTargetState().equals(tStateTransition.getSourceState()) |
| ? copyOfState |
| : tStateTransition.getTargetState(), |
| tStateTransition.getCondition()); |
| } |
| return copyOfState; |
| } |
| |
| private void addStopStates(final State<T> state) { |
| for (Tuple2<IterativeCondition<T>, String> notCondition: getCurrentNotCondition()) { |
| final State<T> stopState = createStopState(notCondition.f0, notCondition.f1); |
| state.addProceed(stopState, notCondition.f0); |
| } |
| } |
| |
| private void addStopStateToLooping(final State<T> loopingState) { |
| if (followingPattern != null && |
| followingPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { |
| final IterativeCondition<T> notCondition = getTakeCondition(followingPattern); |
| final State<T> stopState = createStopState(notCondition, followingPattern.getName()); |
| loopingState.addProceed(stopState, notCondition); |
| } |
| } |
| |
| /** |
| * Creates a "complex" state consisting of given number of states with |
| * same {@link IterativeCondition}. |
| * |
| * @param sinkState the state that the created state should point to |
| * @param proceedState state that the state being converted should proceed to |
| * @param times number of times the state should be copied |
| * @return the first state of the "complex" state, next state should point to it |
| */ |
| @SuppressWarnings("unchecked") |
| private State<T> createTimesState(final State<T> sinkState, final State<T> proceedState, Times times) { |
| State<T> lastSink = sinkState; |
| setCurrentGroupPatternFirstOfLoop(false); |
| final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition(); |
| final IterativeCondition<T> innerIgnoreCondition = extendWithUntilCondition( |
| getInnerIgnoreCondition(currentPattern), |
| untilCondition, |
| false); |
| final IterativeCondition<T> takeCondition = extendWithUntilCondition( |
| getTakeCondition(currentPattern), |
| untilCondition, |
| true); |
| |
| if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) && |
| times.getFrom() != times.getTo()) { |
| if (untilCondition != null) { |
| State<T> sinkStateCopy = copy(sinkState); |
| originalStateMap.put(sinkState.getName(), sinkStateCopy); |
| } |
| updateWithGreedyCondition(sinkState, takeCondition); |
| } |
| |
| for (int i = times.getFrom(); i < times.getTo(); i++) { |
| lastSink = createSingletonState(lastSink, proceedState, takeCondition, innerIgnoreCondition, true); |
| addStopStateToLooping(lastSink); |
| } |
| for (int i = 0; i < times.getFrom() - 1; i++) { |
| lastSink = createSingletonState(lastSink, null, takeCondition, innerIgnoreCondition, false); |
| addStopStateToLooping(lastSink); |
| } |
| // we created the intermediate states in the loop, now we create the start of the loop. |
| setCurrentGroupPatternFirstOfLoop(true); |
| return createSingletonState( |
| lastSink, |
| proceedState, |
| takeCondition, |
| getIgnoreCondition(currentPattern), |
| currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)); |
| } |
| |
| /** |
| * Marks the current group pattern as the head of the TIMES quantifier or not. |
| * |
| * @param isFirstOfLoop whether the current group pattern is the head of the TIMES quantifier |
| */ |
| @SuppressWarnings("unchecked") |
| private void setCurrentGroupPatternFirstOfLoop(boolean isFirstOfLoop) { |
| if (currentPattern instanceof GroupPattern) { |
| firstOfLoopMap.put((GroupPattern<T, ?>) currentPattern, isFirstOfLoop); |
| } |
| } |
| |
| /** |
| * Checks if the current group pattern is the head of the TIMES/LOOPING quantifier or not a |
| * TIMES/LOOPING quantifier pattern. |
| */ |
| private boolean isCurrentGroupPatternFirstOfLoop() { |
| if (firstOfLoopMap.containsKey(currentGroupPattern)) { |
| return firstOfLoopMap.get(currentGroupPattern); |
| } else { |
| return true; |
| } |
| } |
| |
| /** |
| * Checks if the given pattern is the head pattern of the current group pattern. |
| * |
| * @param pattern the pattern to be checked |
| * @return {@code true} iff the given pattern is in a group pattern and it is the head pattern of the |
| * group pattern, {@code false} otherwise |
| */ |
| private boolean headOfGroup(Pattern<T, ?> pattern) { |
| return currentGroupPattern != null && pattern.getPrevious() == null; |
| } |
| |
| /** |
| * Checks if the given pattern is optional. If the given pattern is the head of a group pattern, |
| * the optional status depends on the group pattern. |
| */ |
| private boolean isPatternOptional(Pattern<T, ?> pattern) { |
| if (headOfGroup(pattern)) { |
| return isCurrentGroupPatternFirstOfLoop() && |
| currentGroupPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL); |
| } else { |
| return pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL); |
| } |
| } |
| |
| /** |
| * Creates a simple single state. For an OPTIONAL state it also consists |
| * of a similar state without the PROCEED edge, so that for each PROCEED transition branches |
| * in computation state graph can be created only once. |
| * |
| * @param sinkState state that the state being converted should point to |
| * @return the created state |
| */ |
| @SuppressWarnings("unchecked") |
| private State<T> createSingletonState(final State<T> sinkState) { |
| return createSingletonState( |
| sinkState, |
| sinkState, |
| getTakeCondition(currentPattern), |
| getIgnoreCondition(currentPattern), |
| isPatternOptional(currentPattern)); |
| } |
| |
| /** |
| * Creates a simple single state. For an OPTIONAL state it also consists |
| * of a similar state without the PROCEED edge, so that for each PROCEED transition branches |
| * in computation state graph can be created only once. |
| * |
| * @param ignoreCondition condition that should be applied to IGNORE transition |
| * @param sinkState state that the state being converted should point to |
| * @param proceedState state that the state being converted should proceed to |
| * @param isOptional whether the state being converted is optional |
| * @return the created state |
| */ |
| @SuppressWarnings("unchecked") |
| private State<T> createSingletonState(final State<T> sinkState, |
| final State<T> proceedState, |
| final IterativeCondition<T> takeCondition, |
| final IterativeCondition<T> ignoreCondition, |
| final boolean isOptional) { |
| if (currentPattern instanceof GroupPattern) { |
| return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional); |
| } |
| |
| final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal); |
| // if event is accepted then all notPatterns previous to the optional states are no longer valid |
| final State<T> sink = copyWithoutTransitiveNots(sinkState); |
| singletonState.addTake(sink, takeCondition); |
| |
| // if no element accepted the previous nots are still valid. |
| final IterativeCondition<T> proceedCondition = getTrueFunction(); |
| |
| // for the first state of a group pattern, its PROCEED edge should point to the following state of |
| // that group pattern and the edge will be added at the end of creating the NFA for that group pattern |
| if (isOptional && !headOfGroup(currentPattern)) { |
| if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { |
| final IterativeCondition<T> untilCondition = |
| (IterativeCondition<T>) currentPattern.getUntilCondition(); |
| if (untilCondition != null) { |
| singletonState.addProceed( |
| originalStateMap.get(proceedState.getName()), |
| new RichAndCondition<>(proceedCondition, untilCondition)); |
| } |
| singletonState.addProceed(proceedState, |
| untilCondition != null |
| ? new RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition)) |
| : proceedCondition); |
| } else { |
| singletonState.addProceed(proceedState, proceedCondition); |
| } |
| } |
| |
| if (ignoreCondition != null) { |
| final State<T> ignoreState; |
| if (isOptional) { |
| ignoreState = createState(currentPattern.getName(), State.StateType.Normal); |
| ignoreState.addTake(sink, takeCondition); |
| ignoreState.addIgnore(ignoreCondition); |
| addStopStates(ignoreState); |
| } else { |
| ignoreState = singletonState; |
| } |
| singletonState.addIgnore(ignoreState, ignoreCondition); |
| } |
| return singletonState; |
| } |
| |
| /** |
| * Create all the states for the group pattern. |
| * |
| * @param groupPattern the group pattern to create the states for |
| * @param sinkState the state that the group pattern being converted should point to |
| * @param proceedState the state that the group pattern being converted should proceed to |
| * @param isOptional whether the group pattern being converted is optional |
| * @return the first state of the states of the group pattern |
| */ |
| private State<T> createGroupPatternState( |
| final GroupPattern<T, ?> groupPattern, |
| final State<T> sinkState, |
| final State<T> proceedState, |
| final boolean isOptional) { |
| final IterativeCondition<T> proceedCondition = getTrueFunction(); |
| |
| Pattern<T, ?> oldCurrentPattern = currentPattern; |
| Pattern<T, ?> oldFollowingPattern = followingPattern; |
| GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; |
| |
| State<T> lastSink = sinkState; |
| currentGroupPattern = groupPattern; |
| currentPattern = groupPattern.getRawPattern(); |
| lastSink = createMiddleStates(lastSink); |
| lastSink = convertPattern(lastSink); |
| if (isOptional) { |
| // for the first state of a group pattern, its PROCEED edge should point to |
| // the following state of that group pattern |
| lastSink.addProceed(proceedState, proceedCondition); |
| } |
| currentPattern = oldCurrentPattern; |
| followingPattern = oldFollowingPattern; |
| currentGroupPattern = oldGroupPattern; |
| return lastSink; |
| } |
| |
| /** |
| * Create the states for the group pattern as a looping one. |
| * |
| * @param groupPattern the group pattern to create the states for |
| * @param sinkState the state that the group pattern being converted should point to |
| * @return the first state of the states of the group pattern |
| */ |
| private State<T> createLoopingGroupPatternState( |
| final GroupPattern<T, ?> groupPattern, |
| final State<T> sinkState) { |
| final IterativeCondition<T> proceedCondition = getTrueFunction(); |
| |
| Pattern<T, ?> oldCurrentPattern = currentPattern; |
| Pattern<T, ?> oldFollowingPattern = followingPattern; |
| GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; |
| |
| final State<T> dummyState = createState(currentPattern.getName(), State.StateType.Normal); |
| State<T> lastSink = dummyState; |
| currentGroupPattern = groupPattern; |
| currentPattern = groupPattern.getRawPattern(); |
| lastSink = createMiddleStates(lastSink); |
| lastSink = convertPattern(lastSink); |
| lastSink.addProceed(sinkState, proceedCondition); |
| dummyState.addProceed(lastSink, proceedCondition); |
| currentPattern = oldCurrentPattern; |
| followingPattern = oldFollowingPattern; |
| currentGroupPattern = oldGroupPattern; |
| return lastSink; |
| } |
| |
| /** |
| * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and |
| * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that |
| * for each PROCEED transition branches in computation state graph can be created only once. |
| * |
| * @param sinkState the state that the converted state should point to |
| * @return the first state of the created complex state |
| */ |
| @SuppressWarnings("unchecked") |
| private State<T> createLooping(final State<T> sinkState) { |
| if (currentPattern instanceof GroupPattern) { |
| return createLoopingGroupPatternState((GroupPattern) currentPattern, sinkState); |
| } |
| final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition(); |
| |
| final IterativeCondition<T> ignoreCondition = extendWithUntilCondition( |
| getInnerIgnoreCondition(currentPattern), |
| untilCondition, |
| false); |
| final IterativeCondition<T> takeCondition = extendWithUntilCondition( |
| getTakeCondition(currentPattern), |
| untilCondition, |
| true); |
| |
| IterativeCondition<T> proceedCondition = getTrueFunction(); |
| final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal); |
| |
| if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { |
| if (untilCondition != null) { |
| State<T> sinkStateCopy = copy(sinkState); |
| loopingState.addProceed(sinkStateCopy, new RichAndCondition<>(proceedCondition, untilCondition)); |
| originalStateMap.put(sinkState.getName(), sinkStateCopy); |
| } |
| loopingState.addProceed(sinkState, |
| untilCondition != null |
| ? new RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition)) |
| : proceedCondition); |
| updateWithGreedyCondition(sinkState, getTakeCondition(currentPattern)); |
| } else { |
| loopingState.addProceed(sinkState, proceedCondition); |
| } |
| loopingState.addTake(takeCondition); |
| |
| addStopStateToLooping(loopingState); |
| |
| if (ignoreCondition != null) { |
| final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal); |
| ignoreState.addTake(loopingState, takeCondition); |
| ignoreState.addIgnore(ignoreCondition); |
| loopingState.addIgnore(ignoreState, ignoreCondition); |
| |
| addStopStateToLooping(ignoreState); |
| } |
| return loopingState; |
| } |
| |
| /** |
| * This method extends the given condition with stop(until) condition if necessary. |
| * The until condition needs to be applied only if both of the given conditions are not null. |
| * |
| * @param condition the condition to extend |
| * @param untilCondition the until condition to join with the given condition |
| * @param isTakeCondition whether the {@code condition} is for {@code TAKE} edge |
| * @return condition with AND applied or the original condition |
| */ |
| private IterativeCondition<T> extendWithUntilCondition( |
| IterativeCondition<T> condition, |
| IterativeCondition<T> untilCondition, |
| boolean isTakeCondition) { |
| if (untilCondition != null && condition != null) { |
| return new RichAndCondition<>(new RichNotCondition<>(untilCondition), condition); |
| } else if (untilCondition != null && isTakeCondition) { |
| return new RichNotCondition<>(untilCondition); |
| } |
| |
| return condition; |
| } |
| |
| /** |
| * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge |
| * that corresponds to the specified {@link Pattern} and extended with stop(until) condition |
| * if necessary. It is applicable only for inner states of a complex state like looping or times. |
| */ |
| @SuppressWarnings("unchecked") |
| private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) { |
| Quantifier.ConsumingStrategy consumingStrategy = pattern.getQuantifier().getInnerConsumingStrategy(); |
| if (headOfGroup(pattern)) { |
| // for the head pattern of a group pattern, we should consider the |
| // inner consume strategy of the group pattern |
| consumingStrategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy(); |
| } |
| |
| IterativeCondition<T> innerIgnoreCondition = null; |
| switch (consumingStrategy) { |
| case STRICT: |
| innerIgnoreCondition = null; |
| break; |
| case SKIP_TILL_NEXT: |
| innerIgnoreCondition = new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition()); |
| break; |
| case SKIP_TILL_ANY: |
| innerIgnoreCondition = BooleanConditions.trueFunction(); |
| break; |
| } |
| |
| if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { |
| innerIgnoreCondition = extendWithUntilCondition( |
| innerIgnoreCondition, |
| (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), |
| false); |
| } |
| return innerIgnoreCondition; |
| } |
| |
| /** |
| * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge |
| * that corresponds to the specified {@link Pattern} and extended with |
| * stop(until) condition if necessary. For more on strategy see {@link Quantifier} |
| */ |
| @SuppressWarnings("unchecked") |
| private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) { |
| Quantifier.ConsumingStrategy consumingStrategy = pattern.getQuantifier().getConsumingStrategy(); |
| if (headOfGroup(pattern)) { |
| // for the head pattern of a group pattern, we should consider the inner consume strategy |
| // of the group pattern if the group pattern is not the head of the TIMES/LOOPING quantifier; |
| // otherwise, we should consider the consume strategy of the group pattern |
| if (isCurrentGroupPatternFirstOfLoop()) { |
| consumingStrategy = currentGroupPattern.getQuantifier().getConsumingStrategy(); |
| } else { |
| consumingStrategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy(); |
| } |
| } |
| |
| IterativeCondition<T> ignoreCondition = null; |
| switch (consumingStrategy) { |
| case STRICT: |
| ignoreCondition = null; |
| break; |
| case SKIP_TILL_NEXT: |
| ignoreCondition = new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition()); |
| break; |
| case SKIP_TILL_ANY: |
| ignoreCondition = BooleanConditions.trueFunction(); |
| break; |
| } |
| |
| if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { |
| ignoreCondition = extendWithUntilCondition( |
| ignoreCondition, |
| (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), |
| false); |
| } |
| return ignoreCondition; |
| } |
| |
| /** |
| * @return the {@link IterativeCondition condition} for the {@code TAKE} edge |
| * that corresponds to the specified {@link Pattern} and extended with |
| * stop(until) condition if necessary. |
| */ |
| @SuppressWarnings("unchecked") |
| private IterativeCondition<T> getTakeCondition(Pattern<T, ?> pattern) { |
| IterativeCondition<T> takeCondition = (IterativeCondition<T>) pattern.getCondition(); |
| if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { |
| takeCondition = extendWithUntilCondition( |
| takeCondition, |
| (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), |
| true); |
| } |
| return takeCondition; |
| } |
| |
| /** |
| * @return An true function extended with stop(until) condition if necessary. |
| */ |
| @SuppressWarnings("unchecked") |
| private IterativeCondition<T> getTrueFunction() { |
| IterativeCondition<T> trueCondition = BooleanConditions.trueFunction(); |
| if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { |
| trueCondition = extendWithUntilCondition( |
| trueCondition, |
| (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), |
| true); |
| } |
| return trueCondition; |
| } |
| |
| private void updateWithGreedyCondition( |
| State<T> state, |
| IterativeCondition<T> takeCondition) { |
| for (StateTransition<T> stateTransition : state.getStateTransitions()) { |
| stateTransition.setCondition( |
| new RichAndCondition<>(stateTransition.getCondition(), new RichNotCondition<>(takeCondition))); |
| } |
| } |
| } |
| |
| /** |
| * Factory interface for {@link NFA}. |
| * |
| * @param <T> Type of the input events which are processed by the NFA |
| */ |
| public interface NFAFactory<T> extends Serializable { |
| NFA<T> createNFA(); |
| } |
| |
| /** |
| * Implementation of the {@link NFAFactory} interface. |
| * |
| * <p>The implementation takes the input type serializer, the window time and the set of |
| * states and their transitions to be able to create an NFA from them. |
| * |
| * @param <T> Type of the input events which are processed by the NFA |
| */ |
| private static class NFAFactoryImpl<T> implements NFAFactory<T> { |
| |
| private static final long serialVersionUID = 8939783698296714379L; |
| |
| private final long windowTime; |
| private final Collection<State<T>> states; |
| private final boolean timeoutHandling; |
| |
| private NFAFactoryImpl( |
| long windowTime, |
| Collection<State<T>> states, |
| boolean timeoutHandling) { |
| |
| this.windowTime = windowTime; |
| this.states = states; |
| this.timeoutHandling = timeoutHandling; |
| } |
| |
| @Override |
| public NFA<T> createNFA() { |
| return new NFA<>(states, windowTime, timeoutHandling); |
| } |
| } |
| } |