| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.cep.operator; |
| |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.common.functions.Function; |
| import org.apache.flink.api.common.functions.util.FunctionUtils; |
| import org.apache.flink.api.common.state.ValueState; |
| import org.apache.flink.api.common.state.ValueStateDescriptor; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.common.typeutils.base.ListSerializer; |
| import org.apache.flink.api.common.typeutils.base.LongSerializer; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.cep.EventComparator; |
| import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; |
| import org.apache.flink.cep.nfa.NFA; |
| import org.apache.flink.cep.nfa.NFA.MigratedNFA; |
| import org.apache.flink.cep.nfa.NFAState; |
| import org.apache.flink.cep.nfa.NFAStateSerializer; |
| import org.apache.flink.cep.nfa.State; |
| import org.apache.flink.cep.nfa.StateTransition; |
| import org.apache.flink.cep.nfa.compiler.NFACompiler; |
| import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer; |
| import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor; |
| import org.apache.flink.cep.pattern.conditions.IterativeCondition; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.runtime.state.KeyedStateFunction; |
| import org.apache.flink.runtime.state.StateInitializationContext; |
| import org.apache.flink.runtime.state.VoidNamespace; |
| import org.apache.flink.runtime.state.VoidNamespaceSerializer; |
| import org.apache.flink.runtime.state.context.ContextKeyedState; |
| import org.apache.flink.runtime.state.keyed.KeyedMapState; |
| import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor; |
| import org.apache.flink.runtime.state.keyed.KeyedValueState; |
| import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor; |
| import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; |
| import org.apache.flink.streaming.api.operators.InternalTimer; |
| import org.apache.flink.streaming.api.operators.InternalTimerService; |
| import org.apache.flink.streaming.api.operators.OneInputStreamOperator; |
| import org.apache.flink.streaming.api.operators.Triggerable; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.OutputTag; |
| import org.apache.flink.util.Preconditions; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.PriorityQueue; |
| import java.util.stream.Stream; |
| |
| /** |
| * Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates |
| * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are |
| * stored using the managed keyed state. Additionally, the set of all seen keys is kept as part of the |
| * operator state. This is necessary to trigger the execution for all keys upon receiving a new |
| * watermark. |
| * |
| * @param <IN> Type of the input elements |
| * @param <KEY> Type of the key on which the input stream is keyed |
| * @param <OUT> Type of the output elements |
| */ |
| public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function> |
| extends AbstractUdfStreamOperator<OUT, F> |
| implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> { |
| |
| private static final long serialVersionUID = -4166778210774160757L; |
| |
| private final boolean isProcessingTime; |
| |
| private final TypeSerializer<IN> inputSerializer; |
| |
| /////////////// State ////////////// |
| |
| private static final String NFA_STATE_NAME = "nfaStateName"; |
| private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName"; |
| |
| private final NFACompiler.NFAFactory<IN> nfaFactory; |
| |
| private transient KeyedValueState<KEY, NFAState> computationStates; |
| private transient KeyedMapState<KEY, Long, List<IN>> elementQueueState; |
| private transient SharedBuffer<IN> partialMatches; |
| |
| private transient InternalTimerService<VoidNamespace> timerService; |
| |
| private transient NFA<IN> nfa; |
| |
| /** |
| * The last seen watermark. This will be used to |
| * decide if an incoming element is late or not. |
| */ |
| private long lastWatermark; |
| |
| private final EventComparator<IN> comparator; |
| |
| /** |
| * {@link OutputTag} to use for late arriving events. Elements with timestamp smaller than |
| * the current watermark will be emitted to this. |
| */ |
| protected final OutputTag<IN> lateDataOutputTag; |
| |
| protected final AfterMatchSkipStrategy afterMatchSkipStrategy; |
| |
| public AbstractKeyedCEPPatternOperator( |
| final TypeSerializer<IN> inputSerializer, |
| final boolean isProcessingTime, |
| final NFACompiler.NFAFactory<IN> nfaFactory, |
| final EventComparator<IN> comparator, |
| final AfterMatchSkipStrategy afterMatchSkipStrategy, |
| final F function, |
| final OutputTag<IN> lateDataOutputTag) { |
| super(function); |
| |
| this.inputSerializer = Preconditions.checkNotNull(inputSerializer); |
| this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime); |
| this.nfaFactory = Preconditions.checkNotNull(nfaFactory); |
| this.comparator = comparator; |
| this.lateDataOutputTag = lateDataOutputTag; |
| |
| if (afterMatchSkipStrategy == null) { |
| this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip(); |
| } else { |
| this.afterMatchSkipStrategy = afterMatchSkipStrategy; |
| } |
| } |
| |
| @Override |
| public void initializeState(StateInitializationContext context) throws Exception { |
| super.initializeState(context); |
| |
| // initializeState through the provided context |
| computationStates = getKeyedState( |
| new KeyedValueStateDescriptor<>( |
| NFA_STATE_NAME, |
| (TypeSerializer<KEY>) getKeySerializer(), |
| NFAStateSerializer.INSTANCE)); |
| |
| partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer); |
| |
| elementQueueState = getKeyedState( |
| new KeyedMapStateDescriptor<>( |
| EVENT_QUEUE_STATE_NAME, |
| (TypeSerializer<KEY>) getKeySerializer(), |
| LongSerializer.INSTANCE, |
| new ListSerializer<>(inputSerializer))); |
| |
| migrateOldState(); |
| } |
| |
| private void migrateOldState() throws Exception { |
| ValueStateDescriptor<MigratedNFA<IN>> stateDescriptor = new ValueStateDescriptor<>( |
| "nfaOperatorStateName", |
| new NFA.NFASerializer<>(inputSerializer)); |
| ValueState<MigratedNFA<IN>> state = stateDescriptor.bind(contextStateHelper); |
| Preconditions.checkState(state instanceof ContextKeyedState, "Only apply to state warps on top of KeyedState"); |
| |
| KeyedStateFunction<Object, ValueState<MigratedNFA<IN>>> function = new KeyedStateFunction<Object, ValueState<MigratedNFA<IN>>>() { |
| @Override |
| public void process(Object key, ValueState<MigratedNFA<IN>> state) throws Exception { |
| MigratedNFA<IN> oldState = state.value(); |
| computationStates.put((KEY) key, new NFAState(oldState.getComputationStates())); |
| org.apache.flink.cep.nfa.SharedBuffer<IN> sharedBuffer = oldState.getSharedBuffer(); |
| partialMatches.init(sharedBuffer.getEventsBuffer(), sharedBuffer.getPages()); |
| state.clear(); |
| } |
| }; |
| |
| Iterator<KEY> keys = ((ContextKeyedState) state).getKeyedState().keys().iterator(); |
| while (keys.hasNext()) { |
| KEY key = keys.next(); |
| setCurrentKey(key); |
| try { |
| function.process(key, state); |
| } catch (Throwable e) { |
| ExceptionUtils.rethrow(e); |
| } |
| } |
| } |
| |
| @Override |
| public void open() throws Exception { |
| super.open(); |
| |
| timerService = getInternalTimerService( |
| "watermark-callbacks", |
| VoidNamespaceSerializer.INSTANCE, |
| this); |
| |
| this.nfa = nfaFactory.createNFA(); |
| |
| openNFA(nfa); |
| } |
| |
| @Override |
| public void close() throws Exception { |
| super.close(); |
| |
| closeNFA(nfa); |
| } |
| |
| @Override |
| public void processElement(StreamRecord<IN> element) throws Exception { |
| if (isProcessingTime) { |
| if (comparator == null) { |
| // there can be no out of order elements in processing time |
| NFAState nfaState = getNFAState(); |
| long timestamp = getProcessingTimeService().getCurrentProcessingTime(); |
| advanceTime(nfaState, timestamp); |
| processEvent(nfaState, element.getValue(), timestamp); |
| updateNFA(nfaState); |
| } else { |
| long currentTime = timerService.currentProcessingTime(); |
| bufferEvent(element.getValue(), currentTime); |
| |
| // register a timer for the next millisecond to sort and emit buffered data |
| timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1); |
| } |
| |
| } else { |
| |
| long timestamp = element.getTimestamp(); |
| IN value = element.getValue(); |
| |
| // In event-time processing we assume correctness of the watermark. |
| // Events with timestamp smaller than or equal with the last seen watermark are considered late. |
| // Late events are put in a dedicated side output, if the user has specified one. |
| |
| if (timestamp > lastWatermark) { |
| |
| // we have an event with a valid timestamp, so |
| // we buffer it until we receive the proper watermark. |
| |
| saveRegisterWatermarkTimer(); |
| |
| bufferEvent(value, timestamp); |
| |
| } else if (lateDataOutputTag != null) { |
| output.collect(lateDataOutputTag, element); |
| } |
| } |
| } |
| |
| /** |
| * Registers a timer for {@code current watermark + 1}, this means that we get triggered |
| * whenever the watermark advances, which is what we want for working off the queue of |
| * buffered elements. |
| */ |
| private void saveRegisterWatermarkTimer() { |
| long currentWatermark = timerService.currentWatermark(); |
| // protect against overflow |
| if (currentWatermark + 1 > currentWatermark) { |
| timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1); |
| } |
| } |
| |
| private void bufferEvent(IN event, long currentTime) throws Exception { |
| List<IN> elementsForTimestamp = elementQueueState.get((KEY) getCurrentKey(), currentTime); |
| if (elementsForTimestamp == null) { |
| elementsForTimestamp = new ArrayList<>(); |
| } |
| |
| if (getExecutionConfig().isObjectReuseEnabled()) { |
| // copy the StreamRecord so that it cannot be changed |
| elementsForTimestamp.add(inputSerializer.copy(event)); |
| } else { |
| elementsForTimestamp.add(event); |
| } |
| elementQueueState.add((KEY) getCurrentKey(), currentTime, elementsForTimestamp); |
| } |
| |
| @Override |
| public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { |
| |
| // 1) get the queue of pending elements for the key and the corresponding NFA, |
| // 2) process the pending elements in event time order and custom comparator if exists |
| // by feeding them in the NFA |
| // 3) advance the time to the current watermark, so that expired patterns are discarded. |
| // 4) update the stored state for the key, by only storing the new NFA and MapState iff they |
| // have state to be used later. |
| // 5) update the last seen watermark. |
| |
| // STEP 1 |
| PriorityQueue<Long> sortedTimestamps = getSortedTimestamps(); |
| NFAState nfaState = getNFAState(); |
| |
| // STEP 2 |
| while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) { |
| long timestamp = sortedTimestamps.poll(); |
| List<IN> elements = elementQueueState.get((KEY) getCurrentKey(), timestamp); |
| if (elements != null) { |
| advanceTime(nfaState, timestamp); |
| try (Stream<IN> sortedElements = sort(elements)) { |
| sortedElements.forEachOrdered( |
| event -> { |
| try { |
| processEvent(nfaState, event, timestamp); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| ); |
| } |
| } else { |
| // Skip elements if state value of the timestamp is cleared. |
| String message = "The element state is cleared because of state ttl. This will " + |
| "result in incorrect result. You can increase the state ttl to " + |
| "avoid this."; |
| LOG.warn(message); |
| } |
| elementQueueState.remove((KEY) getCurrentKey(), timestamp); |
| } |
| |
| // STEP 3 |
| advanceTime(nfaState, timerService.currentWatermark()); |
| |
| // STEP 4 |
| updateNFA(nfaState); |
| |
| if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) { |
| saveRegisterWatermarkTimer(); |
| } |
| |
| // STEP 5 |
| updateLastSeenWatermark(timerService.currentWatermark()); |
| } |
| |
| @Override |
| public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { |
| // 1) get the queue of pending elements for the key and the corresponding NFA, |
| // 2) process the pending elements in process time order and custom comparator if exists |
| // by feeding them in the NFA |
| // 3) update the stored state for the key, by only storing the new NFA and MapState iff they |
| // have state to be used later. |
| |
| // STEP 1 |
| PriorityQueue<Long> sortedTimestamps = getSortedTimestamps(); |
| NFAState nfa = getNFAState(); |
| |
| // STEP 2 |
| while (!sortedTimestamps.isEmpty()) { |
| long timestamp = sortedTimestamps.poll(); |
| List<IN> elements = elementQueueState.get((KEY) getCurrentKey(), timestamp); |
| if (elements != null) { |
| advanceTime(nfa, timestamp); |
| try (Stream<IN> sortedElements = sort(elements)) { |
| sortedElements.forEachOrdered( |
| event -> { |
| try { |
| processEvent(nfa, event, timestamp); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| ); |
| } |
| } else { |
| String message = "The element state is cleared because of state ttl. This will " + |
| "result in incorrect result. You can increase the state ttl to " + |
| "avoid this."; |
| LOG.warn(message); |
| } |
| elementQueueState.remove((KEY) getCurrentKey(), timestamp); |
| } |
| |
| // STEP 3 |
| updateNFA(nfa); |
| } |
| |
| private Stream<IN> sort(Collection<IN> elements) { |
| Stream<IN> stream = elements.stream(); |
| return (comparator == null) ? stream : stream.sorted(comparator); |
| } |
| |
| private void updateLastSeenWatermark(long timestamp) { |
| this.lastWatermark = timestamp; |
| } |
| |
| private NFAState getNFAState() throws IOException { |
| NFAState nfaState = computationStates.get((KEY) getCurrentKey()); |
| return nfaState != null ? nfaState : nfa.createInitialNFAState(); |
| } |
| |
| private void updateNFA(NFAState nfaState) throws IOException { |
| if (nfaState.isStateChanged()) { |
| nfaState.resetStateChanged(); |
| computationStates.put((KEY) getCurrentKey(), nfaState); |
| } |
| } |
| |
| private PriorityQueue<Long> getSortedTimestamps() throws Exception { |
| PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>(); |
| for (Long timestamp : elementQueueState.mapKeys((KEY) getCurrentKey())) { |
| sortedTimestamps.offer(timestamp); |
| } |
| return sortedTimestamps; |
| } |
| |
| /** |
| * Process the given event by giving it to the NFA and outputting the produced set of matched |
| * event sequences. |
| * |
| * @param nfaState Our NFAState object |
| * @param event The current event to be processed |
| * @param timestamp The timestamp of the event |
| */ |
| private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception { |
| try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) { |
| Collection<Map<String, List<IN>>> patterns = |
| nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy); |
| processMatchedSequences(patterns, timestamp); |
| } |
| } |
| |
| /** |
| * Advances the time for the given NFA to the given timestamp. This means that no more events with timestamp |
| * <b>lower</b> than the given timestamp should be passed to the nfa, This can lead to pruning and timeouts. |
| */ |
| private void advanceTime(NFAState nfaState, long timestamp) throws Exception { |
| try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) { |
| Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut = |
| nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp); |
| processTimedOutSequences(timedOut, timestamp); |
| } |
| } |
| |
| private void openNFA(NFA<IN> nfa) throws Exception { |
| Configuration conf = new Configuration(); |
| for (State<IN> state : nfa.getStates()) { |
| for (StateTransition<IN> transition : state.getStateTransitions()) { |
| IterativeCondition condition = transition.getCondition(); |
| FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext()); |
| FunctionUtils.openFunction(condition, conf); |
| } |
| } |
| } |
| |
| private void closeNFA(NFA<IN> nfa) throws Exception { |
| for (State<IN> state : nfa.getStates()) { |
| for (StateTransition<IN> transition : state.getStateTransitions()) { |
| IterativeCondition condition = transition.getCondition(); |
| FunctionUtils.closeFunction(condition); |
| } |
| } |
| } |
| |
| protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception; |
| |
| protected void processTimedOutSequences( |
| Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, |
| long timestamp) throws Exception { |
| } |
| |
| ////////////////////// Testing Methods ////////////////////// |
| |
| @VisibleForTesting |
| public boolean hasNonEmptySharedBuffer(KEY key) throws Exception { |
| setCurrentKey(key); |
| return !partialMatches.isEmpty(); |
| } |
| |
| @VisibleForTesting |
| public boolean hasNonEmptyPQ(KEY key) throws Exception { |
| setCurrentKey(key); |
| return elementQueueState.mapKeys(key).iterator().hasNext(); |
| } |
| |
| @VisibleForTesting |
| public int getPQSize(KEY key) throws Exception { |
| setCurrentKey(key); |
| int counter = 0; |
| for (List<IN> elements: elementQueueState.mapValues(key)) { |
| counter += elements.size(); |
| } |
| return counter; |
| } |
| |
| @Override |
| public boolean requireState() { |
| return true; |
| } |
| } |