/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.checkpointing.utils;

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Collector;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

import static org.junit.Assert.assertEquals;

/**
 * Migration ITCases for a stateful job. The tests are parameterized to cover migration from
 * multiple previous Flink versions, as well as different state backends.
 */
@RunWith(Parameterized.class)
public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
private static final int NUM_SOURCE_ELEMENTS = 4;
@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
public static Collection<Tuple2<MigrationVersion, String>> parameters() {
return Arrays.asList(
Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
}

/**
 * TODO: to generate savepoints for a specific Flink version / backend type, change these values
 * accordingly, e.g. to generate for 1.3 with RocksDB, set them to (MigrationVersion.v1_3,
 * StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME).
 *
 * <p>Note: generate the savepoint from the corresponding release branch instead of master.
 */
private final MigrationVersion flinkGenerateSavepointVersion = MigrationVersion.v1_4;
private final String flinkGenerateSavepointBackendType =
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
private final MigrationVersion testMigrateVersion;
private final String testStateBackend;
public LegacyStatefulJobSavepointMigrationITCase(
Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
this.testMigrateVersion = testMigrateVersionAndBackend.f0;
this.testStateBackend = testMigrateVersionAndBackend.f1;
}

/**
 * Manually run this to write binary snapshot data: remove the {@code @Ignore} annotation, set
 * the {@code flinkGenerateSavepoint*} fields above to the target version / backend, and run the
 * test from the corresponding release branch.
 */
@Test
@Ignore
public void writeSavepoint() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
switch (flinkGenerateSavepointBackendType) {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
break;
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
env.setStateBackend(new MemoryStateBackend());
break;
default:
throw new UnsupportedOperationException();
}
env.enableCheckpointing(500);
env.setParallelism(4);
env.setMaxParallelism(4);
env.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS))
.setMaxParallelism(1)
.uid("LegacyCheckpointedSource")
.flatMap(new LegacyCheckpointedFlatMap())
.startNewChain()
.uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState())
.startNewChain()
.uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap())
.startNewChain()
.uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
"custom_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState()))
.uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new TimelyStatefulOperator())
.uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
executeAndSavepoint(
env,
"src/test/resources/"
+ getSavepointPath(
flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
new Tuple2<>(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}

@Test
public void testSavepointRestore() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
switch (testStateBackend) {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
break;
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
env.setStateBackend(new MemoryStateBackend());
break;
default:
throw new UnsupportedOperationException();
}
env.enableCheckpointing(500);
env.setParallelism(4);
env.setMaxParallelism(4);
env.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS))
.setMaxParallelism(1)
.uid("LegacyCheckpointedSource")
.flatMap(new CheckingRestoringFlatMap())
.startNewChain()
.uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState())
.startNewChain()
.uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new CheckingKeyedStateFlatMap())
.startNewChain()
.uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
"custom_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckingRestoringUdfOperator(
new CheckingRestoringFlatMapWithKeyedStateInOperator()))
.uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckingTimelyStatefulOperator())
.uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
restoreAndExecute(
env,
getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
new Tuple2<>(
CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingRestoringFlatMapWithKeyedStateInOperator
.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
new Tuple2<>(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
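
/**
 * Maps a (version, backend) pair to the name of the bundled savepoint resource, e.g.
 * "stateful-udf-migration-itcase-flink1.4-rocksdb-savepoint", assuming {@code
 * MigrationVersion.v1_4} renders as "1.4".
 */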
private String getSavepointPath(MigrationVersion savepointVersion, String backendType) {
switch (backendType) {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
return "stateful-udf-migration-itcase-flink"
+ savepointVersion
+ "-rocksdb-savepoint";
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
return "stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
default:
throw new UnsupportedOperationException();
}
}
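
/**
 * Source that emits an initial watermark and {@code numElements} (i, i) tuples, then idles until
 * cancelled. It deliberately emits no final watermark so the registered event-time timers do not
 * fire while the savepoint is written.
 */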
private static class LegacyCheckpointedSource implements SourceFunction<Tuple2<Long, Long>> {
public static String checkpointedString = "Here be dragons!";
private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true;
private final int numElements;
public LegacyCheckpointedSource(int numElements) {
this.numElements = numElements;
}
@Override
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
ctx.emitWatermark(new Watermark(0));
synchronized (ctx.getCheckpointLock()) {
for (long i = 0; i < numElements; i++) {
ctx.collect(new Tuple2<>(i, i));
}
}
// don't emit a final watermark so that we don't trigger the registered event-time
// timers
while (isRunning) {
Thread.sleep(20);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
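
/**
 * Restore-side counterpart of {@link LegacyCheckpointedSource}: emits the same elements, plus a
 * watermark at 1000 to immediately fire the timers restored from the savepoint, and records one
 * successful-restore check in its accumulator.
 */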
private static class CheckingRestoringSource extends RichSourceFunction<Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingRestoringSource.class + "_RESTORE_CHECK";
private volatile boolean isRunning = true;
private final int numElements;
private String restoredState;
public CheckingRestoringSource(int numElements) {
this.numElements = numElements;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext()
.addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
// immediately trigger any set timers
ctx.emitWatermark(new Watermark(1000));
synchronized (ctx.getCheckpointLock()) {
for (long i = 0; i < numElements; i++) {
ctx.collect(new Tuple2<>(i, i));
}
}
while (isRunning) {
Thread.sleep(20);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
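
/** Pass-through flat map used when writing the savepoint. */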
private static class LegacyCheckpointedFlatMap
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static Tuple2<String, Long> checkpointedTuple = new Tuple2<>("hello", 42L);
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
}
}
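
/** Restore-side pass-through flat map that counts every processed element in its accumulator. */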
private static class CheckingRestoringFlatMap
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingRestoringFlatMap.class + "_RESTORE_CHECK";
private transient Tuple2<String, Long> restoredState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext()
.addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}
}
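
/**
 * Pass-through flat map that writes each value into keyed {@link ValueState} and immediately
 * verifies the update.
 */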
private static class LegacyCheckpointedFlatMapWithKeyedState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static Tuple2<String, Long> checkpointedTuple = new Tuple2<>("hello", 42L);
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
getRuntimeContext().getState(stateDescriptor).update(value.f1);
assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value());
}
}
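
/**
 * Restore-side flat map that asserts the keyed state written by
 * {@link LegacyCheckpointedFlatMapWithKeyedState} survived the migration.
 */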
private static class CheckingRestoringFlatMapWithKeyedState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK";
private transient Tuple2<String, Long> restoredState;
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext()
.addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
if (state == null) {
throw new RuntimeException("Missing key value state for " + value);
}
assertEquals(value.f1, state.value());
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}
}
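
/**
 * Same keyed-state check as {@link CheckingRestoringFlatMapWithKeyedState}, used as the UDF
 * inside {@link CheckingRestoringUdfOperator}.
 */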
private static class CheckingRestoringFlatMapWithKeyedStateInOperator
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK";
private transient Tuple2<String, Long> restoredState;
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext()
.addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
if (state == null) {
throw new RuntimeException("Missing key value state for " + value);
}
assertEquals(value.f1, state.value());
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}
}
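
/** Pass-through flat map that populates keyed {@link ValueState} when writing the savepoint. */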
private static class KeyedStateSettingFlatMap
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
getRuntimeContext().getState(stateDescriptor).update(value.f1);
}
}
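
/** Restore-side counterpart of {@link KeyedStateSettingFlatMap}; asserts the restored keyed state. */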
private static class CheckingKeyedStateFlatMap
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext()
.addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
throws Exception {
out.collect(value);
ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
if (state == null) {
throw new RuntimeException("Missing key value state for " + value);
}
assertEquals(value.f1, state.value());
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}
}
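
/**
 * Custom operator that applies the wrapped flat map UDF to each element and forwards watermarks
 * unchanged.
 */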
private static class CheckpointedUdfOperator
extends AbstractUdfStreamOperator<
Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
public CheckpointedUdfOperator(
FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
super(userFunction);
}
@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
}
@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}
}
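
/**
 * Restore-side counterpart of {@link CheckpointedUdfOperator} that additionally counts every
 * processed element in its restore-check accumulator.
 */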
private static class CheckingRestoringUdfOperator
extends AbstractUdfStreamOperator<
Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";
private String restoredState;
public CheckingRestoringUdfOperator(
FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
super(userFunction);
}
@Override
public void open() throws Exception {
super.open();
getRuntimeContext()
.addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}
}
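
/**
 * Operator that writes each value into keyed state and registers one event-time and one
 * processing-time timer per element, so that the savepoint also contains timer state.
 */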
private static class TimelyStatefulOperator extends AbstractStreamOperator<Tuple2<Long, Long>>
implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>,
Triggerable<Long, Long> {
private static final long serialVersionUID = 1L;
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
private transient InternalTimerService<Long> timerService;
@Override
public void open() throws Exception {
super.open();
timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
}
@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
ValueState<Long> state =
getKeyedStateBackend()
.getPartitionedState(
element.getValue().f0,
LongSerializer.INSTANCE,
stateDescriptor);
state.update(element.getValue().f1);
timerService.registerEventTimeTimer(
element.getValue().f0, timerService.currentWatermark() + 10);
timerService.registerProcessingTimeTimer(
element.getValue().f0, timerService.currentProcessingTime() + 30_000);
output.collect(element);
}
@Override
public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {}
@Override
public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {}
@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}
}
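
/**
 * Restore-side counterpart of {@link TimelyStatefulOperator}: verifies the restored keyed state
 * when processing elements and when the restored event-time and processing-time timers fire,
 * counting each successful check in a dedicated accumulator.
 */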
private static class CheckingTimelyStatefulOperator
extends AbstractStreamOperator<Tuple2<Long, Long>>
implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>,
Triggerable<Long, Long> {
private static final long serialVersionUID = 1L;
public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR =
CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR =
CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR =
CheckingTimelyStatefulOperator.class + "_PT_CHECKS";
private final ValueStateDescriptor<Long> stateDescriptor =
new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
private transient InternalTimerService<Long> timerService;
@Override
public void open() throws Exception {
super.open();
timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
getRuntimeContext()
.addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
getRuntimeContext()
.addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
getRuntimeContext()
.addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
}
@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
ValueState<Long> state =
getKeyedStateBackend()
.getPartitionedState(
element.getValue().f0,
LongSerializer.INSTANCE,
stateDescriptor);
assertEquals(state.value(), element.getValue().f1);
getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1);
output.collect(element);
}
@Override
public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
ValueState<Long> state =
getKeyedStateBackend()
.getPartitionedState(
timer.getNamespace(), LongSerializer.INSTANCE, stateDescriptor);
assertEquals(state.value(), timer.getNamespace());
getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
}
@Override
public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
ValueState<Long> state =
getKeyedStateBackend()
.getPartitionedState(
timer.getNamespace(), LongSerializer.INSTANCE, stateDescriptor);
assertEquals(state.value(), timer.getNamespace());
getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
}
}
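
/** Sink that counts every element it receives in the {@code NUM_ELEMENTS_ACCUMULATOR}. */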
private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
private static final long serialVersionUID = 1L;
public static final String NUM_ELEMENTS_ACCUMULATOR =
AccumulatorCountingSink.class + "_NUM_ELEMENTS";
int count = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
}
@Override
public void invoke(T value) throws Exception {
count++;
getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
}
}
}