// blob: 44f80afe9f75e4ebcdc5f9305140940789254198 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.scala
import java.util.{UUID, List => JList, Map => JMap}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.pattern.{Pattern => JPattern}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.streaming.api.scala.{asScalaStream, _}
import org.apache.flink.util.Collector
import org.apache.flink.cep.operator.CEPOperatorUtils
import org.apache.flink.cep.scala.pattern.Pattern
import scala.collection.Map
/**
  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
  * pattern sequences as a map of events associated with their names. The pattern is detected using
  * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected sequences, the user has to
  * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]].
  *
  * This class is a thin Scala facade: every operation is forwarded to the wrapped Java
  * pattern stream and the resulting Java stream is converted with `asScalaStream`.
  *
  * @param jPatternStream Underlying pattern stream from Java API
  * @tparam T Type of the events
  */
class PatternStream[T](jPatternStream: JPatternStream[T]) {

  /** Exposes the wrapped Java pattern stream to other code inside the `flink` package. */
  private[flink] def wrappedPatternStream: JPatternStream[T] = jPatternStream

  /**
    * Returns the pattern of the wrapped Java stream as a Scala [[Pattern]].
    *
    * The Java pattern is cast to `JPattern[T, T]` before it is wrapped.
    */
  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])

  /** Returns the input stream of the wrapped Java pattern stream as a Scala [[DataStream]]. */
  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)

  /** Returns the [[EventComparator]] held by the wrapped Java pattern stream. */
  def getComparator: EventComparator[T] = jPatternStream.getComparator

  /**
    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
    * exactly one resulting element.
    *
    * @param patternSelectFunction The pattern select function which is called for each detected
    *                              pattern sequence.
    * @tparam R Type of the resulting elements
    * @return [[DataStream]] which contains the resulting elements from the pattern select function.
    */
  def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R])
    : DataStream[R] = {
    asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
  }

  /**
    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
    * exactly one resulting element.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
    * timeout function has to produce exactly one resulting timeout event.
    *
    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance:
    * selected events become `Right`, timeout events become `Left`.
    *
    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
    *                               pattern sequence which has timed out.
    * @param patternSelectFunction  The pattern select function which is called for each detected
    *                               pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @deprecated Use the version that returns timed out events as a side output
    * @return Data stream of either type which contains the resulting events and resulting timeout
    *         events.
    */
  @deprecated("Use the select overload that takes an OutputTag for timed out matches")
  def select[L: TypeInformation, R: TypeInformation](
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R])
    : DataStream[Either[L, R]] = {
    // The tag is internal to this method, so a random id is used for it.
    val outputTag = OutputTag[L](UUID.randomUUID().toString)
    val mainStream = select(outputTag, patternTimeoutFunction, patternSelectFunction)
    // Merge the main output and the timeout side output back into a single Either stream.
    mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l))
  }

  /**
    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
    * exactly one resulting element.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
    * timeout function has to produce exactly one resulting timeout event.
    *
    * You can get the stream of timed out matches using [[DataStream.getSideOutput()]] on the
    * [[DataStream]] returned by this method, with the same [[OutputTag]].
    *
    * Both functions are passed through `cleanClosure` before they are handed to the Java API.
    *
    * @param outputTag              [[OutputTag]] that identifies side output with timed out
    *                               patterns
    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
    *                               pattern sequence which has timed out.
    * @param patternSelectFunction  The pattern select function which is called for each detected
    *                               pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @return Data stream which contains the resulting elements with the resulting timeout elements
    *         in a side output.
    */
  def select[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R])
    : DataStream[R] = {
    val cleanedSelect = cleanClosure(patternSelectFunction)
    val cleanedTimeout = cleanClosure(patternTimeoutFunction)

    asScalaStream(
      jPatternStream
        .select(outputTag, cleanedTimeout, implicitly[TypeInformation[R]], cleanedSelect)
    )
  }

  /**
    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
    * produce an arbitrary number of resulting elements.
    *
    * @param patternFlatSelectFunction The pattern flat select function which is called for each
    *                                  detected pattern sequence.
    * @tparam R Type of the resulting elements
    * @return [[DataStream]] which contains the resulting elements from the pattern flat select
    *         function.
    */
  def flatSelect[R: TypeInformation](patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
    : DataStream[R] = {
    asScalaStream(jPatternStream
      .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
  }

  /**
    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
    * produce an arbitrary number of resulting elements.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
    * pattern timeout function can produce an arbitrary number of resulting timeout events.
    *
    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance:
    * selected events become `Right`, timeout events become `Left`.
    *
    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
    *                                   partially matched pattern sequence which has timed out.
    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
    *                                   detected pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @deprecated Use the version that returns timed out events as a side output
    * @return Data stream of either type which contains the resulting events and the resulting
    *         timeout events wrapped in a [[Either]] type.
    */
  @deprecated("Use the flatSelect overload that takes an OutputTag for timed out matches")
  def flatSelect[L: TypeInformation, R: TypeInformation](
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
    : DataStream[Either[L, R]] = {
    // The tag is internal to this method. A random id (as in the deprecated `select`) is used
    // instead of a fixed literal so that it cannot coincide with a tag id chosen by the caller.
    val outputTag = OutputTag[L](UUID.randomUUID().toString)
    val mainStream = flatSelect(outputTag, patternFlatTimeoutFunction, patternFlatSelectFunction)
    // Merge the main output and the timeout side output back into a single Either stream.
    mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l))
  }

  /**
    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
    * produce an arbitrary number of resulting elements.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
    * pattern timeout function can produce an arbitrary number of resulting timeout events.
    *
    * You can get the stream of timed out matches using [[DataStream.getSideOutput()]] on the
    * [[DataStream]] returned by this method, with the same [[OutputTag]].
    *
    * Both functions are passed through `cleanClosure` before they are handed to the Java API.
    *
    * @param outputTag                  [[OutputTag]] that identifies side output with timed out
    *                                   patterns
    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
    *                                   partially matched pattern sequence which has timed out.
    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
    *                                   detected pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @return Data stream which contains the resulting elements with the resulting timeout elements
    *         in a side output.
    */
  def flatSelect[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
    : DataStream[R] = {
    val cleanedSelect = cleanClosure(patternFlatSelectFunction)
    val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction)

    asScalaStream(
      jPatternStream.flatSelect(
        outputTag,
        cleanedTimeout,
        implicitly[TypeInformation[R]],
        cleanedSelect))
  }

  /**
    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    * provided function is called. The pattern select function can produce exactly one resulting
    * element.
    *
    * The Scala function is wrapped in a [[PatternSelectFunction]]; the Java map received by that
    * wrapper is converted with `mapToScala` before the function is invoked.
    *
    * @param patternSelectFun The pattern select function which is called for each detected
    *                         pattern sequence.
    * @tparam R Type of the resulting elements
    * @return [[DataStream]] which contains the resulting elements from the pattern select function.
    */
  def select[R: TypeInformation](
      patternSelectFun: Map[String, Iterable[T]] => R)
    : DataStream[R] = {
    val cleanFun = cleanClosure(patternSelectFun)

    val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
      def select(in: JMap[String, JList[T]]): R = cleanFun(mapToScala(in))
    }
    select(patternSelectFunction)
  }

  /**
    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    * provided select function is called. The pattern select function can produce
    * exactly one resulting element.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided timeout function is called. The pattern
    * timeout function has to produce exactly one resulting element.
    *
    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
    *
    * Both Scala functions are wrapped in their Java function counterparts and the call is
    * delegated to the deprecated overload taking a [[PatternTimeoutFunction]] and a
    * [[PatternSelectFunction]].
    *
    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
    *                               pattern sequence which has timed out. Its second argument is
    *                               the timeout timestamp.
    * @param patternSelectFunction  The pattern select function which is called for each detected
    *                               pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @deprecated Use the version that returns timed out events as a side output
    * @return Data stream of either type which contains the resulting events and resulting timeout
    *         events.
    */
  @deprecated("Use the select overload that takes an OutputTag for timed out matches")
  def select[L: TypeInformation, R: TypeInformation](
      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
      patternSelectFunction: Map[String, Iterable[T]] => R)
    : DataStream[Either[L, R]] = {

    val cleanSelectFun = cleanClosure(patternSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)

    val patternSelectFun = new PatternSelectFunction[T, R] {
      override def select(pattern: JMap[String, JList[T]]): R =
        cleanSelectFun(mapToScala(pattern))
    }
    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L =
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp)
    }

    select(patternTimeoutFun, patternSelectFun)
  }

  /**
    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    * provided select function is called. The pattern select function can produce
    * exactly one resulting element.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided timeout function is called. The pattern
    * timeout function has to produce exactly one resulting element.
    *
    * You can get the stream of timed out matches using [[DataStream.getSideOutput()]] on the
    * [[DataStream]] returned by this method, with the same [[OutputTag]].
    *
    * Both Scala functions are wrapped in their Java function counterparts and the call is
    * delegated to the overload taking an [[OutputTag]], a [[PatternTimeoutFunction]] and a
    * [[PatternSelectFunction]].
    *
    * @param outputTag              [[OutputTag]] that identifies side output with timed out
    *                               patterns
    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
    *                               pattern sequence which has timed out. Its second argument is
    *                               the timeout timestamp.
    * @param patternSelectFunction  The pattern select function which is called for each detected
    *                               pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @return Data stream which contains the resulting elements with the resulting timeout elements
    *         in a side output.
    */
  def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
      patternSelectFunction: Map[String, Iterable[T]] => R)
    : DataStream[R] = {

    val cleanSelectFun = cleanClosure(patternSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)

    val patternSelectFun = new PatternSelectFunction[T, R] {
      override def select(pattern: JMap[String, JList[T]]): R =
        cleanSelectFun(mapToScala(pattern))
    }
    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L =
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp)
    }

    select(outputTag, patternTimeoutFun, patternSelectFun)
  }

  /**
    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    * the provided function is called. The pattern flat select function can produce an arbitrary
    * number of resulting elements, which it emits through the supplied [[Collector]].
    *
    * The Scala function is wrapped in a [[PatternFlatSelectFunction]]; the Java map received by
    * that wrapper is converted with `mapToScala` before the function is invoked.
    *
    * @param patternFlatSelectFun The pattern flat select function which is called for each
    *                             detected pattern sequence.
    * @tparam R Type of the resulting elements
    * @return [[DataStream]] which contains the resulting elements from the pattern flat select
    *         function.
    */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit)
    : DataStream[R] = {
    val cleanFun = cleanClosure(patternFlatSelectFun)

    val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] =
      new PatternFlatSelectFunction[T, R] {
        def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
          cleanFun(mapToScala(pattern), out)
      }
    flatSelect(patternFlatSelectFunction)
  }

  /**
    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    * the provided flat select function is called. The pattern flat select function can
    * produce an arbitrary number of resulting elements.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided flat timeout function is called. The
    * pattern timeout function can produce an arbitrary number of resulting timeout events.
    *
    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
    *
    * Both Scala functions are wrapped in their Java function counterparts and the call is
    * delegated to the deprecated overload taking a [[PatternFlatTimeoutFunction]] and a
    * [[PatternFlatSelectFunction]].
    *
    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
    *                                   partially matched pattern sequence which has timed out.
    *                                   Its second argument is the timeout timestamp.
    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
    *                                   detected pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @deprecated Use the version that returns timed out events as a side output
    * @return Data stream of either type which contains the resulting events and the resulting
    *         timeout events wrapped in a [[Either]] type.
    */
  @deprecated("Use the flatSelect overload that takes an OutputTag for timed out matches")
  def flatSelect[L: TypeInformation, R: TypeInformation](
      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
    : DataStream[Either[L, R]] = {

    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)

    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
        cleanSelectFun(mapToScala(pattern), out)
    }
    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
      override def timeout(
          pattern: JMap[String, JList[T]],
          timeoutTimestamp: Long, out: Collector[L])
        : Unit = {
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
      }
    }

    flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
  }

  /**
    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    * the provided flat select function is called. The pattern flat select function can
    * produce an arbitrary number of resulting elements.
    *
    * Additionally a timeout function is applied to partial event patterns which have timed out. For
    * each partial pattern sequence the provided flat timeout function is called. The
    * pattern timeout function can produce an arbitrary number of resulting timeout events.
    *
    * You can get the stream of timed out matches using [[DataStream.getSideOutput()]] on the
    * [[DataStream]] returned by this method, with the same [[OutputTag]].
    *
    * Both Scala functions are wrapped in their Java function counterparts and the call is
    * delegated to the overload taking an [[OutputTag]], a [[PatternFlatTimeoutFunction]] and a
    * [[PatternFlatSelectFunction]].
    *
    * @param outputTag                  [[OutputTag]] that identifies side output with timed out
    *                                   patterns
    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
    *                                   partially matched pattern sequence which has timed out.
    *                                   Its second argument is the timeout timestamp.
    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
    *                                   detected pattern sequence.
    * @tparam L Type of the resulting timeout event
    * @tparam R Type of the resulting event
    * @return Data stream which contains the resulting elements with the resulting timeout elements
    *         in a side output.
    */
  def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
    : DataStream[R] = {

    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)

    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
        cleanSelectFun(mapToScala(pattern), out)
    }
    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
      override def timeout(
          pattern: JMap[String, JList[T]],
          timeoutTimestamp: Long, out: Collector[L])
        : Unit = {
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
      }
    }

    flatSelect(outputTag, patternFlatTimeoutFun, patternFlatSelectFun)
  }

  /**
    * Forwards the given output tag to `sideOutputLateData` of the wrapped Java pattern stream
    * and returns this Scala pattern stream so that calls can be chained.
    *
    * @param lateDataOutputTag [[OutputTag]] handed to the Java API; presumably it identifies the
    *                          side output that receives late events - confirm against the
    *                          Java `PatternStream` documentation.
    * @return This pattern stream.
    */
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = {
    // NOTE(review): the value returned by the Java call is discarded, so this only has an
    // effect if the Java method configures the wrapped stream in place - confirm.
    jPatternStream.sideOutputLateData(lateDataOutputTag)
    this
  }
}
object PatternStream {

  /**
    * Wraps a pattern stream of the Java API in its Scala counterpart.
    *
    * @param jPatternStream Underlying pattern stream from Java API
    * @tparam T Type of the events
    * @return A new pattern stream wrapping the pattern stream from Java API
    */
  def apply[T](jPatternStream: JPatternStream[T]): PatternStream[T] =
    new PatternStream[T](jPatternStream)
}