blob: 42a95e82c286eed46c3c9d0385d2f92ef09981cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.scala.pattern
import org.apache.flink.cep
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy
import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JContext}
import org.apache.flink.cep.pattern.conditions.{IterativeCondition, SimpleCondition}
import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, Pattern => JPattern}
import org.apache.flink.cep.scala.conditions.Context
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Base class for a pattern definition.
*
* A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
* a [[org.apache.flink.cep.nfa.NFA]].
*
* {{{
* Pattern<T, F> pattern = Pattern.<T>begin("start")
* .next("middle").subtype(F.class)
* .followedBy("end").where(new MyCondition());
* }}}
*
* @param jPattern Underlying Java API Pattern
* @tparam T Base type of the elements appearing in the pattern
* @tparam F Subtype of T to which the current pattern operator is constrained
*/
class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
private[flink] def wrappedPattern = jPattern
/**
* @return The previous pattern
*/
def getPrevious: Option[Pattern[T, _ <: T]] = {
wrapPattern(jPattern.getPrevious)
}
/**
*
* @return Name of the pattern operator
*/
def getName: String = jPattern.getName
/**
*
* @return Window length in which the pattern match has to occur
*/
def getWindowTime: Option[Time] = {
Option(jPattern.getWindowTime)
}
/**
*
* @return currently applied quantifier to this pattern
*/
def getQuantifier: Quantifier = jPattern.getQuantifier
def getCondition: Option[IterativeCondition[F]] = {
Option(jPattern.getCondition)
}
def getUntilCondition: Option[IterativeCondition[F]] = {
Option(jPattern.getUntilCondition)
}
/**
* Adds a condition that has to be satisfied by an event
* in order to be considered a match. If another condition has already been
* set, the new one is going to be combined with the previous with a
* logical {{{AND}}}. In other case, this is going to be the only
* condition.
*
* @param condition The condition as an [[IterativeCondition]].
* @return The pattern with the new condition is set.
*/
def where(condition: IterativeCondition[F]): Pattern[T, F] = {
jPattern.where(condition)
this
}
/**
* Adds a condition that has to be satisfied by an event
* in order to be considered a match. If another condition has already been
* set, the new one is going to be combined with the previous with a
* logical {{{AND}}}. In other case, this is going to be the only
* condition.
*
* @param condition The condition to be set.
* @return The pattern with the new condition is set.
*/
def where(condition: (F, Context[F]) => Boolean): Pattern[T, F] = {
val condFun = new IterativeCondition[F] {
val cleanCond = cep.scala.cleanClosure(condition)
override def filter(value: F, ctx: JContext[F]): Boolean = {
cleanCond(value, new JContextWrapper(ctx))
}
}
where(condFun)
}
/**
* Adds a condition that has to be satisfied by an event
* in order to be considered a match. If another condition has already been
* set, the new one is going to be combined with the previous with a
* logical {{{AND}}}. In other case, this is going to be the only
* condition.
*
* @param condition The condition to be set.
* @return The pattern with the new condition is set.
*/
def where(condition: F => Boolean): Pattern[T, F] = {
val condFun = new IterativeCondition[F] {
val cleanCond = cep.scala.cleanClosure(condition)
override def filter(value: F, ctx: JContext[F]): Boolean = cleanCond(value)
}
where(condFun)
}
/**
* Adds a condition that has to be satisfied by an event
* in order to be considered a match. If another condition has already been
* set, the new one is going to be combined with the previous with a
* logical {{{OR}}}. In other case, this is going to be the only
* condition.
*
* @param condition The condition as an [[IterativeCondition]].
* @return The pattern with the new condition is set.
*/
def or(condition: IterativeCondition[F]): Pattern[T, F] = {
jPattern.or(condition)
this
}
/**
* Adds a condition that has to be satisfied by an event
* in order to be considered a match. If another condition has already been
* set, the new one is going to be combined with the previous with a
* logical {{{OR}}}. In other case, this is going to be the only
* condition.
*
* @param condition The {{{OR}}} condition.
* @return The pattern with the new condition is set.
*/
def or(condition: F => Boolean): Pattern[T, F] = {
val condFun = new SimpleCondition[F] {
val cleanCond = cep.scala.cleanClosure(condition)
override def filter(value: F): Boolean =
cleanCond(value)
}
or(condFun)
}
/**
* Adds a condition that has to be satisfied by an event
* in order to be considered a match. If another condition has already been
* set, the new one is going to be combined with the previous with a
* logical {{{OR}}}. In other case, this is going to be the only
* condition.
*
* @param condition The {{{OR}}} condition.
* @return The pattern with the new condition is set.
*/
def or(condition: (F, Context[F]) => Boolean): Pattern[T, F] = {
val condFun = new IterativeCondition[F] {
val cleanCond = cep.scala.cleanClosure(condition)
override def filter(value: F, ctx: JContext[F]): Boolean =
cleanCond(value, new JContextWrapper(ctx))
}
or(condFun)
}
/**
* Applies a subtype constraint on the current pattern. This means that an event has
* to be of the given subtype in order to be matched.
*
* @param clazz Class of the subtype
* @tparam S Type of the subtype
* @return The same pattern with the new subtype constraint
*/
def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = {
jPattern.subtype(clazz)
this.asInstanceOf[Pattern[T, S]]
}
/**
* Applies a stop condition for a looping state. It allows cleaning the underlying state.
*
* @param untilCondition a condition an event has to satisfy to stop collecting events into
* looping state
* @return The same pattern with applied untilCondition
*/
def until(untilCondition: IterativeCondition[F]): Pattern[T, F] = {
jPattern.until(untilCondition)
this
}
/**
* Applies a stop condition for a looping state. It allows cleaning the underlying state.
*
* @param untilCondition a condition an event has to satisfy to stop collecting events into
* looping state
* @return The same pattern with applied untilCondition
*/
def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F] = {
val condFun = new IterativeCondition[F] {
val cleanCond = cep.scala.cleanClosure(untilCondition)
override def filter(value: F, ctx: JContext[F]): Boolean =
cleanCond(value, new JContextWrapper(ctx))
}
until(condFun)
}
/**
* Applies a stop condition for a looping state. It allows cleaning the underlying state.
*
* @param untilCondition a condition an event has to satisfy to stop collecting events into
* looping state
* @return The same pattern with applied untilCondition
*/
def until(untilCondition: F => Boolean): Pattern[T, F] = {
val condFun = new IterativeCondition[F] {
val cleanCond = cep.scala.cleanClosure(untilCondition)
override def filter(value: F, ctx: JContext[F]): Boolean = cleanCond(value)
}
until(condFun)
}
/**
* Defines the maximum time interval in which a matching pattern has to be completed in
* order to be considered valid. This interval corresponds to the maximum time gap between first
* and the last event.
*
* @param windowTime Time of the matching window
* @return The same pattern operator with the new window length
*/
def within(windowTime: Time): Pattern[T, F] = {
jPattern.within(windowTime)
this
}
/**
* Appends a new pattern to the existing one. The new pattern enforces strict
* temporal contiguity. This means that the whole pattern sequence matches only
* if an event which matches this pattern directly follows the preceding matching
* event. Thus, there cannot be any events in between two matching events.
*
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
def next(name: String): Pattern[T, T] = {
Pattern[T, T](jPattern.next(name))
}
/**
* Appends a new pattern to the existing one. The new pattern enforces that there is no event
* matching this pattern right after the preceding matched event.
*
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
def notNext(name: String): Pattern[T, T] = {
Pattern[T, T](jPattern.notNext(name))
}
/**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
*
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
def followedBy(name: String): Pattern[T, T] = {
Pattern[T, T](jPattern.followedBy(name))
}
/**
* Appends a new pattern to the existing one. The new pattern enforces that there is no event
* matching this pattern between the preceding pattern and succeeding this one.
*
* NOTE: There has to be other pattern after this one.
*
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
def notFollowedBy(name : String): Pattern[T, T] = {
Pattern[T, T](jPattern.notFollowedBy(name))
}
/**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
*
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
def followedByAny(name: String): Pattern[T, T] = {
Pattern[T, T](jPattern.followedByAny(name))
}
/**
* Specifies that this pattern is optional for a final match of the pattern
* sequence to happen.
*
* @return The same pattern as optional.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def optional: Pattern[T, F] = {
jPattern.optional()
this
}
/**
* Specifies that this pattern can occur {{{one or more}}} times.
* This means at least one and at most infinite number of events can
* be matched to this pattern.
*
* If this quantifier is enabled for a
* pattern {{{A.oneOrMore().followedBy(B)}}} and a sequence of events
* {{{A1 A2 B}}} appears, this will generate patterns:
* {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}.
*
* @return The same pattern with a [[Quantifier.looping()]] quantifier applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def oneOrMore: Pattern[T, F] = {
jPattern.oneOrMore()
this
}
/**
* Specifies that this pattern is greedy.
* This means as many events as possible will be matched to this pattern.
*
* @return The same pattern with { @link Quantifier#greedy} set to true.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def greedy: Pattern[T, F] = {
jPattern.greedy()
this
}
/**
* Specifies exact number of times that this pattern should be matched.
*
* @param times number of times matching event must appear
* @return The same pattern with number of times applied
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def times(times: Int): Pattern[T, F] = {
jPattern.times(times)
this
}
/**
* Specifies that the pattern can occur between from and to times.
*
* @param from number of times matching event must appear at least
* @param to number of times matching event must appear at most
* @return The same pattern with the number of times range applied
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def times(from: Int, to: Int): Pattern[T, F] = {
jPattern.times(from, to)
this
}
/**
* Specifies that this pattern can occur the specified times at least.
* This means at least the specified times and at most infinite number of events can
* be matched to this pattern.
*
* @return The same pattern with a { @link Quantifier#looping(ConsumingStrategy)} quantifier
* applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def timesOrMore(times: Int): Pattern[T, F] = {
jPattern.timesOrMore(times)
this
}
/**
* Applicable only to [[Quantifier.looping()]] and [[Quantifier.times()]] patterns,
* this option allows more flexibility to the matching events.
*
* If {{{allowCombinations()}}} is not applied for a
* pattern {{{A.oneOrMore().followedBy(B)}}} and a sequence of events
* {{{A1 A2 B}}} appears, this will generate patterns:
* {{{A1 B}}} and {{{A1 A2 B}}}. If this method is applied, we
* will have {{{A1 B}}}, {{{A2 B}}} and {{{A1 A2 B}}}.
*
* @return The same pattern with the updated quantifier.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def allowCombinations(): Pattern[T, F] = {
jPattern.allowCombinations()
this
}
/**
* Works in conjunction with [[Pattern#oneOrMore()]] or [[Pattern#times(int)]].
* Specifies that any not matching element breaks the loop.
*
* E.g. a pattern like:
* {{{
* Pattern.begin("start").where(_.getName().equals("c"))
* .followedBy("middle").where(_.getName().equals("a")).oneOrMore().consecutive()
* .followedBy("end1").where(_.getName().equals("b"));
* }}}
*
* For a sequence: C D A1 A2 A3 D A4 B
*
* will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
*
* By default a relaxed continuity is applied.
* @return pattern with continuity changed to strict
*/
def consecutive(): Pattern[T, F] = {
jPattern.consecutive()
this
}
/**
* Appends a new group pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
*
* @param pattern the pattern to append
* @return A new pattern which is appended to this one
*/
def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](jPattern.followedBy(pattern.wrappedPattern))
/**
* Appends a new group pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
*
* @param pattern the pattern to append
* @return A new pattern which is appended to this one
*/
def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](jPattern.followedByAny(pattern.wrappedPattern))
/**
* Appends a new group pattern to the existing one. The new pattern enforces strict
* temporal contiguity. This means that the whole pattern sequence matches only
* if an event which matches this pattern directly follows the preceding matching
* event. Thus, there cannot be any events in between two matching events.
*
* @param pattern the pattern to append
* @return A new pattern which is appended to this one
*/
def next(pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](jPattern.next(pattern.wrappedPattern))
/**
* Get after match skip strategy.
* @return current after match skip strategy
*/
def getAfterMatchSkipStrategy: AfterMatchSkipStrategy =
jPattern.getAfterMatchSkipStrategy
}
object Pattern {
/**
* Constructs a new Pattern by wrapping a given Java API Pattern
*
* @param jPattern Underlying Java API Pattern.
* @tparam T Base type of the elements appearing in the pattern
* @tparam F Subtype of T to which the current pattern operator is constrained
* @return New wrapping Pattern object
*/
def apply[T, F <: T](jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern)
/**
* Starts a new pattern sequence. The provided name is the one of the initial pattern
* of the new sequence. Furthermore, the base type of the event sequence is set.
*
* @param name The name of starting pattern of the new pattern sequence
* @tparam X Base type of the event pattern
* @return The first pattern of a pattern sequence
*/
def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name))
/**
* Starts a new pattern sequence. The provided name is the one of the initial pattern
* of the new sequence. Furthermore, the base type of the event sequence is set.
*
* @param name The name of starting pattern of the new pattern sequence
* @param afterMatchSkipStrategy The skip strategy to use after each match
* @tparam X Base type of the event pattern
* @return The first pattern of a pattern sequence
*/
def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X] =
Pattern(JPattern.begin(name, afterMatchSkipStrategy))
/**
* Starts a new pattern sequence. The provided pattern is the initial pattern
* of the new sequence.
*
* @param pattern the pattern to begin with
* @return the first pattern of a pattern sequence
*/
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](JPattern.begin(pattern.wrappedPattern))
/**
* Starts a new pattern sequence. The provided pattern is the initial pattern
* of the new sequence.
*
* @param pattern the pattern to begin with
* @param afterMatchSkipStrategy The skip strategy to use after each match
* @return The first pattern of a pattern sequence
*/
def begin[T, F <: T](pattern: Pattern[T, F],
afterMatchSkipStrategy: AfterMatchSkipStrategy): GroupPattern[T, F] =
GroupPattern(JPattern.begin(pattern.wrappedPattern, afterMatchSkipStrategy))
}