blob: 8d3242efda5d24e96c5a5c17a7c3cdd78b2a1bf6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.tests;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
/**
 * This mapper validates sliding event time windows. It checks that each event belongs to the appropriate number of
 * consecutive windows.
 *
 * <p>Each input record carries the events of one window in {@code f1}; {@code f0} is not read by this mapper
 * (presumably the key the window belongs to — confirm against the caller). Every detected violation is reported by
 * emitting a human-readable alert {@code String} to the collector; the mapper does not throw on a violation.
 *
 * <p>For every window the following is checked:
 * <ul>
 *     <li>the sequence numbers of the events in the window are contiguous,</li>
 *     <li>every event tracked from previous windows (seen fewer than {@code slideFactor} times) is present again,</li>
 *     <li>an event that is not tracked has a sequence number greater than the highest one seen so far; otherwise it
 *     is reported as seen more than {@code slideFactor} times,</li>
 *     <li>the sequence number of the last event in the window is not lower than the highest one seen so far.</li>
 * </ul>
 */
public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {

	private static final long serialVersionUID = -744070793650644485L;

	/** This value state tracks previously seen events with the number of windows they appeared in. */
	private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar;

	/**
	 * Highest sequence number of a "last event in window" observed so far; {@code null} until the first non-empty
	 * window has been processed.
	 */
	private transient ValueState<Long> lastSequenceNumber;

	/** Number of consecutive windows every event is expected to appear in; always at least 1. */
	private final int slideFactor;

	/**
	 * Creates a mapper that expects every event to appear in {@code slideFactor} consecutive windows.
	 *
	 * @param slideFactor number of consecutive windows each event must appear in; must be at least 1
	 * @throws IllegalArgumentException if {@code slideFactor} is smaller than 1
	 */
	SlidingWindowCheckMapper(int slideFactor) {
		if (slideFactor < 1) {
			throw new IllegalArgumentException("slideFactor must be at least 1, but was: " + slideFactor);
		}
		this.slideFactor = slideFactor;
	}

	/** Registers the two value states used to track events across windows. */
	@Override
	public void open(Configuration parameters) {
		ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
			new ValueStateDescriptor<>("eventsSeenSoFar",
				new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));

		eventsSeenSoFar = getRuntimeContext().getState(previousWindowDescriptor);

		ValueStateDescriptor<Long> lastSequenceNumberDescriptor =
			new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO);

		lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor);
	}

	/**
	 * Validates one window and updates the tracking state.
	 *
	 * <p>Alerts are emitted to {@code out} in this order: non-contiguous sequence numbers, tracked events missing from
	 * this window, untracked events whose sequence number is not new, and finally a decreasing last sequence number.
	 */
	@Override
	public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
		List<Tuple2<Event, Integer>> previousWindowValues =
			Optional.ofNullable(eventsSeenSoFar.value()).orElseGet(Collections::emptyList);

		List<Event> newValues = value.f1;

		Optional<Event> lastEventInWindow = verifyWindowContiguity(newValues, out);

		// Read before it is updated below, so this window's events are compared against earlier windows only.
		Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value();

		List<Tuple2<Event, Integer>> newWindows =
			verifyPreviousOccurrences(previousWindowValues, newValues, lastSequenceNumberSeenSoFar, out);

		if (lastEventInWindow.isPresent()) {
			updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSequenceNumberSeenSoFar, out);
		}

		eventsSeenSoFar.update(newWindows);
	}

	/**
	 * Raises the stored last sequence number if this window's last event is newer, or emits an alert if it is older.
	 * An equal sequence number is accepted silently.
	 *
	 * @throws IOException if updating the state fails
	 */
	private void updateLastSeenSequenceNumber(
			Event lastEventInWindow,
			Long lastSequenceNumberSeenSoFar,
			Collector<String> out) throws IOException {
		long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber();
		if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) {
			lastSequenceNumber.update(lastSequenceNumberInWindow);
		} else if (lastSequenceNumberInWindow < lastSequenceNumberSeenSoFar) {
			failWithSequenceNumberDecreased(lastEventInWindow, lastSequenceNumberSeenSoFar, out);
		}
	}

	/** Emits an alert that the last event of the current window is older than the highest one seen so far. */
	private void failWithSequenceNumberDecreased(
			Event lastEventInWindow,
			Long lastSequenceNumberSeenSoFar,
			Collector<String> out) {
		out.collect(String.format("Last event in current window (%s) has sequence number lower than seen so far (%d)",
			lastEventInWindow,
			lastSequenceNumberSeenSoFar));
	}

	/**
	 * Verifies that all tracked events from previous windows appear in the new one. A missing event is reported and is
	 * no longer tracked afterwards.
	 *
	 * @return the events that still have to appear in further windows: tracked events found in this window that have
	 *     now been seen fewer than {@code slideFactor} times, followed by this window's newly tracked events
	 */
	private List<Tuple2<Event, Integer>> verifyPreviousOccurrences(
			List<Tuple2<Event, Integer>> previousWindowValues,
			List<Event> newValues,
			Long lastSequenceNumberSeenSoFar,
			Collector<String> out) {
		List<Tuple2<Event, Integer>> newEventsSeenSoFar = new ArrayList<>();
		List<Event> seenEvents = new ArrayList<>();

		for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
			if (!newValues.contains(windowValue.f0)) {
				failWithEventNotSeenAlertMessage(windowValue, newValues, out);
			} else {
				seenEvents.add(windowValue.f0);
				preserveOrDiscardIfSeenSlideFactorTimes(newEventsSeenSoFar, windowValue);
			}
		}

		addNotSeenValues(newEventsSeenSoFar, newValues, seenEvents, lastSequenceNumberSeenSoFar, out);
		return newEventsSeenSoFar;
	}

	/**
	 * Starts tracking the events of this window that were not tracked before. Such an event must carry a sequence
	 * number newer than any seen so far; otherwise it has already been retired and an alert is emitted instead.
	 */
	private void addNotSeenValues(
			List<Tuple2<Event, Integer>> newEventsSeenSoFar,
			List<Event> newValues,
			List<Event> seenValues,
			Long lastSequenceNumberSeenSoFar,
			Collector<String> out) {
		for (Event event : newValues) {
			if (seenValues.contains(event)) {
				continue;
			}
			if (lastSequenceNumberSeenSoFar == null || event.getSequenceNumber() > lastSequenceNumberSeenSoFar) {
				// First occurrence of the event: the current window is the first one it appears in.
				trackIfExpectedInMoreWindows(newEventsSeenSoFar, event, 1);
			} else {
				failWithEventSeenTooManyTimesMessage(event, out);
			}
		}
	}

	/** Emits an alert that {@code e} appeared in more than {@code slideFactor} windows. */
	private void failWithEventSeenTooManyTimesMessage(Event e, Collector<String> out) {
		out.collect(String.format("Alert: event %s seen more than %d times", e, slideFactor));
	}

	/** Counts one more occurrence of a tracked event and keeps it only if it is expected in further windows. */
	private void preserveOrDiscardIfSeenSlideFactorTimes(
			List<Tuple2<Event, Integer>> newEventsSeenSoFar,
			Tuple2<Event, Integer> windowValue) {
		trackIfExpectedInMoreWindows(newEventsSeenSoFar, windowValue.f0, windowValue.f1 + 1);
	}

	/**
	 * Adds {@code event} with its occurrence count to {@code eventsToTrack} while it has been seen in fewer than
	 * {@code slideFactor} windows.
	 */
	private void trackIfExpectedInMoreWindows(
			List<Tuple2<Event, Integer>> eventsToTrack,
			Event event,
			int timesSeen) {
		// '<' instead of '!=': with slideFactor == 1 a new event must not be tracked beyond its only window, and a
		// count that ever exceeds slideFactor must not keep the event tracked forever.
		if (timesSeen < slideFactor) {
			eventsToTrack.add(Tuple2.of(event, timesSeen));
		}
	}

	/** Emits an alert that a tracked event did not appear in the current window. */
	private void failWithEventNotSeenAlertMessage(
			Tuple2<Event, Integer> previousWindowValue,
			List<Event> currentWindowValues,
			Collector<String> out) {
		out.collect(String.format(
			"Alert: event %s did not belong to %d consecutive windows. " +
				"Event seen so far %d times. Current window: %s",
			previousWindowValue.f0,
			slideFactor,
			previousWindowValue.f1,
			currentWindowValues));
	}

	/**
	 * Checks that the sequence numbers in the window, once sorted, increase by exactly one from event to event. One
	 * alert is emitted per gap or duplicate.
	 *
	 * @return the event with the highest sequence number (the last of equal ones), or empty for an empty window
	 */
	private Optional<Event> verifyWindowContiguity(List<Event> newValues, Collector<String> out) {
		// Sort a copy so the window's list is left untouched; List.sort is stable, matching Stream.sorted.
		List<Event> sortedEvents = new ArrayList<>(newValues);
		sortedEvents.sort(Comparator.comparingLong(Event::getSequenceNumber));

		Event previous = null;
		for (Event current : sortedEvents) {
			if (previous != null && current.getSequenceNumber() - 1 != previous.getSequenceNumber()) {
				out.collect("Alert: events in window out of order!");
			}
			previous = current;
		}
		return Optional.ofNullable(previous);
	}
}