blob: 7280e8fa26cb11c782603f2b9f429409b482f5bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows. This is more strict
* than {@link ProcessingTimeWindowCheckpointingITCase} because for event-time the contents of the
* emitted windows are deterministic.
*
* <p>Split into multiple test classes in order to decrease the runtime per backend and not run into
* CI infrastructure limits like no std output being emitted for I/O heavy variants.
*/
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class EventTimeWindowCheckpointingITCase extends TestLogger {
// Upper bound (20 MiB) used both as the MemoryStateBackend max state size and, with a "b"
// suffix, as the Akka frame size in createClusterConfig().
private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
// Parallelism applied to every test job; also determines the total number of slots.
private static final int PARALLELISM = 4;
// Number of task managers in the mini cluster; each gets PARALLELISM / NUM_OF_TASK_MANAGERS slots.
private static final int NUM_OF_TASK_MANAGERS = 2;
// ZooKeeper test server; only created for ROCKSDB_INCREMENTAL_ZK, otherwise stays null.
private TestingServer zkServer;
// Mini cluster for the current test method; created in setupTestCluster() and released in
// stopTestCluster().
public MiniClusterWithClientResource miniClusterResource;
// Class-level temp folder that provides the checkpoint and RocksDB directories.
@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
// Exposes the running test method's name for the "Starting"/"Finished" console messages.
@Rule public TestName name = new TestName();
// Backend instance built by getConfiguration() and handed to each job via env.setStateBackend().
private AbstractStateBackend stateBackend;
// Backend variant selected by the first test parameter.
public StateBackendEnum stateBackendEnum;
// Second test parameter; written to NETWORK_BUFFERS_PER_CHANNEL in getConfiguration().
private final int buffersPerChannel;
/**
 * State backend variants this test is parameterized over. The mapping from each constant to a
 * concrete backend lives in getConfiguration(). The declaration order determines the order of
 * the generated test parameters, so do not reorder casually.
 */
enum StateBackendEnum {
// MemoryStateBackend created with its boolean flag set to false.
MEM,
// FsStateBackend on a temp directory, created with its boolean flag set to false.
FILE,
// RocksDB with file size threshold -1 and incremental checkpoints disabled.
ROCKSDB_FULLY_ASYNC,
// RocksDB with incremental checkpoints, threshold 16, and the RocksDB-based timer service.
ROCKSDB_INCREMENTAL,
// Same RocksDB setup as ROCKSDB_INCREMENTAL minus the timer-service override, run against a
// ZooKeeper HA setup.
ROCKSDB_INCREMENTAL_ZK,
// MemoryStateBackend created with its boolean flag set to true.
MEM_ASYNC,
// FsStateBackend on a temp directory, created with its boolean flag set to true.
FILE_ASYNC
}
/**
 * Builds the parameter matrix: every {@link StateBackendEnum} constant is paired with 0 and
 * with 2 buffers per channel, in enum declaration order.
 *
 * @return one {@code {backend, buffersPerChannel}} array per test configuration
 */
@Parameterized.Parameters(name = "statebackend type ={0}, buffersPerChannel = {1}")
public static Collection<Object[]> parameter() {
    return Arrays.stream(StateBackendEnum.values())
            .flatMap(backend -> Arrays.stream(new Object[][] {{backend, 0}, {backend, 2}}))
            .collect(Collectors.toList());
}
/**
 * Creates one parameterized test instance.
 *
 * @param stateBackendEnum the state backend variant to run the tests against
 * @param buffersPerChannel value written to the network buffers-per-channel option
 */
public EventTimeWindowCheckpointingITCase(
        StateBackendEnum stateBackendEnum, int buffersPerChannel) {
    this.buffersPerChannel = buffersPerChannel;
    this.stateBackendEnum = stateBackendEnum;
}
/**
 * Returns the state backend variant this test instance was parameterized with.
 *
 * @return the configured {@link StateBackendEnum}
 */
protected StateBackendEnum getStateBackend() {
    return stateBackendEnum;
}
/**
 * Builds (but does not start) a mini cluster resource with {@code NUM_OF_TASK_MANAGERS} task
 * managers whose slots add up to {@code PARALLELISM}, using the configuration derived from the
 * selected state backend.
 *
 * @return a new, not yet started cluster resource
 */
protected final MiniClusterWithClientResource getMiniClusterResource() {
    final MiniClusterResourceConfiguration resourceConfiguration =
            new MiniClusterResourceConfiguration.Builder()
                    .setConfiguration(getConfigurationSafe())
                    .setNumberTaskManagers(NUM_OF_TASK_MANAGERS)
                    .setNumberSlotsPerTaskManager(PARALLELISM / NUM_OF_TASK_MANAGERS)
                    .build();
    return new MiniClusterWithClientResource(resourceConfiguration);
}
/**
 * Wraps {@link #getConfiguration()} so that any checked exception surfaces as an
 * {@link AssertionError} carrying the original exception as its cause.
 *
 * @return the cluster configuration for the current test
 */
private Configuration getConfigurationSafe() {
    final Configuration configuration;
    try {
        configuration = getConfiguration();
    } catch (Exception cause) {
        throw new AssertionError("Could not initialize test.", cause);
    }
    return configuration;
}
/**
 * Creates the cluster configuration for the current test and, as a side effect, instantiates
 * the state backend selected by {@link #getStateBackend()} into {@code this.stateBackend}. For
 * {@code ROCKSDB_INCREMENTAL_ZK} a ZooKeeper test server is started first so that the cluster
 * configuration picks up the HA settings.
 *
 * @return the configuration to start the mini cluster with
 * @throws Exception if the ZooKeeper server or a temporary directory cannot be created
 */
private Configuration getConfiguration() throws Exception {
    // Emit a line at the start of each test method so that Travis does not abort the build
    // with "Maven produced no output for xxx seconds."
    System.out.println(
            "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");

    final StateBackendEnum backendType = getStateBackend();

    // HA scenario: exercises the ZooKeeper-based completed checkpoint store together with
    // incremental checkpoints.
    if (backendType == ROCKSDB_INCREMENTAL_ZK) {
        zkServer = new TestingServer();
        zkServer.start();
    }

    final Configuration clusterConfig = createClusterConfig();
    clusterConfig.setInteger(
            NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);

    switch (backendType) {
        case MEM:
        case MEM_ASYNC:
            // the sync and async variants differ only in the boolean flag
            this.stateBackend =
                    new MemoryStateBackend(
                            MAX_MEM_STATE_SIZE, backendType == StateBackendEnum.MEM_ASYNC);
            break;
        case FILE:
        case FILE_ASYNC:
            {
                final String checkpointDir = tempFolder.newFolder().getAbsolutePath();
                this.stateBackend =
                        new FsStateBackend(
                                "file://" + checkpointDir,
                                backendType == StateBackendEnum.FILE_ASYNC);
                break;
            }
        case ROCKSDB_FULLY_ASYNC:
            setupRocksDB(clusterConfig, -1, false);
            break;
        case ROCKSDB_INCREMENTAL:
            // This variant additionally covers the RocksDB-based timer service.
            clusterConfig.set(
                    RocksDBOptions.TIMER_SERVICE_FACTORY,
                    EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
            setupRocksDB(clusterConfig, 16, true);
            break;
        case ROCKSDB_INCREMENTAL_ZK:
            setupRocksDB(clusterConfig, 16, true);
            break;
        default:
            throw new IllegalStateException("No backend selected.");
    }
    return clusterConfig;
}
/**
 * Configures managed memory for RocksDB and stores a RocksDB state backend, layered on a
 * file system backend, in {@code this.stateBackend}.
 *
 * @param config cluster configuration that receives the managed memory size
 * @param fileSizeThreshold file size threshold passed to the underlying {@link FsStateBackend}
 * @param incrementalCheckpoints whether RocksDB should take incremental checkpoints
 * @throws IOException if a temporary directory cannot be created
 */
private void setupRocksDB(
        Configuration config, int fileSizeThreshold, boolean incrementalCheckpoints)
        throws IOException {
    // The RocksDB state backend gets 64MB of managed memory per slot.
    final int slotsPerTaskManager = PARALLELISM / NUM_OF_TASK_MANAGERS;
    config.set(
            TaskManagerOptions.MANAGED_MEMORY_SIZE,
            MemorySize.ofMebiBytes(slotsPerTaskManager * 64));

    final String dbStorageDir = tempFolder.newFolder().getAbsolutePath();
    final String checkpointDir = tempFolder.newFolder().getAbsolutePath();

    // The fs backend is given a small threshold on purpose: the test should exercise file
    // references rather than self-contained byte handles.
    final FsStateBackend checkpointBackend =
            new FsStateBackend(new Path("file://" + checkpointDir).toUri(), fileSizeThreshold);

    final RocksDBStateBackend rocksDbBackend =
            new RocksDBStateBackend(checkpointBackend, incrementalCheckpoints);
    rocksDbBackend.setDbStoragePath(dbStorageDir);

    this.stateBackend = rocksDbBackend;
}
/**
 * Creates the base cluster configuration: the Akka frame size is set to
 * {@code MAX_MEM_STATE_SIZE} bytes and, if a ZooKeeper test server has been started, the
 * ZooKeeper high-availability options are filled in.
 *
 * @return a fresh cluster configuration
 * @throws IOException if the HA storage directory cannot be created
 */
protected Configuration createClusterConfig() throws IOException {
    Configuration config = new Configuration();
    config.setString(AkkaOptions.FRAMESIZE, MAX_MEM_STATE_SIZE + "b");

    if (zkServer != null) {
        // Take the HA storage directory from the class-level TemporaryFolder rule so that JUnit
        // deletes it once the test class has finished. A locally created TemporaryFolder is
        // never cleaned up and would leak one directory per test method. The directory is only
        // needed in the HA case, so it is only created here.
        final File haDir = tempFolder.newFolder();

        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
    }
    return config;
}
/**
 * Creates the mini cluster for the current test method and starts it.
 *
 * @throws Exception if the cluster cannot be started
 */
@Before
public void setupTestCluster() throws Exception {
    // Publish the resource to the field before starting it, so that it is visible to the
    // tear-down even if the start fails.
    final MiniClusterWithClientResource cluster = getMiniClusterResource();
    miniClusterResource = cluster;
    cluster.before();
}
/**
 * Shuts down the mini cluster and, if one was started, the ZooKeeper test server. The
 * ZooKeeper server is released even when stopping the cluster throws, and both fields are
 * reset so that no stale resource leaks into the next test method.
 *
 * @throws IOException if closing the ZooKeeper test server fails
 */
@After
public void stopTestCluster() throws IOException {
    try {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    } finally {
        miniClusterResource = null;
        try {
            if (zkServer != null) {
                // close() stops the server and also removes its temporary data directory,
                // whereas stop() would leave that directory behind.
                zkServer.close();
            }
        } finally {
            zkServer = null;
            // Emit a line when a test method finishes so that Travis does not abort the build
            // with "Maven produced no output for xxx seconds."
            System.out.println(
                    "Finished "
                            + getClass().getCanonicalName()
                            + "#"
                            + name.getMethodName()
                            + ".");
        }
    }
}
// ------------------------------------------------------------------------
/**
 * Runs a keyed tumbling event-time window job that sums the values of each window, with
 * checkpointing every 100 ms, snapshot compression enabled and a single allowed restart. The
 * {@link FailingSource} wraps the event generator (judging by its name it injects a failure;
 * see that class for details) and the single-parallelism {@link ValidatingSink} checks every
 * emitted window sum and the number of windows seen per key.
 */
@Test
public void testTumblingTimeWindow() {
    final int numElementsPerKey = numElementsPerKey();
    final int windowSize = windowSize();
    final int numKeys = numKeys();

    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.setStateBackend(this.stateBackend);
        env.getConfig().setUseSnapshotCompression(true);

        env.addSource(
                        new FailingSource(
                                new KeyedEventTimeGenerator(numKeys, windowSize),
                                numElementsPerKey))
                .rebalance()
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
                .apply(
                        new RichWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                Tuple,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(Configuration parameters) {
                                assertEquals(
                                        PARALLELISM,
                                        getRuntimeContext().getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    Tuple tuple,
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> values,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                int sum = 0;
                                long key = -1;

                                for (Tuple2<Long, IntType> value : values) {
                                    sum += value.f1.value;
                                    key = value.f0;
                                }

                                final Tuple4<Long, Long, Long, IntType> result =
                                        new Tuple4<>(
                                                key,
                                                window.getStart(),
                                                window.getEnd(),
                                                new IntType(sum));
                                out.collect(result);
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new SinkValidatorUpdateFun(numElementsPerKey),
                                new SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSize)))
                .setParallelism(1);

        env.execute("Tumbling Window Test");
    } catch (Exception e) {
        // Keep the original exception as the cause so that the test report shows the full
        // stack trace, also when the exception has no message.
        throw new AssertionError("Tumbling window job failed: " + e.getMessage(), e);
    }
}
/**
 * Runs the keyed-state tumbling window scenario with the max parallelism set to the job
 * parallelism itself.
 */
@Test
public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
    final int smallestMaxParallelism = PARALLELISM;
    doTestTumblingTimeWindowWithKVState(smallestMaxParallelism);
}
/**
 * Runs the keyed-state tumbling window scenario with a max parallelism of {@code 1 << 15}
 * (32768), far above the job parallelism.
 */
@Test
public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
    final int largestMaxParallelism = 1 << 15;
    doTestTumblingTimeWindowWithKVState(largestMaxParallelism);
}
/**
 * Runs a keyed tumbling event-time window job whose window function keeps a per-key
 * {@link ValueState} counter. The counter starts at the key value and is incremented once per
 * fired window; the sink verifies that each emitted count equals the key plus the number of
 * windows seen so far for that key.
 *
 * @param maxParallelism the max parallelism to configure on the execution environment
 */
public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
    final int numElementsPerKey = numElementsPerKey();
    final int windowSize = windowSize();
    final int numKeys = numKeys();

    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.setMaxParallelism(maxParallelism);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.setStateBackend(this.stateBackend);
        env.getConfig().setUseSnapshotCompression(true);

        env.addSource(
                        new FailingSource(
                                new KeyedEventTimeGenerator(numKeys, windowSize),
                                numElementsPerKey))
                .rebalance()
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
                .apply(
                        new RichWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                Tuple,
                                TimeWindow>() {

                            private boolean open = false;

                            private ValueState<Integer> count;

                            @Override
                            public void open(Configuration parameters) {
                                assertEquals(
                                        PARALLELISM,
                                        getRuntimeContext().getNumberOfParallelSubtasks());
                                open = true;
                                count =
                                        getRuntimeContext()
                                                .getState(
                                                        new ValueStateDescriptor<>(
                                                                "count", Integer.class, 0));
                            }

                            @Override
                            public void apply(
                                    Tuple tuple,
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> values,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out)
                                    throws Exception {

                                // the window count state starts with the key, so that we get
                                // different count results for each key
                                if (count.value() == 0) {
                                    count.update(tuple.<Long>getField(0).intValue());
                                }

                                // validate that the function has been opened properly
                                assertTrue(open);

                                count.update(count.value() + 1);
                                out.collect(
                                        new Tuple4<>(
                                                tuple.<Long>getField(0),
                                                window.getStart(),
                                                window.getEnd(),
                                                new IntType(count.value())));
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new CountingSinkValidatorUpdateFun(),
                                new SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSize)))
                .setParallelism(1);

        env.execute("Tumbling Window Test");
    } catch (Exception e) {
        // Keep the original exception as the cause so that the test report shows the full
        // stack trace, also when the exception has no message.
        throw new AssertionError(
                "Tumbling window job with keyed state failed: " + e.getMessage(), e);
    }
}
/**
 * Runs a keyed sliding event-time window job (window size {@code windowSize()}, slide
 * {@code windowSlide()}) that sums the values of each window, with checkpointing every 100 ms,
 * snapshot compression enabled, a single allowed restart and a max parallelism of twice the
 * job parallelism. The single-parallelism sink validates each window sum and the number of
 * windows seen per key.
 */
@Test
public void testSlidingTimeWindow() {
    final int numElementsPerKey = numElementsPerKey();
    final int windowSize = windowSize();
    final int windowSlide = windowSlide();
    final int numKeys = numKeys();

    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(2 * PARALLELISM);
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.setStateBackend(this.stateBackend);
        env.getConfig().setUseSnapshotCompression(true);

        env.addSource(
                        new FailingSource(
                                new KeyedEventTimeGenerator(numKeys, windowSlide),
                                numElementsPerKey))
                .rebalance()
                .keyBy(0)
                .window(
                        SlidingEventTimeWindows.of(
                                Time.milliseconds(windowSize), Time.milliseconds(windowSlide)))
                .apply(
                        new RichWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                Tuple,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(Configuration parameters) {
                                assertEquals(
                                        PARALLELISM,
                                        getRuntimeContext().getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    Tuple tuple,
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> values,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                int sum = 0;
                                long key = -1;

                                for (Tuple2<Long, IntType> value : values) {
                                    sum += value.f1.value;
                                    key = value.f0;
                                }

                                final Tuple4<Long, Long, Long, IntType> output =
                                        new Tuple4<>(
                                                key,
                                                window.getStart(),
                                                window.getEnd(),
                                                new IntType(sum));
                                out.collect(output);
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new SinkValidatorUpdateFun(numElementsPerKey),
                                new SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSlide)))
                .setParallelism(1);

        // The job name identifies the sliding-window scenario in logs (it used to be the
        // copy-pasted tumbling-window name).
        env.execute("Sliding Window Test");
    } catch (Exception e) {
        // Keep the original exception as the cause so that the test report shows the full
        // stack trace, also when the exception has no message.
        throw new AssertionError("Sliding window job failed: " + e.getMessage(), e);
    }
}
/**
 * Runs a keyed tumbling event-time window job that pre-aggregates each window with a
 * {@link ReduceFunction} (summing the values) and then forwards the reduced element together
 * with the window bounds. Checkpointing runs every 100 ms with snapshot compression and a
 * single allowed restart; the single-parallelism sink validates each window sum and the number
 * of windows seen per key.
 */
@Test
public void testPreAggregatedTumblingTimeWindow() {
    final int numElementsPerKey = numElementsPerKey();
    final int windowSize = windowSize();
    final int numKeys = numKeys();

    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.setStateBackend(this.stateBackend);
        env.getConfig().setUseSnapshotCompression(true);

        env.addSource(
                        new FailingSource(
                                new KeyedEventTimeGenerator(numKeys, windowSize),
                                numElementsPerKey))
                .rebalance()
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
                .reduce(
                        new ReduceFunction<Tuple2<Long, IntType>>() {

                            @Override
                            public Tuple2<Long, IntType> reduce(
                                    Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                                return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                            }
                        },
                        new RichWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                Tuple,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(Configuration parameters) {
                                assertEquals(
                                        PARALLELISM,
                                        getRuntimeContext().getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    Tuple tuple,
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> input,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                for (Tuple2<Long, IntType> in : input) {
                                    final Tuple4<Long, Long, Long, IntType> output =
                                            new Tuple4<>(
                                                    in.f0,
                                                    window.getStart(),
                                                    window.getEnd(),
                                                    in.f1);
                                    out.collect(output);
                                }
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new SinkValidatorUpdateFun(numElementsPerKey),
                                new SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSize)))
                .setParallelism(1);

        env.execute("Tumbling Window Test");
    } catch (Exception e) {
        // Keep the original exception as the cause so that the test report shows the full
        // stack trace, also when the exception has no message.
        throw new AssertionError(
                "Pre-aggregated tumbling window job failed: " + e.getMessage(), e);
    }
}
/**
 * Runs a keyed sliding event-time window job (window size {@code windowSize()}, slide
 * {@code windowSlide()}) that pre-aggregates each window with a {@link ReduceFunction}
 * (summing the values) and then forwards the reduced element together with the window bounds.
 * Checkpointing runs every 100 ms with snapshot compression and a single allowed restart; the
 * single-parallelism sink validates each window sum and the number of windows seen per key.
 */
@Test
public void testPreAggregatedSlidingTimeWindow() {
    final int numElementsPerKey = numElementsPerKey();
    final int windowSize = windowSize();
    final int windowSlide = windowSlide();
    final int numKeys = numKeys();

    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.setStateBackend(this.stateBackend);
        env.getConfig().setUseSnapshotCompression(true);

        env.addSource(
                        new FailingSource(
                                new KeyedEventTimeGenerator(numKeys, windowSlide),
                                numElementsPerKey))
                .rebalance()
                .keyBy(0)
                .window(
                        SlidingEventTimeWindows.of(
                                Time.milliseconds(windowSize), Time.milliseconds(windowSlide)))
                .reduce(
                        new ReduceFunction<Tuple2<Long, IntType>>() {

                            @Override
                            public Tuple2<Long, IntType> reduce(
                                    Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                                // keep the key of the first element and add up the values
                                return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                            }
                        },
                        new RichWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                Tuple,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(Configuration parameters) {
                                assertEquals(
                                        PARALLELISM,
                                        getRuntimeContext().getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    Tuple tuple,
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> input,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                for (Tuple2<Long, IntType> in : input) {
                                    out.collect(
                                            new Tuple4<>(
                                                    in.f0,
                                                    window.getStart(),
                                                    window.getEnd(),
                                                    in.f1));
                                }
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new SinkValidatorUpdateFun(numElementsPerKey),
                                new SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSlide)))
                .setParallelism(1);

        // The job name identifies the sliding-window scenario in logs (it used to be the
        // copy-pasted tumbling-window name).
        env.execute("Sliding Window Test");
    } catch (Exception e) {
        // Keep the original exception as the cause so that the test report shows the full
        // stack trace, also when the exception has no message.
        throw new AssertionError(
                "Pre-aggregated sliding window job failed: " + e.getMessage(), e);
    }
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/**
 * Count updater for the stateful window-count scenario: bumps the per-key window counter and
 * checks that the emitted count equals the key plus the number of windows seen so far.
 */
static class CountingSinkValidatorUpdateFun
        implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {

    @Override
    public void updateCount(
            Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {

        final Long key = value.f0;

        // merge() hands back the updated number of windows recorded for this key
        final int windowsSoFar = windowCounts.merge(key, 1, Integer::sum);

        // expected window content: (key + num windows so far)
        final int expectedCount = key.intValue() + windowsSoFar;

        assertEquals(
                "Window counts don't match for key " + key + ".",
                expectedCount,
                value.f3.value);
    }
}
// ------------------------------------
/**
 * Count updater for the summing scenarios: verifies that the emitted window sum equals the sum
 * of all positive element values inside the window (capped at the number of elements per key)
 * and then bumps the per-key window counter.
 */
static class SinkValidatorUpdateFun
        implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {

    private final int elementsPerKey;

    SinkValidatorUpdateFun(int elementsPerKey) {
        this.elementsPerKey = elementsPerKey;
    }

    @Override
    public void updateCount(
            Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {

        // Tuple4.f1 and .f2 carry the window start and end
        final long windowStart = value.f1;
        final long windowEnd = value.f2;

        // windows reaching past elementsPerKey are "incomplete" sliding windows, so the range
        // is cut off there
        final long upperBound = Math.min(windowEnd, elementsPerKey);

        // begin at 1 at the earliest: the first sliding windows start at negative timestamps
        // and only positive values contribute to the sum
        int expectedSum = 0;
        for (long element = Math.max(windowStart, 1L); element < upperBound; element++) {
            expectedSum += element;
        }

        assertEquals(
                "Window start: " + windowStart + " end: " + windowEnd,
                expectedSum,
                value.f3.value);

        windowCounts.merge(value.f0, 1, Integer::sum);
    }
}
/**
 * Result checker that accepts once every expected key is present and each key has reached at
 * least {@code elementsPerKey / elementsPerWindow} windows.
 */
static class SinkValidatorCheckFun implements ValidatingSink.ResultChecker {

    private final int numKeys;
    private final int numWindowsExpected;

    SinkValidatorCheckFun(int numKeys, int elementsPerKey, int elementsPerWindow) {
        this.numWindowsExpected = elementsPerKey / elementsPerWindow;
        this.numKeys = numKeys;
    }

    @Override
    public boolean checkResult(Map<Long, Integer> windowCounts) {
        // not every key has reported a window yet
        if (windowCounts.size() != numKeys) {
            return false;
        }
        return windowCounts.values().stream()
                .allMatch(windowCount -> windowCount >= numWindowsExpected);
    }
}
/**
 * Event generator that, for each sequence number, emits one element per key stamped with that
 * sequence number as its timestamp, followed by a watermark that trails the sequence number.
 */
static class KeyedEventTimeGenerator implements FailingSource.EventEmittingGenerator {

    private final int keyUniverseSize;
    private final int watermarkTrailing;

    public KeyedEventTimeGenerator(int keyUniverseSize, int numElementsPerWindow) {
        // The watermark lags behind by 4/3 of a window so that timers are still in flight when
        // checkpoints are taken, which makes the test cover timer snapshots as well.
        this.watermarkTrailing = 4 * numElementsPerWindow / 3;
        this.keyUniverseSize = keyUniverseSize;
    }

    @Override
    public void emitEvent(
            SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
        // a single payload instance is shared by the events of all keys
        final IntType payload = new IntType(eventSequenceNo);

        long key = 0;
        while (key < keyUniverseSize) {
            ctx.collectWithTimestamp(new Tuple2<>(key, payload), eventSequenceNo);
            key++;
        }

        final Watermark trailingWatermark = new Watermark(eventSequenceNo - watermarkTrailing);
        ctx.emitWatermark(trailingWatermark);
    }
}
/** Number of elements the source emits for every key. */
private int numElementsPerKey() {
    return 3_000;
}
/** Window length in milliseconds of event time. */
private int windowSize() {
    return 1_000;
}
/** Slide interval of the sliding windows in milliseconds of event time. */
private int windowSlide() {
    final int slideMillis = 100;
    return slideMillis;
}
/** Number of distinct keys the source emits events for. */
private int numKeys() {
    final int keyCount = 100;
    return keyCount;
}
}