blob: 357fbc78b10c2ee2c9909780421952bf7699f81c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.cep.operator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFA.MigratedNFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
* CEP pattern operator for a keyed input stream. For each key, the operator creates
* a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
* stored using the managed keyed state.
* @param <IN> Type of the input elements
* @param <KEY> Type of the key on which the input stream is keyed
* @param <OUT> Type of the output elements
public class CepOperator<IN, KEY, OUT>
extends AbstractUdfStreamOperator<OUT, PatternProcessFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
private static final long serialVersionUID = -4166778210774160757L;
private final boolean isProcessingTime;
private final TypeSerializer<IN> inputSerializer;
/////////////// State //////////////
private static final String NFA_STATE_NAME = "nfaStateName";
private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
private final NFACompiler.NFAFactory<IN> nfaFactory;
private transient ValueState<NFAState> computationStates;
private transient MapState<Long, List<IN>> elementQueueState;
private transient SharedBuffer<IN> partialMatches;
private transient InternalTimerService<VoidNamespace> timerService;
private transient NFA<IN> nfa;
* The last seen watermark. This will be used to
* decide if an incoming element is late or not.
private long lastWatermark;
/** Comparator for secondary sorting. Primary sorting is always done on time. */
private final EventComparator<IN> comparator;
* {@link OutputTag} to use for late arriving events. Elements with timestamp smaller than
* the current watermark will be emitted to this.
private final OutputTag<IN> lateDataOutputTag;
/** Strategy which element to skip after a match was found. */
private final AfterMatchSkipStrategy afterMatchSkipStrategy;
/** Context passed to user function. */
private transient ContextFunctionImpl context;
/** Main output collector, that sets a proper timestamp to the StreamRecord. */
private transient TimestampedCollector<OUT> collector;
/** Wrapped RuntimeContext that limits the underlying context features. */
private transient CepRuntimeContext cepRuntimeContext;
/** Thin context passed to NFA that gives access to time related characteristics. */
private transient TimerService cepTimerService;
public CepOperator(
final TypeSerializer<IN> inputSerializer,
final boolean isProcessingTime,
final NFACompiler.NFAFactory<IN> nfaFactory,
@Nullable final EventComparator<IN> comparator,
@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
final PatternProcessFunction<IN, OUT> function,
@Nullable final OutputTag<IN> lateDataOutputTag) {
this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
this.isProcessingTime = isProcessingTime;
this.comparator = comparator;
this.lateDataOutputTag = lateDataOutputTag;
if (afterMatchSkipStrategy == null) {
this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
} else {
this.afterMatchSkipStrategy = afterMatchSkipStrategy;
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
this.cepRuntimeContext = new CepRuntimeContext(getRuntimeContext());
FunctionUtils.setFunctionRuntimeContext(getUserFunction(), this.cepRuntimeContext);
public void initializeState(StateInitializationContext context) throws Exception {
// initializeState through the provided context
computationStates = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>(
partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer);
elementQueueState = context.getKeyedStateStore().getMapState(
new MapStateDescriptor<>(
new ListSerializer<>(inputSerializer)));
private void migrateOldState() throws Exception {
new ValueStateDescriptor<>(
new NFA.NFASerializer<>(inputSerializer)
new KeyedStateFunction<Object, ValueState<MigratedNFA<IN>>>() {
public void process(Object key, ValueState<MigratedNFA<IN>> state) throws Exception {
MigratedNFA<IN> oldState = state.value();
computationStates.update(new NFAState(oldState.getComputationStates()));
org.apache.flink.cep.nfa.SharedBuffer<IN> sharedBuffer = oldState.getSharedBuffer();
partialMatches.init(sharedBuffer.getEventsBuffer(), sharedBuffer.getPages());
public void open() throws Exception {;
timerService = getInternalTimerService(
nfa = nfaFactory.createNFA();, new Configuration());
context = new ContextFunctionImpl();
collector = new TimestampedCollector<>(output);
cepTimerService = new TimerServiceImpl();
public void close() throws Exception {
public void processElement(StreamRecord<IN> element) throws Exception {
if (isProcessingTime) {
if (comparator == null) {
// there can be no out of order elements in processing time
NFAState nfaState = getNFAState();
long timestamp = getProcessingTimeService().getCurrentProcessingTime();
advanceTime(nfaState, timestamp);
processEvent(nfaState, element.getValue(), timestamp);
} else {
long currentTime = timerService.currentProcessingTime();
bufferEvent(element.getValue(), currentTime);
// register a timer for the next millisecond to sort and emit buffered data
timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
} else {
long timestamp = element.getTimestamp();
IN value = element.getValue();
// In event-time processing we assume correctness of the watermark.
// Events with timestamp smaller than or equal with the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one.
if (timestamp > lastWatermark) {
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper watermark.
bufferEvent(value, timestamp);
} else if (lateDataOutputTag != null) {
output.collect(lateDataOutputTag, element);
* Registers a timer for {@code current watermark + 1}, this means that we get triggered
* whenever the watermark advances, which is what we want for working off the queue of
* buffered elements.
private void saveRegisterWatermarkTimer() {
long currentWatermark = timerService.currentWatermark();
// protect against overflow
if (currentWatermark + 1 > currentWatermark) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
private void bufferEvent(IN event, long currentTime) throws Exception {
List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
if (elementsForTimestamp == null) {
elementsForTimestamp = new ArrayList<>();
if (getExecutionConfig().isObjectReuseEnabled()) {
// copy the StreamRecord so that it cannot be changed
} else {
elementQueueState.put(currentTime, elementsForTimestamp);
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// 1) get the queue of pending elements for the key and the corresponding NFA,
// 2) process the pending elements in event time order and custom comparator if exists
// by feeding them in the NFA
// 3) advance the time to the current watermark, so that expired patterns are discarded.
// 4) update the stored state for the key, by only storing the new NFA and MapState iff they
// have state to be used later.
// 5) update the last seen watermark.
// STEP 1
PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
NFAState nfaState = getNFAState();
// STEP 2
while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
long timestamp = sortedTimestamps.poll();
advanceTime(nfaState, timestamp);
try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
event -> {
try {
processEvent(nfaState, event, timestamp);
} catch (Exception e) {
throw new RuntimeException(e);
// STEP 3
advanceTime(nfaState, timerService.currentWatermark());
// STEP 4
if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
// STEP 5
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// 1) get the queue of pending elements for the key and the corresponding NFA,
// 2) process the pending elements in process time order and custom comparator if exists
// by feeding them in the NFA
// 3) update the stored state for the key, by only storing the new NFA and MapState iff they
// have state to be used later.
// STEP 1
PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
NFAState nfa = getNFAState();
// STEP 2
while (!sortedTimestamps.isEmpty()) {
long timestamp = sortedTimestamps.poll();
advanceTime(nfa, timestamp);
try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
event -> {
try {
processEvent(nfa, event, timestamp);
} catch (Exception e) {
throw new RuntimeException(e);
// STEP 3
private Stream<IN> sort(Collection<IN> elements) {
Stream<IN> stream =;
return (comparator == null) ? stream : stream.sorted(comparator);
private void updateLastSeenWatermark(long timestamp) {
this.lastWatermark = timestamp;
private NFAState getNFAState() throws IOException {
NFAState nfaState = computationStates.value();
return nfaState != null ? nfaState : nfa.createInitialNFAState();
private void updateNFA(NFAState nfaState) throws IOException {
if (nfaState.isStateChanged()) {
private PriorityQueue<Long> getSortedTimestamps() throws Exception {
PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
for (Long timestamp : elementQueueState.keys()) {
return sortedTimestamps;
* Process the given event by giving it to the NFA and outputting the produced set of matched
* event sequences.
* @param nfaState Our NFAState object
* @param event The current event to be processed
* @param timestamp The timestamp of the event
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
Collection<Map<String, List<IN>>> patterns =
nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
processMatchedSequences(patterns, timestamp);
* Advances the time for the given NFA to the given timestamp. This means that no more events with timestamp
* <b>lower</b> than the given timestamp should be passed to the nfa, This can lead to pruning and timeouts.
private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
if (!timedOut.isEmpty()) {
private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
PatternProcessFunction<IN, OUT> function = getUserFunction();
for (Map<String, List<IN>> matchingSequence : matchingSequences) {
function.processMatch(matchingSequence, context, collector);
private void processTimedOutSequences(Collection<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences) throws Exception {
PatternProcessFunction<IN, OUT> function = getUserFunction();
if (function instanceof TimedOutPartialMatchHandler) {
TimedOutPartialMatchHandler<IN> timeoutHandler = (TimedOutPartialMatchHandler<IN>) function;
for (Tuple2<Map<String, List<IN>>, Long> matchingSequence : timedOutSequences) {
timeoutHandler.processTimedOutMatch(matchingSequence.f0, context);
private void setTimestamp(long timestamp) {
if (!isProcessingTime) {
* Gives {@link NFA} access to {@link InternalTimerService} and tells if {@link CepOperator} works in
* processing time. Should be instantiated once per operator.
private class TimerServiceImpl implements TimerService {
public long currentProcessingTime() {
return timerService.currentProcessingTime();
* Implementation of {@link PatternProcessFunction.Context}. Design to be instantiated once per operator.
* It serves three methods:
* <ul>
* <li>gives access to currentProcessingTime through {@link InternalTimerService}</li>
* <li>gives access to timestamp of current record (or null if Processing time)</li>
* <li>enables side outputs with proper timestamp of StreamRecord handling based on either Processing or
* Event time</li>
* </ul>
private class ContextFunctionImpl implements PatternProcessFunction.Context {
private Long timestamp;
public <X> void output(final OutputTag<X> outputTag, final X value) {
final StreamRecord<X> record;
if (isProcessingTime) {
record = new StreamRecord<>(value);
} else {
record = new StreamRecord<>(value, timestamp());
output.collect(outputTag, record);
void setTimestamp(long timestamp) {
this.timestamp = timestamp;
public long timestamp() {
return timestamp;
public long currentProcessingTime() {
return timerService.currentProcessingTime();
////////////////////// Testing Methods //////////////////////
boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
return !partialMatches.isEmpty();
boolean hasNonEmptyPQ(KEY key) throws Exception {
return elementQueueState.keys().iterator().hasNext();
int getPQSize(KEY key) throws Exception {
int counter = 0;
for (List<IN> elements: elementQueueState.values()) {
counter += elements.size();
return counter;