/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cep.nfa.compiler;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.cep.nfa.StateTransition;
import org.apache.flink.cep.nfa.StateTransitionAction;
import org.apache.flink.cep.pattern.GroupPattern;
import org.apache.flink.cep.pattern.MalformedPatternException;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.Quantifier.Times;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.RichAndCondition;
import org.apache.flink.cep.pattern.conditions.RichNotCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
 * {@link NFAFactory}.
 */
public class NFACompiler {

	/**
	 * Name used for the dummy Final state created by the compiler. It is reserved before user pattern
	 * names are checked for uniqueness, so a pattern must not use this name.
	 */
	protected static final String ENDING_STATE_NAME = "$endState$";

	/**
	 * Builds an {@link NFAFactory} for the given pattern. The returned factory can be used to
	 * instantiate any number of NFAs.
	 *
	 * <p>A {@code null} pattern yields a factory with no states and a window time of {@code 0}.
	 *
	 * @param pattern Definition of sequence pattern, may be {@code null}
	 * @param timeoutHandling True if the NFA shall return timed out event patterns
	 * @param <T> Type of the input events
	 * @return Factory for NFAs corresponding to the given pattern
	 */
	@SuppressWarnings("unchecked")
	public static <T> NFAFactory<T> compileFactory(
		final Pattern<T, ?> pattern,
		boolean timeoutHandling) {
		if (pattern == null) {
			// nothing to compile: hand back a factory for empty NFAs
			return new NFAFactoryImpl<>(0, Collections.<State<T>>emptyList(), timeoutHandling);
		}

		final NFAFactoryCompiler<T> compiler = new NFAFactoryCompiler<>(pattern);
		compiler.compileFactory();

		final long compiledWindowTime = compiler.getWindowTime();
		final List<State<T>> compiledStates = compiler.getStates();
		return new NFAFactoryImpl<>(compiledWindowTime, compiledStates, timeoutHandling);
	}

	/**
	 * Tells whether the given pattern may produce an empty match, e.g. A*, A?, A* B? etc.
	 *
	 * <p>The pattern is compiled and the resulting state graph is searched from the start state,
	 * following only {@code PROCEED} transitions. If a Final state can be reached that way, a match
	 * is possible without taking any event.
	 *
	 * @param pattern pattern to check, must not be {@code null}
	 * @return true if empty match could potentially match the pattern, false otherwise
	 */
	public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) {
		NFAFactoryCompiler<?> compiler = new NFAFactoryCompiler<>(checkNotNull(pattern));
		compiler.compileFactory();
		State<?> startState = compiler.getStates().stream().filter(State::isStart).findFirst().orElseThrow(
			() -> new IllegalStateException("Compiler produced no start state. It is a bug. File a jira."));

		final Set<State<?>> seen = new HashSet<>();
		final Stack<State<?>> pending = new Stack<>();
		pending.push(startState);

		while (!pending.isEmpty()) {
			final State<?> state = pending.pop();
			// add() reports false when the state was already visited
			if (!seen.add(state)) {
				continue;
			}

			for (StateTransition<?> transition : state.getStateTransitions()) {
				if (transition.getAction() != StateTransitionAction.PROCEED) {
					continue;
				}
				final State<?> target = transition.getTargetState();
				if (target.isFinal()) {
					return true;
				}
				pending.push(target);
			}
		}

		return false;
	}

	/**
	 * Converts a {@link Pattern} into a graph of {@link State}s. It enables sharing of
	 * compilation state across methods.
	 *
	 * @param <T> type of the input events
	 */
	static class NFAFactoryCompiler<T> {

		// hands out unique internal state names and checks pattern names for duplicates
		private final NFAStateNameHandler stateNameHandler = new NFAStateNameHandler();
		// Stop states created so far, keyed by the name they were requested with (see createStopState)
		private final Map<String, State<T>> stopStates = new HashMap<>();
		// every state created through createState, in creation order
		private final List<State<T>> states = new ArrayList<>();

		// window length in milliseconds; assigned in createEndingState and createMiddleStates
		private long windowTime = 0;
		// group pattern whose inner patterns are currently being converted, null outside of a group
		private GroupPattern<T, ?> currentGroupPattern;
		// per group pattern: whether the states being created are for the head of its TIMES/LOOPING quantifier
		private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new HashMap<>();
		// pattern currently being converted; the chain is walked backwards via getPrevious()
		private Pattern<T, ?> currentPattern;
		// pattern handled just before currentPattern in the backward walk, i.e. its successor in the chain
		private Pattern<T, ?> followingPattern;
		// skip strategy read from the pattern passed to the constructor
		private final AfterMatchSkipStrategy afterMatchSkipStrategy;
		// copies of sink states taken before a greedy condition is applied, keyed by the sink state's name
		private Map<String, State<T>> originalStateMap = new HashMap<>();

		/**
		 * Creates a compiler for the given pattern and reads its after-match skip strategy.
		 *
		 * @param pattern the last pattern of the chain to compile, must not be {@code null}
		 */
		NFAFactoryCompiler(final Pattern<T, ?> pattern) {
			this.afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
			this.currentPattern = pattern;
		}

		/**
		 * Compiles the pattern held by this compiler into states, which are then available through
		 * {@link #getStates()}.
		 *
		 * @throws MalformedPatternException if the pattern ends with a NotFollowedBy part
		 */
		void compileFactory() {
			final boolean endsWithNotFollow =
				currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW;
			if (endsWithNotFollow) {
				throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!");
			}

			checkPatternNameUniqueness();
			checkPatternSkipStrategy();

			// the pattern is walked from its end to its beginning, so the Final state is created first,
			// then the states in between, and the Start state last
			final State<T> endingState = createEndingState();
			final State<T> firstMiddleState = createMiddleStates(endingState);
			createStartState(firstMiddleState);
		}

		/**
		 * @return the after-match skip strategy read from the pattern at construction time
		 */
		AfterMatchSkipStrategy getAfterMatchSkipStrategy() {
			return this.afterMatchSkipStrategy;
		}

		/**
		 * @return the list of states created so far (the internal list itself, not a copy)
		 */
		List<State<T>> getStates() {
			return this.states;
		}

		/**
		 * @return the window time in milliseconds determined during compilation
		 */
		long getWindowTime() {
			return this.windowTime;
		}

		/**
		 * Verifies that the pattern name referenced by the after-match skip strategy, if any, is the
		 * name of one of the patterns reachable from the current pattern via {@code getPrevious()}.
		 *
		 * @throws MalformedPatternException if no pattern in that chain carries the referenced name
		 */
		private void checkPatternSkipStrategy() {
			if (!afterMatchSkipStrategy.getPatternName().isPresent()) {
				// the strategy does not refer to a pattern, nothing to verify
				return;
			}

			final String referencedName = afterMatchSkipStrategy.getPatternName().get();

			// walk backwards until the name is found or the first pattern is reached
			Pattern<T, ?> candidate = currentPattern;
			while (candidate.getPrevious() != null && !candidate.getName().equals(referencedName)) {
				candidate = candidate.getPrevious();
			}

			final boolean found = candidate.getName().equals(referencedName);
			if (!found) {
				throw new MalformedPatternException("The pattern name specified in AfterMatchSkipStrategy " +
					"can not be found in the given Pattern");
			}
		}

		/**
		 * Walks the whole pattern chain and makes sure no pattern name is used twice. The name handler
		 * is cleared afterwards so that the names can be registered again during state creation.
		 */
		private void checkPatternNameUniqueness() {
			// reserve "$endState$" first so that no pattern can carry that name
			stateNameHandler.checkNameUniqueness(ENDING_STATE_NAME);
			for (Pattern pattern = currentPattern; pattern != null; pattern = pattern.getPrevious()) {
				checkPatternNameUniqueness(pattern);
			}
			stateNameHandler.clear();
		}

		/**
		 * Registers the name of the given pattern with the name handler. For a {@link GroupPattern}
		 * the patterns nested inside it are checked recursively instead of the group's own name.
		 *
		 * @param pattern The pattern to be checked
		 */
		private void checkPatternNameUniqueness(final Pattern pattern) {
			if (!(pattern instanceof GroupPattern)) {
				stateNameHandler.checkNameUniqueness(pattern.getName());
				return;
			}

			// descend into the group and check every pattern of its inner chain
			for (Pattern inner = ((GroupPattern) pattern).getRawPattern(); inner != null; inner = inner.getPrevious()) {
				checkPatternNameUniqueness(inner);
			}
		}

		/**
		 * Collects the conditions that lead to a Stop state together with the names of the NOT patterns
		 * they come from.
		 *
		 * <p>Starting at the current pattern, the chain is walked backwards for as long as the preceding
		 * pattern is either {@link Quantifier.QuantifierProperty#OPTIONAL} or uses
		 * {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}. Every {@code NOT_FOLLOW} pattern met on the way
		 * contributes one entry.
		 *
		 * <p><b>WARNING:</b> for the case of NOT patterns reached through optional patterns see
		 * {@link NFAFactoryCompiler#copyWithoutTransitiveNots(State)}
		 *
		 * @return list of not conditions with corresponding names
		 */
		private List<Tuple2<IterativeCondition<T>, String>> getCurrentNotCondition() {
			final List<Tuple2<IterativeCondition<T>, String>> collected = new ArrayList<>();

			Pattern<T, ? extends T> cursor = currentPattern;
			while (true) {
				final Pattern<T, ? extends T> predecessor = cursor.getPrevious();
				if (predecessor == null) {
					break;
				}

				final Quantifier predecessorQuantifier = predecessor.getQuantifier();
				final boolean isNotFollow =
					predecessorQuantifier.getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW;
				if (!isNotFollow && !predecessorQuantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
					// a mandatory, non-NOT pattern ends the backward path
					break;
				}

				if (isNotFollow) {
					final IterativeCondition<T> notCondition = getTakeCondition(predecessor);
					collected.add(Tuple2.of(notCondition, predecessor.getName()));
				}
				cursor = predecessor;
			}
			return collected;
		}

		/**
		 * Creates the dummy Final {@link State} of the NFA graph and initializes the window time from
		 * the current pattern ({@code 0} if it defines none).
		 *
		 * @return dummy Final state
		 */
		private State<T> createEndingState() {
			final State<T> finalState = createState(ENDING_STATE_NAME, State.StateType.Final);

			final Time patternWindow = currentPattern.getWindowTime();
			if (patternWindow == null) {
				windowTime = 0L;
			} else {
				windowTime = patternWindow.toMilliseconds();
			}
			return finalState;
		}

		/**
		 * Creates the states for every pattern from the current one back to, but excluding, the pattern
		 * that has no predecessor. That first pattern is left for the caller to convert.
		 *
		 * <p>While walking backwards, the window time of each pattern that becomes current is folded into
		 * {@code windowTime} so that the smallest defined window wins. A {@code windowTime} of {@code 0}
		 * stands for "no window recorded yet", so the first window met is adopted as is. Previously such a
		 * window was compared against {@code 0} and therefore always discarded when the last pattern of
		 * the chain defined no window itself.
		 *
		 * @param sinkState the state that the last created state should point to (the Final state for the
		 *                  top-level chain, another state when called for a group pattern)
		 * @return the state the caller's next (earlier) state should point to
		 */
		private State<T> createMiddleStates(final State<T> sinkState) {
			State<T> lastSink = sinkState;
			while (currentPattern.getPrevious() != null) {
				final Quantifier.ConsumingStrategy consumingStrategy =
					currentPattern.getQuantifier().getConsumingStrategy();

				if (consumingStrategy == Quantifier.ConsumingStrategy.NOT_NEXT) {
					final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal);
					final IterativeCondition<T> notCondition = getTakeCondition(currentPattern);
					final State<T> stopState = createStopState(notCondition, currentPattern.getName());

					if (lastSink.isFinal()) {
						//so that the proceed to final is not fired
						notNext.addIgnore(lastSink, new RichNotCondition<>(notCondition));
					} else {
						notNext.addProceed(lastSink, new RichNotCondition<>(notCondition));
					}
					notNext.addProceed(stopState, notCondition);
					lastSink = notNext;
				} else if (consumingStrategy != Quantifier.ConsumingStrategy.NOT_FOLLOW) {
					// notFollow patterns get no state of their own, they are converted into edge conditions
					lastSink = convertPattern(lastSink);
				}

				// we traverse the pattern graph backwards
				followingPattern = currentPattern;
				currentPattern = currentPattern.getPrevious();

				final Time currentWindowTime = currentPattern.getWindowTime();
				if (currentWindowTime != null) {
					final long candidateWindowTime = currentWindowTime.toMilliseconds();
					// the window time is the global minimum of all window times of each state;
					// 0 means no window has been recorded so far, so any defined window replaces it
					if (windowTime == 0L || candidateWindowTime < windowTime) {
						windowTime = candidateWindowTime;
					}
				}
			}
			return lastSink;
		}

		/**
		 * Converts the current pattern and marks the resulting state as the Start {@link State} of the
		 * NFA graph.
		 *
		 * @param sinkState the state that Start state should point to
		 * @return created state
		 */
		@SuppressWarnings("unchecked")
		private State<T> createStartState(State<T> sinkState) {
			final State<T> startState = convertPattern(sinkState);
			startState.makeStart();
			return startState;
		}

		/**
		 * Converts the current pattern into states according to its quantifier (looping, times or a
		 * single state) and attaches the applicable Stop states to the resulting entry state.
		 *
		 * @param sinkState the state the converted pattern should lead to
		 * @return the entry state of the converted pattern
		 */
		private State<T> convertPattern(final State<T> sinkState) {
			final Quantifier quantifier = currentPattern.getQuantifier();

			final State<T> entryState;
			if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
				// if loop has started then all notPatterns previous to the optional states are no longer valid
				setCurrentGroupPatternFirstOfLoop(false);
				final State<T> loopSink = copyWithoutTransitiveNots(sinkState);
				final State<T> loopState = createLooping(loopSink);

				setCurrentGroupPatternFirstOfLoop(true);
				entryState = createTimesState(loopState, sinkState, currentPattern.getTimes());
			} else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
				entryState = createTimesState(sinkState, sinkState, currentPattern.getTimes());
			} else {
				entryState = createSingletonState(sinkState);
			}

			addStopStates(entryState);
			return entryState;
		}

		/**
		 * Creates a state of the given type under a unique internal name derived from {@code name} and
		 * adds it to the collection of created states. Should be used instead of instantiating
		 * {@link State} with the new operator.
		 *
		 * @param name the name the unique internal state name is derived from
		 * @param stateType the type of the state to create
		 * @return the created state
		 */
		private State<T> createState(String name, State.StateType stateType) {
			final State<T> created = new State<>(stateNameHandler.getUniqueInternalName(name), stateType);
			states.add(created);
			return created;
		}

		/**
		 * Returns the Stop state registered under the given name, creating it on first request.
		 *
		 * @param notCondition the condition put on the TAKE edge of a newly created Stop state
		 * @param name the name the Stop state is registered under
		 * @return the shared Stop state for that name
		 */
		private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) {
			// We should not duplicate the notStates. All states from which we can stop should point to the same one.
			return stopStates.computeIfAbsent(name, stopName -> {
				final State<T> created = createState(stopName, State.StateType.Stop);
				created.addTake(notCondition);
				return created;
			});
		}

		/**
		 * This method creates an alternative state that is target for TAKE transition from an optional State.
		 * Accepting an event in optional State discards all not Patterns that were present before it.
		 *
		 * <p>E.g for a Pattern begin("a").notFollowedBy("b").followedByAny("c").optional().followedByAny("d")
		 * a sequence like : {a c b d} is a valid match, but {a b d} is not.
		 *
		 * <p>The copy keeps every transition of the given state except PROCEED transitions into Stop states
		 * whose name matches one of the current not conditions. Non-Stop targets of PROCEED transitions are
		 * processed recursively, and transitions that loop back to the given state are redirected to the copy.
		 *
		 * <p><b>NOTICE:</b> This method creates a copy only if it is necessary.
		 *
		 * @param sinkState a state to create copy without transitive nots
		 * @return the copy of the state, or the state itself if no modifications were needed
		 */
		private State<T> copyWithoutTransitiveNots(final State<T> sinkState) {
			final List<Tuple2<IterativeCondition<T>, String>> currentNotCondition = getCurrentNotCondition();

			if (currentNotCondition.isEmpty() ||
				!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
				//we do not create an alternative path if we are NOT in an OPTIONAL state or there is no NOTs prior to
				//the optional state
				return sinkState;
			}

			// note: the copy is requested under the sink's (internal) name, createState makes it unique
			final State<T> copyOfSink = createState(sinkState.getName(), sinkState.getStateType());

			for (StateTransition<T> tStateTransition : sinkState.getStateTransitions()) {

				if (tStateTransition.getAction() == StateTransitionAction.PROCEED) {
					State<T> targetState = tStateTransition.getTargetState();
					boolean remove = false;
					if (targetState.isStop()) {
						// drop the edge if it leads to the Stop state of one of the current NOT patterns
						for (Tuple2<IterativeCondition<T>, String> notCondition : currentNotCondition) {
							if (targetState.getName().equals(notCondition.f1)) {
								remove = true;
							}
						}
					} else {
						// states reachable without consuming an event must be stripped of the nots as well
						targetState = copyWithoutTransitiveNots(tStateTransition.getTargetState());
					}

					if (!remove) {
						copyOfSink.addStateTransition(tStateTransition.getAction(), targetState, tStateTransition.getCondition());
					}
				} else {
					// non-PROCEED edges are kept; a self-loop on the original becomes a self-loop on the copy
					copyOfSink.addStateTransition(
							tStateTransition.getAction(),
							tStateTransition.getTargetState().equals(tStateTransition.getSourceState())
									? copyOfSink
									: tStateTransition.getTargetState(),
							tStateTransition.getCondition()
					);
				}

			}
			return copyOfSink;
		}

		/**
		 * Creates a copy of the given state under a fresh internal name derived from the state's
		 * original name. All transitions are copied; transitions that point back to the given state
		 * point to the copy instead.
		 *
		 * @param state the state to copy
		 * @return the copy
		 */
		private State<T> copy(final State<T> state) {
			final String originalName = NFAStateNameHandler.getOriginalNameFromInternal(state.getName());
			final State<T> duplicate = createState(originalName, state.getStateType());

			for (StateTransition<T> transition : state.getStateTransitions()) {
				final State<T> target = transition.getTargetState();
				// keep self-loops as self-loops on the duplicate
				final State<T> redirectedTarget = target.equals(transition.getSourceState()) ? duplicate : target;
				duplicate.addStateTransition(transition.getAction(), redirectedTarget, transition.getCondition());
			}
			return duplicate;
		}

		/**
		 * Adds to the given state one PROCEED edge per current not condition, each leading to the Stop
		 * state of the corresponding NOT pattern.
		 *
		 * @param state the state to attach the Stop states to
		 */
		private void addStopStates(final State<T> state) {
			getCurrentNotCondition().forEach(namedNotCondition -> {
				final State<T> stopState = createStopState(namedNotCondition.f0, namedNotCondition.f1);
				state.addProceed(stopState, namedNotCondition.f0);
			});
		}

		/**
		 * If the pattern following the current one is a NOT_FOLLOW pattern, adds a PROCEED edge from the
		 * given state to that pattern's Stop state. Does nothing otherwise.
		 *
		 * @param loopingState the state to attach the Stop state to
		 */
		private void addStopStateToLooping(final State<T> loopingState) {
			if (followingPattern == null) {
				return;
			}
			if (followingPattern.getQuantifier().getConsumingStrategy() != Quantifier.ConsumingStrategy.NOT_FOLLOW) {
				return;
			}

			final IterativeCondition<T> stopCondition = getTakeCondition(followingPattern);
			final State<T> stopState = createStopState(stopCondition, followingPattern.getName());
			loopingState.addProceed(stopState, stopCondition);
		}

		/**
		 * Creates a "complex" state consisting of given number of states with
		 * same {@link IterativeCondition}.
		 *
		 * <p>The states are created back to front: first one state per repetition between
		 * {@code times.getFrom()} and {@code times.getTo()} (each created with {@code isOptional = true}),
		 * then {@code times.getFrom() - 1} states created with {@code isOptional = false}, and finally the
		 * head state, which is optional only if the current pattern is.
		 *
		 * @param sinkState the state that the created state should point to
		 * @param proceedState state that the state being converted should proceed to
		 * @param times     the lower ({@code getFrom()}) and upper ({@code getTo()}) bound of repetitions
		 * @return the first state of the "complex" state, next state should point to it
		 */
		@SuppressWarnings("unchecked")
		private State<T> createTimesState(final State<T> sinkState, final State<T> proceedState, Times times) {
			State<T> lastSink = sinkState;
			// the states created in the loops below are not the head of the quantifier
			setCurrentGroupPatternFirstOfLoop(false);
			final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition();
			// both the inner IGNORE and the TAKE condition additionally require the until condition not to hold
			final IterativeCondition<T> innerIgnoreCondition = extendWithUntilCondition(
				getInnerIgnoreCondition(currentPattern),
				untilCondition,
				false);
			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
				getTakeCondition(currentPattern),
				untilCondition,
				true);

			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) &&
				times.getFrom() != times.getTo()) {
				if (untilCondition != null) {
					// remember a copy of the sink taken before updateWithGreedyCondition is applied to it;
					// createSingletonState looks it up for the PROCEED edge guarded by the until condition
					State<T> sinkStateCopy = copy(sinkState);
					originalStateMap.put(sinkState.getName(), sinkStateCopy);
				}
				// NOTE(review): updateWithGreedyCondition is defined outside this chunk — presumably it
				// restricts the sink's edges with the take condition; confirm against its definition
				updateWithGreedyCondition(sinkState, takeCondition);
			}

			// one state per repetition above the lower bound, created with isOptional = true
			for (int i = times.getFrom(); i < times.getTo(); i++) {
				lastSink = createSingletonState(lastSink, proceedState, takeCondition, innerIgnoreCondition, true);
				addStopStateToLooping(lastSink);
			}
			// the remaining repetitions (all but the head), created with isOptional = false and no proceed state
			for (int i = 0; i < times.getFrom() - 1; i++) {
				lastSink = createSingletonState(lastSink, null, takeCondition, innerIgnoreCondition, false);
				addStopStateToLooping(lastSink);
			}
			// we created the intermediate states in the loop, now we create the start of the loop.
			setCurrentGroupPatternFirstOfLoop(true);
			// the head uses the pattern's own (outer) ignore condition rather than the inner one
			return createSingletonState(
				lastSink,
				proceedState,
				takeCondition,
				getIgnoreCondition(currentPattern),
				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
		}

		/**
		 * Records whether the states about to be created belong to the head of the quantifier of the
		 * current pattern. Only has an effect if the current pattern is a {@link GroupPattern}.
		 *
		 * @param isFirstOfLoop whether the current group pattern is the head of the TIMES quantifier
		 */
		@SuppressWarnings("unchecked")
		private void setCurrentGroupPatternFirstOfLoop(boolean isFirstOfLoop) {
			if (!(currentPattern instanceof GroupPattern)) {
				return;
			}
			final GroupPattern<T, ?> groupPattern = (GroupPattern<T, ?>) currentPattern;
			firstOfLoopMap.put(groupPattern, isFirstOfLoop);
		}

		/**
		 * Checks if the current group pattern is the head of the TIMES/LOOPING quantifier.
		 *
		 * @return the flag recorded for the current group pattern, or {@code true} if none was recorded
		 */
		private boolean isCurrentGroupPatternFirstOfLoop() {
			return firstOfLoopMap.getOrDefault(currentGroupPattern, true);
		}

		/**
		 * Checks if the given pattern is the head pattern of the current group pattern, i.e. a group
		 * pattern is currently being converted and the given pattern has no predecessor.
		 *
		 * @param pattern the pattern to be checked
		 * @return {@code true} iff the given pattern is in a group pattern and it is the head pattern of the
		 * group pattern, {@code false} otherwise
		 */
		private boolean headOfGroup(Pattern<T, ?> pattern) {
			if (currentGroupPattern == null) {
				return false;
			}
			return pattern.getPrevious() == null;
		}

		/**
		 * Checks if the given pattern is optional. For the head of a group pattern the answer comes from
		 * the group pattern: it is optional only while the group is at the head of its loop and the
		 * group's quantifier is OPTIONAL.
		 *
		 * @param pattern the pattern to be checked
		 * @return whether the pattern is to be treated as optional
		 */
		private boolean isPatternOptional(Pattern<T, ?> pattern) {
			if (!headOfGroup(pattern)) {
				return pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL);
			}
			if (!isCurrentGroupPatternFirstOfLoop()) {
				return false;
			}
			return currentGroupPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL);
		}

		/**
		 * Creates a simple single state for the current pattern, using the pattern's own take and ignore
		 * conditions and optionality. The given state serves both as the sink and as the proceed target.
		 *
		 * @param sinkState state that the state being converted should point to
		 * @return the created state
		 */
		@SuppressWarnings("unchecked")
		private State<T> createSingletonState(final State<T> sinkState) {
			final IterativeCondition<T> takeCondition = getTakeCondition(currentPattern);
			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
			final boolean optional = isPatternOptional(currentPattern);

			return createSingletonState(sinkState, sinkState, takeCondition, ignoreCondition, optional);
		}

		/**
		 * Creates a simple single state. For an OPTIONAL state it also consists
		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
		 * in computation state graph  can be created only once.
		 *
		 * <p>If the current pattern is a {@link GroupPattern}, the call is delegated to
		 * {@code createGroupPatternState} and the take/ignore conditions passed in are not used.
		 *
		 * @param sinkState state that the state being converted should point to
		 * @param proceedState state that the state being converted should proceed to
		 * @param takeCondition condition that should be applied to TAKE transition
		 * @param ignoreCondition condition that should be applied to IGNORE transition, {@code null} for none
		 * @param isOptional whether the state being converted is optional
		 * @return the created state
		 */
		@SuppressWarnings("unchecked")
		private State<T> createSingletonState(final State<T> sinkState,
			final State<T> proceedState,
			final IterativeCondition<T> takeCondition,
			final IterativeCondition<T> ignoreCondition,
			final boolean isOptional) {
			if (currentPattern instanceof GroupPattern) {
				return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
			}

			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
			// if event is accepted then all notPatterns previous to the optional states are no longer valid
			final State<T> sink = copyWithoutTransitiveNots(sinkState);
			singletonState.addTake(sink, takeCondition);

			// if no element accepted the previous nots are still valid.
			final IterativeCondition<T> proceedCondition = getTrueFunction();

			// for the first state of a group pattern, its PROCEED edge should point to the following state of
			// that group pattern and the edge will be added at the end of creating the NFA for that group pattern
			if (isOptional && !headOfGroup(currentPattern)) {
				if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
					final IterativeCondition<T> untilCondition =
						(IterativeCondition<T>) currentPattern.getUntilCondition();
					if (untilCondition != null) {
						// when the until condition holds, proceed to the copy of the proceed state that was
						// stored in originalStateMap (see createTimesState / createLooping)
						singletonState.addProceed(
							originalStateMap.get(proceedState.getName()),
							new RichAndCondition<>(proceedCondition, untilCondition));
					}
					// otherwise proceed to the proceed state itself, guarded by the negated until condition
					// if there is one
					singletonState.addProceed(proceedState,
						untilCondition != null
							? new RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition))
							: proceedCondition);
				} else {
					singletonState.addProceed(proceedState, proceedCondition);
				}
			}

			if (ignoreCondition != null) {
				final State<T> ignoreState;
				if (isOptional) {
					// separate state entered after ignoring an event: it gets the TAKE edge and the Stop
					// states, but no PROCEED edge to the proceed state
					ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
					ignoreState.addTake(sink, takeCondition);
					// NOTE(review): the one-argument addIgnore presumably adds a self-loop — confirm in State
					ignoreState.addIgnore(ignoreCondition);
					addStopStates(ignoreState);
				} else {
					// a non-optional state simply ignores back into itself
					ignoreState = singletonState;
				}
				singletonState.addIgnore(ignoreState, ignoreCondition);
			}
			return singletonState;
		}

		/**
		 * Create all the states for the group pattern. The compiler's traversal position (current
		 * pattern, following pattern and current group pattern) is switched to the group's inner chain
		 * for the duration of the call and put back before returning.
		 *
		 * @param groupPattern the group pattern to create the states for
		 * @param sinkState the state that the group pattern being converted should point to
		 * @param proceedState the state that the group pattern being converted should proceed to
		 * @param isOptional whether the group pattern being converted is optional
		 * @return the first state of the states of the group pattern
		 */
		private State<T> createGroupPatternState(
			final GroupPattern<T, ?> groupPattern,
			final State<T> sinkState,
			final State<T> proceedState,
			final boolean isOptional) {
			final IterativeCondition<T> alwaysTrue = getTrueFunction();

			// remember where the outer traversal currently stands
			final Pattern<T, ?> savedCurrentPattern = currentPattern;
			final Pattern<T, ?> savedFollowingPattern = followingPattern;
			final GroupPattern<T, ?> savedGroupPattern = currentGroupPattern;

			// compile the inner chain of the group: everything but its head, then the head itself
			currentGroupPattern = groupPattern;
			currentPattern = groupPattern.getRawPattern();
			final State<T> innerSink = createMiddleStates(sinkState);
			final State<T> groupHead = convertPattern(innerSink);
			if (isOptional) {
				// for the first state of a group pattern, its PROCEED edge should point to
				// the following state of that group pattern
				groupHead.addProceed(proceedState, alwaysTrue);
			}

			// resume the outer traversal
			currentPattern = savedCurrentPattern;
			followingPattern = savedFollowingPattern;
			currentGroupPattern = savedGroupPattern;
			return groupHead;
		}

		/**
		 * Create the states for the group pattern as a looping one. The inner chain of the group ends in
		 * a dummy state that proceeds back to the head of the group, and the head proceeds to the given
		 * sink state. The compiler's traversal position is put back before returning.
		 *
		 * @param groupPattern the group pattern to create the states for
		 * @param sinkState the state that the group pattern being converted should point to
		 * @return the first state of the states of the group pattern
		 */
		private State<T> createLoopingGroupPatternState(
			final GroupPattern<T, ?> groupPattern,
			final State<T> sinkState) {
			final IterativeCondition<T> alwaysTrue = getTrueFunction();

			// remember where the outer traversal currently stands
			final Pattern<T, ?> savedCurrentPattern = currentPattern;
			final Pattern<T, ?> savedFollowingPattern = followingPattern;
			final GroupPattern<T, ?> savedGroupPattern = currentGroupPattern;

			// the dummy state is named after the group pattern, which is still the current pattern here
			final State<T> loopBackState = createState(currentPattern.getName(), State.StateType.Normal);

			currentGroupPattern = groupPattern;
			currentPattern = groupPattern.getRawPattern();
			final State<T> innerSink = createMiddleStates(loopBackState);
			final State<T> groupHead = convertPattern(innerSink);

			// leaving the loop: head -> sink; repeating the loop: dummy -> head
			groupHead.addProceed(sinkState, alwaysTrue);
			loopBackState.addProceed(groupHead, alwaysTrue);

			// resume the outer traversal
			currentPattern = savedCurrentPattern;
			followingPattern = savedFollowingPattern;
			currentGroupPattern = savedGroupPattern;
			return groupHead;
		}

		/**
		 * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
		 * for each PROCEED transition branches in computation state graph  can be created only once.
		 *
		 * <p>If the current pattern is a {@link GroupPattern}, the call is delegated to
		 * {@code createLoopingGroupPatternState}.
		 *
		 * @param sinkState the state that the converted state should point to
		 * @return the first state of the created complex state
		 */
		@SuppressWarnings("unchecked")
		private State<T> createLooping(final State<T> sinkState) {
			if (currentPattern instanceof GroupPattern) {
				return createLoopingGroupPatternState((GroupPattern) currentPattern, sinkState);
			}
			final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition();

			// both the inner IGNORE and the TAKE condition additionally require the until condition not to hold
			final IterativeCondition<T> ignoreCondition = extendWithUntilCondition(
				getInnerIgnoreCondition(currentPattern),
				untilCondition,
				false);
			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
				getTakeCondition(currentPattern),
				untilCondition,
				true);

			IterativeCondition<T> proceedCondition = getTrueFunction();
			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);

			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
				if (untilCondition != null) {
					// when the until condition holds, proceed to a copy of the sink taken before
					// updateWithGreedyCondition is applied to the sink; the copy is also remembered
					// in originalStateMap under the sink's name
					State<T> sinkStateCopy = copy(sinkState);
					loopingState.addProceed(sinkStateCopy, new RichAndCondition<>(proceedCondition, untilCondition));
					originalStateMap.put(sinkState.getName(), sinkStateCopy);
				}
				// otherwise proceed to the sink itself, guarded by the negated until condition if there is one
				loopingState.addProceed(sinkState,
					untilCondition != null
						? new RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition))
						: proceedCondition);
				// NOTE(review): the pattern's plain take condition is passed here, not the until-extended
				// takeCondition that createTimesState passes — confirm this difference is intended
				updateWithGreedyCondition(sinkState, getTakeCondition(currentPattern));
			} else {
				loopingState.addProceed(sinkState, proceedCondition);
			}
			// TAKE edge without a target: per the Javadoc above, the looping state takes into itself
			loopingState.addTake(takeCondition);

			addStopStateToLooping(loopingState);

			if (ignoreCondition != null) {
				// separate state entered after ignoring an event: it can take back into the looping state
				// but has no PROCEED edge to the sink
				final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
				ignoreState.addTake(loopingState, takeCondition);
				ignoreState.addIgnore(ignoreCondition);
				loopingState.addIgnore(ignoreState, ignoreCondition);

				addStopStateToLooping(ignoreState);
			}
			return loopingState;
		}

		/**
		 * This method extends the given condition with stop(until) condition if necessary.
		 *
		 * <ul>
		 *     <li>Without an until condition the given condition is returned unchanged.</li>
		 *     <li>With both conditions present the result is {@code NOT(until) AND condition}.</li>
		 *     <li>With only the until condition present the result is {@code NOT(until)} for a
		 *       {@code TAKE} edge and the (absent) given condition otherwise.</li>
		 * </ul>
		 *
		 * @param condition the condition to extend
		 * @param untilCondition the until condition to join with the given condition
		 * @param isTakeCondition whether the {@code condition} is for {@code TAKE} edge
		 * @return condition with AND applied or the original condition
		 */
		private IterativeCondition<T> extendWithUntilCondition(
				IterativeCondition<T> condition,
				IterativeCondition<T> untilCondition,
				boolean isTakeCondition) {
			if (untilCondition == null) {
				return condition;
			}
			if (condition != null) {
				return new RichAndCondition<>(new RichNotCondition<>(untilCondition), condition);
			}
			if (isTakeCondition) {
				return new RichNotCondition<>(untilCondition);
			}

			// no condition to extend and not a TAKE edge: nothing to build
			return condition;
		}

		/**
		 * Builds the condition for the {@code IGNORE} edge of the inner states of a complex state like
		 * looping or times, based on the inner consuming strategy.
		 *
		 * @param pattern the pattern the inner states belong to
		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
		 * that corresponds to the specified {@link Pattern} and extended with the stop(until) condition
		 * of the current group pattern if there is one; {@code null} if no event may be ignored
		 */
		@SuppressWarnings("unchecked")
		private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) {
			Quantifier.ConsumingStrategy strategy = pattern.getQuantifier().getInnerConsumingStrategy();
			if (headOfGroup(pattern)) {
				// for the head pattern of a group pattern, we should consider the
				// inner consume strategy of the group pattern
				strategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy();
			}

			final IterativeCondition<T> baseCondition;
			switch (strategy) {
				case SKIP_TILL_NEXT:
					baseCondition = new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
					break;
				case SKIP_TILL_ANY:
					baseCondition = BooleanConditions.trueFunction();
					break;
				default:
					// STRICT and every other strategy: nothing may be ignored
					baseCondition = null;
					break;
			}

			if (currentGroupPattern == null) {
				return baseCondition;
			}
			final IterativeCondition<T> groupUntilCondition =
				(IterativeCondition<T>) currentGroupPattern.getUntilCondition();
			if (groupUntilCondition == null) {
				return baseCondition;
			}
			return extendWithUntilCondition(baseCondition, groupUntilCondition, false);
		}

		/**
		 * Builds the condition for the {@code IGNORE} edge that corresponds to the specified
		 * {@link Pattern}, extended with the stop(until) condition of the current group pattern
		 * if necessary. For more on strategy see {@link Quantifier}.
		 *
		 * <p>The consuming strategy decides the base condition: {@code SKIP_TILL_NEXT} yields the
		 * negated pattern condition, {@code SKIP_TILL_ANY} yields an always-true condition, and
		 * every other strategy (e.g. {@code STRICT}) yields {@code null}.
		 *
		 * @param pattern the pattern whose {@code IGNORE} condition is requested
		 * @return the {@code IGNORE} condition, or {@code null} if there is none
		 */
		@SuppressWarnings("unchecked")
		private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) {
			final Quantifier.ConsumingStrategy strategy;
			if (!headOfGroup(pattern)) {
				// ordinary pattern: its own consume strategy applies
				strategy = pattern.getQuantifier().getConsumingStrategy();
			} else if (isCurrentGroupPatternFirstOfLoop()) {
				// head of a group that is the head of the TIMES/LOOPING quantifier:
				// the consume strategy of the group pattern applies
				strategy = currentGroupPattern.getQuantifier().getConsumingStrategy();
			} else {
				// head of a group that is not the head of the TIMES/LOOPING quantifier:
				// the inner consume strategy of the group pattern applies
				strategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy();
			}

			final IterativeCondition<T> baseCondition;
			switch (strategy) {
				case SKIP_TILL_NEXT:
					baseCondition = new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
					break;
				case SKIP_TILL_ANY:
					baseCondition = BooleanConditions.trueFunction();
					break;
				default:
					// STRICT and all remaining strategies have no base IGNORE condition
					baseCondition = null;
					break;
			}

			if (currentGroupPattern == null) {
				return baseCondition;
			}

			final IterativeCondition<T> stopCondition =
				(IterativeCondition<T>) currentGroupPattern.getUntilCondition();
			if (stopCondition == null) {
				return baseCondition;
			}

			return extendWithUntilCondition(baseCondition, stopCondition, false);
		}

		/**
		 * Builds the condition for the {@code TAKE} edge that corresponds to the specified
		 * {@link Pattern}, extended with the stop(until) condition of the current group pattern
		 * if necessary.
		 *
		 * @param pattern the pattern whose {@code TAKE} condition is requested
		 * @return the pattern's condition, guarded by the negated until condition when the
		 *         current group pattern defines one
		 */
		@SuppressWarnings("unchecked")
		private IterativeCondition<T> getTakeCondition(Pattern<T, ?> pattern) {
			final IterativeCondition<T> patternCondition = (IterativeCondition<T>) pattern.getCondition();

			if (currentGroupPattern == null) {
				return patternCondition;
			}

			final IterativeCondition<T> stopCondition =
				(IterativeCondition<T>) currentGroupPattern.getUntilCondition();
			if (stopCondition == null) {
				return patternCondition;
			}

			return extendWithUntilCondition(patternCondition, stopCondition, true);
		}

		/**
		 * Builds an always-true condition, extended with the stop(until) condition of the
		 * current group pattern if necessary.
		 *
		 * @return a true function, guarded by the negated until condition when the current
		 *         group pattern defines one
		 */
		@SuppressWarnings("unchecked")
		private IterativeCondition<T> getTrueFunction() {
			final IterativeCondition<T> alwaysTrue = BooleanConditions.trueFunction();

			if (currentGroupPattern == null) {
				return alwaysTrue;
			}

			final IterativeCondition<T> stopCondition =
				(IterativeCondition<T>) currentGroupPattern.getUntilCondition();
			if (stopCondition == null) {
				return alwaysTrue;
			}

			return extendWithUntilCondition(alwaysTrue, stopCondition, true);
		}

		/**
		 * Restricts every transition of the given state so that it can only fire when the
		 * given take condition does NOT hold, by AND-ing each existing transition condition
		 * with the negated take condition.
		 *
		 * @param state the state whose transitions are updated in place
		 * @param takeCondition the condition that must not hold for the transitions to fire
		 */
		private void updateWithGreedyCondition(
			State<T> state,
			IterativeCondition<T> takeCondition) {
			for (StateTransition<T> transition : state.getStateTransitions()) {
				final IterativeCondition<T> existingCondition = transition.getCondition();
				// a fresh negation per transition, exactly one wrapper instance for each edge
				final IterativeCondition<T> notTaken = new RichNotCondition<>(takeCondition);
				transition.setCondition(new RichAndCondition<>(existingCondition, notTaken));
			}
		}
	}

	/**
	 * Factory interface for {@link NFA}.
	 *
	 * <p>The interface extends {@link Serializable}, so implementations are expected to be
	 * serializable.
	 *
	 * @param <T> Type of the input events which are processed by the NFA
	 */
	public interface NFAFactory<T> extends Serializable {
		/**
		 * Creates an {@link NFA}.
		 *
		 * @return the created NFA
		 */
		NFA<T> createNFA();
	}

	/**
	 * Implementation of the {@link NFAFactory} interface.
	 *
	 * <p>The implementation holds the window time, the set of states (and their transitions)
	 * and the timeout-handling flag, and passes them to the {@link NFA} constructor on every
	 * {@link #createNFA()} call.
	 *
	 * @param <T> Type of the input events which are processed by the NFA
	 */
	private static class NFAFactoryImpl<T> implements NFAFactory<T> {

		private static final long serialVersionUID = 8939783698296714379L;

		/** Window time passed to each created NFA. */
		private final long windowTime;

		/** States passed to each created NFA. */
		private final Collection<State<T>> states;

		/** Timeout-handling flag passed to each created NFA. */
		private final boolean timeoutHandling;

		private NFAFactoryImpl(long windowTime, Collection<State<T>> states, boolean timeoutHandling) {
			this.windowTime = windowTime;
			this.states = states;
			this.timeoutHandling = timeoutHandling;
		}

		/**
		 * Creates a new {@link NFA} from the stored states, window time and timeout-handling flag.
		 *
		 * @return a newly constructed NFA
		 */
		@Override
		public NFA<T> createNFA() {
			return new NFA<>(this.states, this.windowTime, this.timeoutHandling);
		}
	}
}
