blob: 8ddf6a40e0125a8f8f2c6671fa90c873e9ccd31f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.windowing.sessionwindows;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.List;
/**
* Implementation of EventGenerator that generates timely and late (in-lateness and after-lateness)
* events for a single session.
*
* @param <K> type of session key
* @param <E> type of session event
*/
public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
/** Event timing w.r.t the global watermark. */
public enum Timing {
TIMELY,
IN_LATENESS,
AFTER_LATENESS
}
// pseudo random engine
private final LongRandomGenerator randomGenerator;
// configuration for the generated session generator
private final SessionGeneratorConfiguration<K, E> configuration;
// precomputed timestamps for the timely events (could be a list of primitive longs)
private final List<Long> orderedTimelyTimestamps;
// the minimum timestamp for events emitted by this generator
private final long minTimestamp;
// the maximum timestamp for events emitted by this generator
private final long maxTimestamp;
// tracks how many events this generator has produced
private int producedEventsCount;
// sub-generator that capture the lifecycle w.r.t. the global watermark (timely, in lateness,
// after lateness)
private EventGenerator<K, E> timingAwareEventGenerator;
/**
* @param configuration session generator configuration
* @param randomGenerator random engine for the event generation
*/
public SessionEventGeneratorImpl(
SessionGeneratorConfiguration<K, E> configuration,
LongRandomGenerator randomGenerator) {
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(randomGenerator);
this.producedEventsCount = 0;
this.configuration = configuration;
this.randomGenerator = randomGenerator;
// pre-compute and store all timestamps for the timely events in this session
final int timelyEventsInSessionCount =
configuration.getSessionConfiguration().getNumberOfTimelyEvents();
this.orderedTimelyTimestamps = new ArrayList<>(timelyEventsInSessionCount);
this.minTimestamp = configuration.getSessionConfiguration().getMinEventTimestamp();
generateOrderedTimelyTimestamps(minTimestamp, timelyEventsInSessionCount);
this.maxTimestamp = orderedTimelyTimestamps.get(orderedTimelyTimestamps.size() - 1);
this.timingAwareEventGenerator = new TimelyGenerator();
}
/** @see EventGenerator */
@Override
public boolean canGenerateEventAtWatermark(long globalWatermark) {
return timingAwareEventGenerator.canGenerateEventAtWatermark(globalWatermark);
}
/** @see EventGenerator */
@Override
public E generateEvent(long globalWatermark) {
if (hasMoreEvents()) {
++producedEventsCount;
E event = timingAwareEventGenerator.generateEvent(globalWatermark);
while (!timingAwareEventGenerator.hasMoreEvents()) {
timingAwareEventGenerator =
timingAwareEventGenerator.getNextGenerator(globalWatermark);
}
return event;
} else {
throw new IllegalStateException("All events exhausted");
}
}
/** @see EventGenerator */
@Override
public long getLocalWatermark() {
return timingAwareEventGenerator.getLocalWatermark();
}
/** @see EventGenerator */
@Override
public boolean hasMoreEvents() {
return producedEventsCount < getAllEventsCount();
}
/**
* Pre-computes and stores the timestamps for timely events in this session in a list (ordered).
*
* @param minTimestamp the minimum event time in the session
* @param onTimeEventCountInSession the number of timestamps to generate
*/
private void generateOrderedTimelyTimestamps(long minTimestamp, int onTimeEventCountInSession) {
long generatedTimestamp = minTimestamp;
for (int i = 1; i < onTimeEventCountInSession; ++i) {
orderedTimelyTimestamps.add(generatedTimestamp);
generatedTimestamp += randomGenerator.randomLongBetween(0, getGap() - 1);
}
orderedTimelyTimestamps.add(generatedTimestamp);
}
private E createEventFromTimestamp(long eventTimestamp, long globalWatermark, Timing timing) {
return getEventFactory()
.createEvent(
getKey(),
getSessionId(),
producedEventsCount,
eventTimestamp,
globalWatermark,
timing);
}
/** @return a timestamp in the session that is timely */
private long generateTimelyInSessionTimestamp() {
int chosenTimestampIndex = randomGenerator.choseRandomIndex(orderedTimelyTimestamps);
// performance: consider that remove is an O(n) operation here, with n being the number of
// timely events but
// this should not matter too much for a IT case
return orderedTimelyTimestamps.remove(chosenTimestampIndex);
}
/** @return a timestamp in the session */
private long generateArbitraryInSessionTimestamp() {
return randomGenerator.randomLongBetween(minTimestamp, maxTimestamp + 1);
}
/**
* @param globalWatermark the current global watermark
* @return true if the session window for this session has already triggered at global watermark
*/
private boolean isTriggered(long globalWatermark) {
return globalWatermark >= maxTimestamp + getGap() - 1;
}
/**
* @param globalWatermark the current global watermark
* @return true if all future generated events are after lateness w.r.t global watermark
*/
private boolean isAfterLateness(long globalWatermark) {
return globalWatermark >= getAfterLatenessTimestamp();
}
/** @return timestamp of the watermark at events for this session are after the lateness */
private long getAfterLatenessTimestamp() {
return getTriggerTimestamp() + getLateness();
}
/** @return timestamp of the watermark at which the window for this session will trigger */
private long getTriggerTimestamp() {
return maxTimestamp + getGap() - 1;
}
@Override
public K getKey() {
return configuration.getSessionConfiguration().getKey();
}
private long getGap() {
return configuration.getSessionConfiguration().getGap();
}
private long getLateness() {
return configuration.getGeneratorConfiguration().getAllowedLateness();
}
private GeneratorEventFactory<K, E> getEventFactory() {
return configuration.getSessionConfiguration().getEventFactory();
}
private int getSessionId() {
return configuration.getSessionConfiguration().getSessionId();
}
private int getTimelyEventsCount() {
return configuration.getSessionConfiguration().getNumberOfTimelyEvents();
}
private int getLateEventsCount() {
return getTimelyEventsCount()
+ configuration.getGeneratorConfiguration().getLateEventsWithinLateness();
}
private int getAllEventsCount() {
return getLateEventsCount()
+ configuration.getGeneratorConfiguration().getLateEventsAfterLateness();
}
private boolean hasMoreTimelyEvents() {
return !orderedTimelyTimestamps.isEmpty();
}
private boolean hasMoreInLatenessEvents() {
return producedEventsCount < getLateEventsCount();
}
/** @see EventGenerator */
@Override
public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
GeneratorConfiguration generatorConfiguration = configuration.getGeneratorConfiguration();
SessionConfiguration<K, E> sessionConfiguration = configuration.getSessionConfiguration();
// compute the start timestamp for the next session
long maxAdditionalGap = generatorConfiguration.getMaxAdditionalSessionGap();
long nextStartTime =
Math.max(
getAfterLatenessTimestamp()
+ randomGenerator.randomLongBetween(0, maxAdditionalGap),
globalWatermark);
sessionConfiguration = sessionConfiguration.getFollowupSessionConfiguration(nextStartTime);
SessionGeneratorConfiguration<K, E> sessionGeneratorConfiguration =
new SessionGeneratorConfiguration<>(sessionConfiguration, generatorConfiguration);
return new SessionEventGeneratorImpl<>(sessionGeneratorConfiguration, randomGenerator);
}
private abstract class AbstractEventGenerator implements EventGenerator<K, E> {
@Override
public K getKey() {
return configuration.getSessionConfiguration().getKey();
}
}
/** Internal generator delegate for producing session events that are timely. */
private class TimelyGenerator extends AbstractEventGenerator {
@Override
public E generateEvent(long globalWatermark) {
return createEventFromTimestamp(
generateTimelyInSessionTimestamp(), globalWatermark, Timing.TIMELY);
}
@Override
public long getLocalWatermark() {
return orderedTimelyTimestamps.get(0);
}
@Override
public boolean canGenerateEventAtWatermark(long globalWatermark) {
return true;
}
@Override
public boolean hasMoreEvents() {
return hasMoreTimelyEvents();
}
@Override
public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
return new InLatenessGenerator();
}
}
/**
* Internal generator delegate for producing late session events with timestamps within the
* allowed lateness.
*/
private class InLatenessGenerator extends AbstractEventGenerator {
@Override
public E generateEvent(long globalWatermark) {
return createEventFromTimestamp(
generateArbitraryInSessionTimestamp(), globalWatermark, Timing.IN_LATENESS);
}
@Override
public long getLocalWatermark() {
return getAfterLatenessTimestamp() - 1;
}
@Override
public boolean canGenerateEventAtWatermark(long globalWatermark) {
return isTriggered(globalWatermark);
}
@Override
public boolean hasMoreEvents() {
return hasMoreInLatenessEvents();
}
@Override
public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
return new AfterLatenessGenerator();
}
}
/**
* Internal generator delegate for producing late session events with timestamps after the
* lateness.
*/
private class AfterLatenessGenerator extends AbstractEventGenerator {
@Override
public E generateEvent(long globalWatermark) {
return createEventFromTimestamp(
generateArbitraryInSessionTimestamp(), globalWatermark, Timing.AFTER_LATENESS);
}
@Override
public long getLocalWatermark() {
return getAfterLatenessTimestamp();
}
@Override
public boolean canGenerateEventAtWatermark(long globalWatermark) {
return isAfterLateness(globalWatermark);
}
@Override
public boolean hasMoreEvents() {
return true;
}
@Override
public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
throw new IllegalStateException("This generator has no successor");
}
}
}