/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.flink.cep.scala |
| |
| import java.util.{UUID, List => JList, Map => JMap} |
| |
| import org.apache.flink.api.common.typeinfo.TypeInformation |
| import org.apache.flink.cep.pattern.{Pattern => JPattern} |
| import org.apache.flink.cep.scala.pattern.Pattern |
| import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} |
| import org.apache.flink.streaming.api.scala.{asScalaStream, _} |
| import org.apache.flink.util.Collector |
| |
| import org.apache.flink.cep.operator.CEPOperatorUtils |
| import org.apache.flink.cep.scala.pattern.Pattern |
| import scala.collection.Map |
| |
/**
 * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
 * pattern sequences as a map of events associated with their names. The pattern is detected using
 * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected sequences, the user has to
 * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]].
 *
 * @param jPatternStream Underlying pattern stream from Java API
 * @tparam T Type of the events
 */
class PatternStream[T](jPatternStream: JPatternStream[T]) {

  /** Exposes the wrapped Java pattern stream to Flink-internal callers. */
  private[flink] def wrappedPatternStream = jPatternStream

  /**
   * Returns the pattern of the underlying Java stream, wrapped in the Scala [[Pattern]] API.
   *
   * The Java pattern is cast to `JPattern[T, T]`; because of type erasure this cast is not
   * verified at runtime.
   */
  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])

  /** Returns the input stream of the underlying Java pattern stream as a Scala [[DataStream]]. */
  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)

  /** Returns the [[EventComparator]] held by the underlying Java pattern stream. */
  def getComparator: EventComparator[T] = jPatternStream.getComparator

  /**
   * Applies a select function to the detected pattern sequence. For each pattern sequence the
   * provided [[PatternSelectFunction]] is called. The pattern select function can produce
   * exactly one resulting element.
   *
   * @param patternSelectFunction The pattern select function which is called for each detected
   *                              pattern sequence.
   * @tparam R Type of the resulting elements
   * @return [[DataStream]] which contains the resulting elements from the pattern select function.
   */
  def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R])
    : DataStream[R] = {
    asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
  }

  /**
   * Applies a select function to the detected pattern sequence. For each pattern sequence the
   * provided [[PatternSelectFunction]] is called. The pattern select function can produce
   * exactly one resulting element.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
   * timeout function has to produce exactly one resulting timeout event.
   *
   * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
   *
   * @param patternTimeoutFunction The pattern timeout function which is called for each partial
   *                               pattern sequence which has timed out.
   * @param patternSelectFunction  The pattern select function which is called for each detected
   *                               pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @deprecated Use the version that returns timeouted events as a side-output
   * @return Data stream of either type which contains the resulting events and resulting timeout
   *         events.
   */
  @deprecated("Use the version that returns timeouted events as a side-output")
  def select[L: TypeInformation, R: TypeInformation](
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R])
    : DataStream[Either[L, R]] = {
    // The timeouts travel through an internal side output identified by a unique tag and are
    // then merged back with the main results: matches become Right, timeouts become Left.
    val outputTag = OutputTag[L](UUID.randomUUID().toString)
    val mainStream = select(outputTag, patternTimeoutFunction, patternSelectFunction)
    mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l))
  }

  /**
   * Applies a select function to the detected pattern sequence. For each pattern sequence the
   * provided [[PatternSelectFunction]] is called. The pattern select function can produce
   * exactly one resulting element.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
   * timeout function has to produce exactly one resulting timeout event.
   *
   * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
   * [[DataStream]] resulting from the select operation with the same [[OutputTag]].
   *
   * @param outputTag              [[OutputTag]] that identifies side output with timeouted
   *                               patterns
   * @param patternTimeoutFunction The pattern timeout function which is called for each partial
   *                               pattern sequence which has timed out.
   * @param patternSelectFunction  The pattern select function which is called for each detected
   *                               pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @return Data stream which contains the resulting elements with the resulting timeout elements
   *         in a side output.
   */
  def select[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R])
    : DataStream[R] = {
    val cleanedSelect = cleanClosure(patternSelectFunction)
    val cleanedTimeout = cleanClosure(patternTimeoutFunction)

    asScalaStream(
      jPatternStream
        .select(outputTag, cleanedTimeout, implicitly[TypeInformation[R]], cleanedSelect)
    )
  }

  /**
   * Applies a flat select function to the detected pattern sequence. For each pattern sequence
   * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
   * produce an arbitrary number of resulting elements.
   *
   * @param patternFlatSelectFunction The pattern flat select function which is called for each
   *                                  detected pattern sequence.
   * @tparam R Type of the resulting elements
   * @return [[DataStream]] which contains the resulting elements from the pattern flat select
   *         function.
   */
  def flatSelect[R: TypeInformation](patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
    : DataStream[R] = {
    asScalaStream(jPatternStream
      .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
  }

  /**
   * Applies a flat select function to the detected pattern sequence. For each pattern sequence
   * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
   * produce an arbitrary number of resulting elements.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
   * pattern timeout function can produce an arbitrary number of resulting timeout events.
   *
   * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
   *
   * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
   *                                   partially matched pattern sequence which has timed out.
   * @param patternFlatSelectFunction  The pattern flat select function which is called for each
   *                                   detected pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @deprecated Use the version that returns timeouted events as a side-output
   * @return Data stream of either type which contains the resulting events and the resulting
   *         timeout events wrapped in a [[Either]] type.
   */
  @deprecated("Use the version that returns timeouted events as a side-output")
  def flatSelect[L: TypeInformation, R: TypeInformation](
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
    : DataStream[Either[L, R]] = {
    // Use a unique tag id for the internal side output, exactly as the deprecated `select`
    // overload does, instead of a shared hard-coded name.
    val outputTag = OutputTag[L](UUID.randomUUID().toString)
    val mainStream = flatSelect(outputTag, patternFlatTimeoutFunction, patternFlatSelectFunction)
    // Matches become Right, timeouts read back from the side output become Left.
    mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l))
  }

  /**
   * Applies a flat select function to the detected pattern sequence. For each pattern sequence
   * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
   * produce an arbitrary number of resulting elements.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
   * pattern timeout function can produce an arbitrary number of resulting timeout events.
   *
   * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
   * [[DataStream]] resulting from the flat select operation with the same [[OutputTag]].
   *
   * @param outputTag                  [[OutputTag]] that identifies side output with timeouted
   *                                   patterns
   * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
   *                                   partially matched pattern sequence which has timed out.
   * @param patternFlatSelectFunction  The pattern flat select function which is called for each
   *                                   detected pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @return Data stream which contains the resulting elements with the resulting timeout elements
   *         in a side output.
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
    : DataStream[R] = {

    val cleanedSelect = cleanClosure(patternFlatSelectFunction)
    val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction)

    asScalaStream(
      jPatternStream.flatSelect(
        outputTag,
        cleanedTimeout,
        implicitly[TypeInformation[R]],
        cleanedSelect))
  }

  /**
   * Applies a select function to the detected pattern sequence. For each pattern sequence the
   * provided Scala function is called, wrapped in a [[PatternSelectFunction]]. The pattern select
   * function can produce exactly one resulting element.
   *
   * @param patternSelectFun The pattern select function which is called for each detected
   *                         pattern sequence.
   * @tparam R Type of the resulting elements
   * @return [[DataStream]] which contains the resulting elements from the pattern select function.
   */
  def select[R: TypeInformation](
      patternSelectFun: Map[String, Iterable[T]] => R)
    : DataStream[R] = {
    val cleanFun = cleanClosure(patternSelectFun)

    // The explicit type annotation selects the PatternSelectFunction overload of `select`.
    val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {

      def select(in: JMap[String, JList[T]]): R = cleanFun(mapToScala(in))
    }
    select(patternSelectFunction)
  }

  /**
   * Applies a select function to the detected pattern sequence. For each pattern sequence the
   * provided select function is called, wrapped in a [[PatternSelectFunction]]. The pattern
   * select function can produce exactly one resulting element.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided timeout function is called, wrapped in a
   * [[PatternTimeoutFunction]]. The pattern timeout function has to produce exactly one resulting
   * element.
   *
   * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
   *
   * @param patternTimeoutFunction The pattern timeout function which is called for each partial
   *                               pattern sequence which has timed out.
   * @param patternSelectFunction  The pattern select function which is called for each detected
   *                               pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @deprecated Use the version that returns timeouted events as a side-output
   * @return Data stream of either type which contain the resulting events and resulting timeout
   *         events.
   */
  @deprecated("Use the version that returns timeouted events as a side-output")
  def select[L: TypeInformation, R: TypeInformation](
      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
      patternSelectFunction: Map[String, Iterable[T]] => R)
    : DataStream[Either[L, R]] = {

    val cleanSelectFun = cleanClosure(patternSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)

    // Adapt the Scala functions to the Java interfaces, converting the Java map on each call.
    val patternSelectFun = new PatternSelectFunction[T, R] {
      override def select(pattern: JMap[String, JList[T]]): R =
        cleanSelectFun(mapToScala(pattern))
    }
    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L =
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp)
    }

    select(patternTimeoutFun, patternSelectFun)
  }

  /**
   * Applies a select function to the detected pattern sequence. For each pattern sequence the
   * provided select function is called, wrapped in a [[PatternSelectFunction]]. The pattern
   * select function can produce exactly one resulting element.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided timeout function is called, wrapped in a
   * [[PatternTimeoutFunction]]. The pattern timeout function has to produce exactly one resulting
   * element.
   *
   * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
   * [[DataStream]] resulting from the select operation with the same [[OutputTag]].
   *
   * @param outputTag              [[OutputTag]] that identifies side output with timeouted
   *                               patterns
   * @param patternTimeoutFunction The pattern timeout function which is called for each partial
   *                               pattern sequence which has timed out.
   * @param patternSelectFunction  The pattern select function which is called for each detected
   *                               pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @return Data stream which contains the resulting elements with the resulting timeout elements
   *         in a side output.
   */
  def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
      patternSelectFunction: Map[String, Iterable[T]] => R)
    : DataStream[R] = {

    val cleanSelectFun = cleanClosure(patternSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)

    // Adapt the Scala functions to the Java interfaces, converting the Java map on each call.
    val patternSelectFun = new PatternSelectFunction[T, R] {
      override def select(pattern: JMap[String, JList[T]]): R =
        cleanSelectFun(mapToScala(pattern))
    }
    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L =
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp)
    }

    select(outputTag, patternTimeoutFun, patternSelectFun)
  }

  /**
   * Applies a flat select function to the detected pattern sequence. For each pattern sequence
   * the provided Scala function is called, wrapped in a [[PatternFlatSelectFunction]]. The
   * pattern flat select function can produce an arbitrary number of resulting elements.
   *
   * @param patternFlatSelectFun The pattern flat select function which is called for each
   *                             detected pattern sequence.
   * @tparam R Type of the resulting elements
   * @return [[DataStream]] which contains the resulting elements from the pattern flat select
   *         function.
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit)
    : DataStream[R] = {
    val cleanFun = cleanClosure(patternFlatSelectFun)

    // The explicit type annotation selects the PatternFlatSelectFunction overload of
    // `flatSelect`.
    val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] =
      new PatternFlatSelectFunction[T, R] {

        def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
          cleanFun(mapToScala(pattern), out)
      }
    flatSelect(patternFlatSelectFunction)
  }

  /**
   * Applies a flat select function to the detected pattern sequence. For each pattern sequence
   * the provided flat select function is called, wrapped in a [[PatternFlatSelectFunction]]. The
   * pattern flat select function can produce an arbitrary number of resulting elements.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided timeout function is called, wrapped in a
   * [[PatternFlatTimeoutFunction]]. The pattern timeout function can produce an arbitrary number
   * of resulting timeout events.
   *
   * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
   *
   * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
   *                                   partially matched pattern sequence which has timed out.
   * @param patternFlatSelectFunction  The pattern flat select function which is called for each
   *                                   detected pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @deprecated Use the version that returns timeouted events as a side-output
   * @return Data stream of either type which contains the resulting events and the resulting
   *         timeout events wrapped in a [[Either]] type.
   */
  @deprecated("Use the version that returns timeouted events as a side-output")
  def flatSelect[L: TypeInformation, R: TypeInformation](
      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
    : DataStream[Either[L, R]] = {

    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)

    // Adapt the Scala functions to the Java interfaces, converting the Java map on each call.
    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
        cleanSelectFun(mapToScala(pattern), out)
    }

    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
      override def timeout(
          pattern: JMap[String, JList[T]],
          timeoutTimestamp: Long, out: Collector[L])
        : Unit = {
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
      }
    }

    flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
  }

  /**
   * Applies a flat select function to the detected pattern sequence. For each pattern sequence
   * the provided flat select function is called, wrapped in a [[PatternFlatSelectFunction]]. The
   * pattern flat select function can produce an arbitrary number of resulting elements.
   *
   * Additionally a timeout function is applied to partial event patterns which have timed out. For
   * each partial pattern sequence the provided timeout function is called, wrapped in a
   * [[PatternFlatTimeoutFunction]]. The pattern timeout function can produce an arbitrary number
   * of resulting timeout events.
   *
   * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
   * [[DataStream]] resulting from the flat select operation with the same [[OutputTag]].
   *
   * @param outputTag                  [[OutputTag]] that identifies side output with timeouted
   *                                   patterns
   * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
   *                                   partially matched pattern sequence which has timed out.
   * @param patternFlatSelectFunction  The pattern flat select function which is called for each
   *                                   detected pattern sequence.
   * @tparam L Type of the resulting timeout event
   * @tparam R Type of the resulting event
   * @return Data stream which contains the resulting elements with the resulting timeout elements
   *         in a side output.
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
    : DataStream[R] = {

    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)

    // Adapt the Scala functions to the Java interfaces, converting the Java map on each call.
    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
        cleanSelectFun(mapToScala(pattern), out)
    }

    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
      override def timeout(
          pattern: JMap[String, JList[T]],
          timeoutTimestamp: Long, out: Collector[L])
        : Unit = {
        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
      }
    }

    flatSelect(outputTag, patternFlatTimeoutFun, patternFlatSelectFun)
  }

  /**
   * Forwards the given tag to `sideOutputLateData` on the underlying Java pattern stream and
   * returns this Scala stream so that calls can be chained.
   *
   * NOTE(review): presumably late events are emitted to the side output identified by the tag;
   * confirm against the Java `PatternStream`.
   *
   * @param lateDataOutputTag [[OutputTag]] passed on to the Java pattern stream
   * @return This pattern stream
   */
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = {
    // The value returned by the Java call is discarded; this wrapper itself is handed back.
    jPatternStream.sideOutputLateData(lateDataOutputTag)
    this
  }
}
| |
object PatternStream {

  /**
   * Wraps a pattern stream of the Java API in its Scala counterpart.
   *
   * @param jPatternStream Underlying pattern stream from Java API
   * @tparam T Type of the events
   * @return A new pattern stream wrapping the pattern stream from Java API
   */
  def apply[T](jPatternStream: JPatternStream[T]): PatternStream[T] =
    new PatternStream(jPatternStream)
}