blob: ff302351b36bcf02990f36f8ab62a96193af0761 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.windowing.sessionwindows;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* Generator that employs several (sub-) event generators to produce events for multiple sessions in
* parallel, i.e. events are emitted in an interleaved way.
*
* <p>Even events that belong to different sessions for the same key can be generated in parallel.
*
* <p>The watermark is computed as the minimum of watermarks among all current sub-generators.
*
* @param <K> session key type
* @param <E> session event type
*/
public class ParallelSessionsEventGenerator<K, E> {

    // keys from which the key of every newly opened session is drawn at random
    private final Set<K> sessionKeys;

    // creates the single-session sub-generators; its produced-generator count is checked
    // against the session limit before any new sub-generator is requested
    private final EventGeneratorFactory<K, E> generatorFactory;

    // sub-generators of the sessions that are currently open
    private final List<EventGenerator<K, E>> subGeneratorLists;

    // source of all random choices (session keys, start index of the scan)
    private final LongRandomGenerator randomGenerator;

    // upper bound on the number of sessions (sub-generators) requested from the factory
    private final long sessionCountLimit;

    /**
     * Creates the generator and immediately opens the initial sessions.
     *
     * @param keys non-null, non-empty set of keys that sessions are generated for
     * @param generatorFactory non-null factory for the per-session sub-generators
     * @param parallelSessions number of sessions to keep open in parallel, must be positive
     * @param sessionCountLimit maximum number of sessions this will generate
     * @param randomGenerator non-null pseudo random engine
     */
    public ParallelSessionsEventGenerator(
            Set<K> keys,
            EventGeneratorFactory<K, E> generatorFactory,
            int parallelSessions,
            long sessionCountLimit,
            LongRandomGenerator randomGenerator) {

        // argument validation; the order of the checks decides which exception a caller sees
        Preconditions.checkNotNull(keys);
        Preconditions.checkNotNull(generatorFactory);
        Preconditions.checkArgument(parallelSessions > 0);
        Preconditions.checkArgument(!keys.isEmpty());
        Preconditions.checkNotNull(randomGenerator);

        this.generatorFactory = generatorFactory;
        this.randomGenerator = randomGenerator;
        this.sessionKeys = keys;
        this.sessionCountLimit = sessionCountLimit;
        this.subGeneratorLists = new ArrayList<>(parallelSessions);

        // every field must be assigned before the initial sessions are opened
        initParallelSessionGenerators(parallelSessions);
    }

    /**
     * Produces the next event from one of the currently open sessions.
     *
     * @return the next generated event, or {@code null} once no open session is left
     * @throws IllegalStateException if none of the open sub-generators can produce an event under
     *     the current global watermark
     */
    public E nextEvent() {
        final int openSessions = subGeneratorLists.size();

        // no open session left: the session limit is reached and all generators are exhausted
        if (openSessions == 0) {
            return null;
        }

        final long watermark = getWatermark();

        // visit every sub-generator at most once, beginning at a random position and wrapping
        // around the end of the list, until one is able to emit under the global watermark
        final int start = randomGenerator.choseRandomIndex(subGeneratorLists);
        for (int offset = 0; offset < openSessions; offset++) {
            final int slot = (start + offset) % openSessions;
            final EventGenerator<K, E> candidate = subGeneratorLists.get(slot);

            if (!candidate.canGenerateEventAtWatermark(watermark)) {
                continue;
            }

            final E event = candidate.generateEvent(watermark);

            // that was the sub-generator's last event, so its slot has to be refilled or dropped
            if (!candidate.hasMoreEvents()) {
                replaceOrRetire(slot);
            }
            return event;
        }

        // if everything works correctly, this should never happen
        throw new IllegalStateException(
                "Unable to find an open sub-generator that can produce events");
    }

    /**
     * @return a global watermark that is the minimum of the local watermarks of all open
     *     sub-generators, or {@code Long.MAX_VALUE} if there is no open sub-generator
     */
    public long getWatermark() {
        long minimum = Long.MAX_VALUE;
        for (int i = 0; i < subGeneratorLists.size(); i++) {
            final long local = subGeneratorLists.get(i).getLocalWatermark();
            if (local < minimum) {
                minimum = local;
            }
        }
        return minimum;
    }

    /**
     * Handles the slot of an exhausted sub-generator: a fresh session generator takes its place
     * while the session limit is not met, otherwise the slot is removed and the list of open
     * sessions shrinks permanently.
     *
     * @param slot index of the exhausted sub-generator in the list of open sessions
     */
    private void replaceOrRetire(int slot) {
        final boolean mayOpenAnotherSession =
                generatorFactory.getProducedGeneratorsCount() < sessionCountLimit;

        if (!mayOpenAnotherSession) {
            subGeneratorLists.remove(slot);
            return;
        }

        // the key is drawn first; the watermark is computed afterwards, while the exhausted
        // generator is still part of the list
        final K key = randomGenerator.chooseRandomElement(sessionKeys);
        final long startWatermark = getWatermark();
        subGeneratorLists.set(
                slot, generatorFactory.newSessionGeneratorForKey(key, startWatermark));
    }

    /**
     * Opens the initial sessions, each for a randomly chosen key and with watermark 0. Stops
     * early as soon as the factory has produced as many generators as the session limit allows.
     *
     * @param parallelSessions the number of parallel sessions to initialize
     */
    private void initParallelSessionGenerators(int parallelSessions) {
        int opened = 0;
        while (opened < parallelSessions
                && generatorFactory.getProducedGeneratorsCount() < sessionCountLimit) {
            final K key = randomGenerator.chooseRandomElement(sessionKeys);
            subGeneratorLists.add(generatorFactory.newSessionGeneratorForKey(key, 0L));
            opened++;
        }
    }
}