| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.api.operators; |
| |
| import org.apache.flink.api.common.state.ValueStateDescriptor; |
| import org.apache.flink.api.common.typeinfo.BasicTypeInfo; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.base.StringSerializer; |
| import org.apache.flink.api.java.functions.KeySelector; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; |
| import org.apache.flink.runtime.state.KeyGroupRange; |
| import org.apache.flink.runtime.state.KeyGroupRangeAssignment; |
| import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; |
| import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; |
| import org.apache.flink.runtime.state.StateInitializationContext; |
| import org.apache.flink.runtime.state.StateSnapshotContext; |
| import org.apache.flink.runtime.state.VoidNamespace; |
| import org.apache.flink.runtime.state.VoidNamespaceSerializer; |
| import org.apache.flink.streaming.api.watermark.Watermark; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; |
| import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; |
| import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; |
| import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; |
| import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; |
| import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; |
| import org.apache.flink.streaming.util.TestHarnessUtil; |
| import org.apache.flink.util.Preconditions; |
| |
| import org.hamcrest.Description; |
| import org.hamcrest.Matcher; |
| import org.hamcrest.TypeSafeMatcher; |
| import org.junit.Test; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| |
| import static junit.framework.TestCase.assertTrue; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.contains; |
| import static org.hamcrest.Matchers.empty; |
| |
| /** |
| * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly tests timers and |
| * state and whether they are correctly checkpointed/restored with key-group reshuffling. |
| */ |
| public class AbstractStreamOperatorTest { |
    /** Creates a test harness with max parallelism 1, one subtask, subtask index 0. */
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
            createTestHarness() throws Exception {
        return createTestHarness(1, 1, 0);
    }
| |
| protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> |
| createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex) |
| throws Exception { |
| TestOperator testOperator = new TestOperator(); |
| return new KeyedOneInputStreamOperatorTestHarness<>( |
| testOperator, |
| new TestKeySelector(), |
| BasicTypeInfo.INT_TYPE_INFO, |
| maxParalelism, |
| numSubtasks, |
| subtaskIndex); |
| } |
| |
| protected <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> createTestHarness( |
| int maxParalelism, |
| int numSubtasks, |
| int subtaskIndex, |
| OneInputStreamOperator<IN, OUT> testOperator, |
| KeySelector<IN, K> keySelector, |
| TypeInformation<K> keyTypeInfo) |
| throws Exception { |
| return new KeyedOneInputStreamOperatorTestHarness<>( |
| testOperator, keySelector, keyTypeInfo, maxParalelism, numSubtasks, subtaskIndex); |
| } |
| |
    /** Verify that keyed state is scoped per key: each key only sees its own stored value. */
    @Test
    public void testStateDoesNotInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness()) {
            testHarness.open();

            // Store a different value under key 0 and key 1.
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);

            // Emitting state must produce each element's own key-scoped value.
            testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0);
            testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0);

            assertThat(
                    extractResult(testHarness),
                    contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
        }
    }
| |
    /**
     * Verify that firing event-time timers see the state of the key that was active when the timer
     * was set.
     */
    @Test
    public void testEventTimeTimersDontInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness()) {
            testHarness.open();

            testHarness.processWatermark(0L);

            // Timer for key 1 at t=20, set before any state exists.
            testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0);

            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);

            // Timer for key 0 at t=10.
            testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0);

            testHarness.processWatermark(10L);

            // The t=10 timer belongs to key 0, so it must read key 0's state.
            assertThat(extractResult(testHarness), contains("ON_EVENT_TIME:HELLO"));

            testHarness.processWatermark(20L);

            // The t=20 timer belongs to key 1, so it must read key 1's state.
            assertThat(extractResult(testHarness), contains("ON_EVENT_TIME:CIAO"));
        }
    }
| |
    /**
     * Verify that firing processing-time timers see the state of the key that was active when the
     * timer was set.
     */
    @Test
    public void testProcessingTimeTimersDontInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness()) {
            testHarness.open();

            testHarness.setProcessingTime(0L);

            // Timer for key 1 at t=20, set before any state exists.
            testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);

            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);

            // Timer for key 0 at t=10.
            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);

            testHarness.setProcessingTime(10L);

            // The t=10 timer belongs to key 0, so it must read key 0's state.
            assertThat(extractResult(testHarness), contains("ON_PROC_TIME:HELLO"));

            testHarness.setProcessingTime(20L);

            // The t=20 timer belongs to key 1, so it must read key 1's state.
            assertThat(extractResult(testHarness), contains("ON_PROC_TIME:CIAO"));
        }
    }
| |
    /** Verify that a low-level timer is set for processing-time timers in case of restore. */
    @Test
    public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
        OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness()) {
            testHarness.open();

            testHarness.setProcessingTime(0L);

            // Register two processing-time timers (keys 1 and 0) plus per-key state, then
            // snapshot before either timer fires.
            testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);

            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);

            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);

            snapshot = testHarness.snapshot(0, 0);
        }

        // Restore into a fresh harness; the timers must fire purely from advancing the clock,
        // without any new elements being processed.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness1 = createTestHarness()) {
            testHarness1.setProcessingTime(0L);

            testHarness1.setup();
            testHarness1.initializeState(snapshot);
            testHarness1.open();

            testHarness1.setProcessingTime(10L);

            assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO"));

            testHarness1.setProcessingTime(20L);

            assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:CIAO"));
        }
    }
| |
    /** Verify that timers for the different time domains don't clash. */
    @Test
    public void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness()) {
            testHarness.open();

            testHarness.setProcessingTime(0L);
            testHarness.processWatermark(0L);

            // Same key, one timer in each time domain.
            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
            testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0);

            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);

            // Advancing the watermark past 20 fires only the event-time timer ...
            testHarness.processWatermark(20L);

            assertThat(extractResult(testHarness), contains("ON_EVENT_TIME:HELLO"));

            // ... and advancing processing time fires only the processing-time timer.
            testHarness.setProcessingTime(10L);

            assertThat(extractResult(testHarness), contains("ON_PROC_TIME:HELLO"));
        }
    }
| |
    /**
     * Verify that state and timers are checkpointed per key group and that they are correctly
     * assigned to operator subtasks when restoring.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingUp() throws Exception {
        final int maxParallelism = 10;

        // first get two keys that will fall into different key-group ranges that go
        // to different operator subtasks when we restore

        // get two sub key-ranges so that we can restore two ranges separately
        KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1);
        KeyGroupRange subKeyGroupRange2 =
                new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);

        // get two different keys, one per sub range
        int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

        OperatorSubtaskState snapshot;

        // Run with parallelism 1: both keys' state and timers live in the single subtask.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness(maxParallelism, 1, 0)) {
            testHarness.open();

            testHarness.processWatermark(0L);
            testHarness.setProcessingTime(0L);

            // key1 gets timers at t=10, key2 at t=20, in both time domains.
            testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0);
            testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0);

            testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0);
            testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0);

            testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0);
            testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0);

            // Nothing has fired yet; snapshot holds all state and timers.
            assertTrue(extractResult(testHarness).isEmpty());

            snapshot = testHarness.snapshot(0, 0);
        }

        // now, restore in two operators, first operator 1
        OperatorSubtaskState initState1 =
                AbstractStreamOperatorTestHarness.repartitionOperatorState(
                        snapshot, maxParallelism, 1, 2, 0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness1 = createTestHarness(maxParallelism, 2, 0)) {
            testHarness1.setup();
            testHarness1.initializeState(initState1);
            testHarness1.open();

            testHarness1.processWatermark(10L);

            // Subtask 0 owns key1's key group, so it fires key1's event-time timer.
            assertThat(extractResult(testHarness1), contains("ON_EVENT_TIME:HELLO"));

            assertTrue(extractResult(testHarness1).isEmpty());

            // this should not trigger anything, the trigger for WM=20 should sit in the
            // other operator subtask
            testHarness1.processWatermark(20L);

            assertTrue(extractResult(testHarness1).isEmpty());

            testHarness1.setProcessingTime(10L);

            assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO"));

            assertTrue(extractResult(testHarness1).isEmpty());

            // this should not trigger anything, the trigger for TIME=20 should sit in the
            // other operator subtask
            testHarness1.setProcessingTime(20L);

            assertTrue(extractResult(testHarness1).isEmpty());
        }

        // now, for the second operator
        OperatorSubtaskState initState2 =
                AbstractStreamOperatorTestHarness.repartitionOperatorState(
                        snapshot, maxParallelism, 1, 2, 1);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness2 = createTestHarness(maxParallelism, 2, 1)) {
            testHarness2.setup();
            testHarness2.initializeState(initState2);
            testHarness2.open();

            testHarness2.processWatermark(10L);

            // nothing should happen because this timer is in the other subtask
            assertTrue(extractResult(testHarness2).isEmpty());

            testHarness2.processWatermark(20L);

            // Subtask 1 owns key2's key group, so it fires key2's event-time timer.
            assertThat(extractResult(testHarness2), contains("ON_EVENT_TIME:CIAO"));

            testHarness2.setProcessingTime(10L);

            // nothing should happen because this timer is in the other subtask
            assertTrue(extractResult(testHarness2).isEmpty());

            testHarness2.setProcessingTime(20L);

            assertThat(extractResult(testHarness2), contains("ON_PROC_TIME:CIAO"));

            assertTrue(extractResult(testHarness2).isEmpty());
        }
    }
| |
    /**
     * Verify that state and timers snapshotted by two parallel subtasks can be combined and
     * restored into a single subtask (scale-down), with all timers still firing correctly.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingDown() throws Exception {
        final int maxParallelism = 10;

        // first get two keys that will fall into different key-group ranges that go
        // to different operator subtasks when we restore

        // get two sub key-ranges so that we can restore two ranges separately
        KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1);
        KeyGroupRange subKeyGroupRange2 =
                new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);

        // get two different keys, one per sub range
        int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

        OperatorSubtaskState snapshot1, snapshot2;
        // register some state with both instances and scale down to parallelism 1
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness1 = createTestHarness(maxParallelism, 2, 0)) {

            testHarness1.setup();
            testHarness1.open();

            testHarness1.processWatermark(0L);
            testHarness1.setProcessingTime(0L);

            // Subtask 0 handles key1: timers at t=30 plus state "HELLO".
            testHarness1.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:30"), 0);
            testHarness1.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:30"), 0);
            testHarness1.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0);

            snapshot1 = testHarness1.snapshot(0, 0);
        }

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness2 = createTestHarness(maxParallelism, 2, 1)) {
            testHarness2.setup();
            testHarness2.open();

            testHarness2.processWatermark(0L);
            testHarness2.setProcessingTime(0L);

            // Subtask 1 handles key2: timers at t=40 plus state "CIAO".
            testHarness2.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:40"), 0);
            testHarness2.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:40"), 0);
            testHarness2.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0);

            snapshot2 = testHarness2.snapshot(0, 0);
        }
        // take a snapshot from each one of the "parallel" instances of the operator
        // and combine them into one so that we can scale down

        OperatorSubtaskState repackagedState =
                AbstractStreamOperatorTestHarness.repackageState(snapshot1, snapshot2);

        OperatorSubtaskState initSubTaskState =
                AbstractStreamOperatorTestHarness.repartitionOperatorState(
                        repackagedState, maxParallelism, 2, 1, 0);

        // The single restored subtask must own both keys' state and fire all four timers.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness3 = createTestHarness(maxParallelism, 1, 0)) {
            testHarness3.setup();
            testHarness3.initializeState(initSubTaskState);
            testHarness3.open();

            testHarness3.processWatermark(30L);
            assertThat(extractResult(testHarness3), contains("ON_EVENT_TIME:HELLO"));
            assertTrue(extractResult(testHarness3).isEmpty());

            testHarness3.processWatermark(40L);
            assertThat(extractResult(testHarness3), contains("ON_EVENT_TIME:CIAO"));
            assertTrue(extractResult(testHarness3).isEmpty());

            testHarness3.setProcessingTime(30L);
            assertThat(extractResult(testHarness3), contains("ON_PROC_TIME:HELLO"));
            assertTrue(extractResult(testHarness3).isEmpty());

            testHarness3.setProcessingTime(40L);
            assertThat(extractResult(testHarness3), contains("ON_PROC_TIME:CIAO"));
            assertTrue(extractResult(testHarness3).isEmpty());
        }
    }
| |
    /**
     * Verify that raw keyed state written per key group on snapshot is handed back, per key
     * group, on restore.
     */
    @Test
    public void testCustomRawKeyedStateSnapshotAndRestore() throws Exception {
        // setup: 10 key groups, all assigned to single subtask
        final int maxParallelism = 10;
        final int numSubtasks = 1;
        final int subtaskIndex = 0;
        final List<Integer> keyGroupsToWrite = Arrays.asList(2, 3, 8);

        final byte[] testSnapshotData = "TEST".getBytes();
        final CustomRawKeyedStateTestOperator testOperator =
                new CustomRawKeyedStateTestOperator(testSnapshotData, keyGroupsToWrite);

        // snapshot and then restore
        OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
                createTestHarness(
                        maxParallelism,
                        numSubtasks,
                        subtaskIndex,
                        testOperator,
                        input -> input,
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            snapshot = testHarness.snapshot(0, 0);
        }

        // Restoring populates testOperator.restoredRawKeyedState via initializeState().
        try (KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
                createTestHarness(
                        maxParallelism,
                        numSubtasks,
                        subtaskIndex,
                        testOperator,
                        input -> input,
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(snapshot);
            testHarness.open();
        }

        assertThat(
                testOperator.restoredRawKeyedState,
                hasRestoredKeyGroupsWith(testSnapshotData, keyGroupsToWrite));
    }
| |
    /**
     * Verify that when one of two inputs becomes idle, the watermark of the remaining active
     * input alone advances the operator's watermark, and that the watermark stalls again once
     * the idle input turns active.
     */
    @Test
    public void testIdleWatermarkHandling() throws Exception {
        final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        KeySelector<Long, Integer> dummyKeySelector = l -> 0;
        try (KeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        testOperator,
                        dummyKeySelector,
                        dummyKeySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            // Each element registers an event-time timer at its value.
            testHarness.processElement1(1L, 1L);
            testHarness.processElement1(3L, 3L);
            testHarness.processElement1(4L, 4L);
            // Input 2 has no watermark yet, so input 1's watermark cannot advance the operator.
            testHarness.processWatermark1(new Watermark(1L));
            assertThat(testHarness.getOutput(), empty());

            // Once input 2 is idle, input 1's watermark takes effect and fires the t=1 timer.
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord<>(1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());

            testHarness.processWatermark1(new Watermark(3L));
            expectedOutput.add(new StreamRecord<>(3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());

            testHarness.processWatermarkStatus2(WatermarkStatus.ACTIVE);
            // the other input is active now, we should not emit the watermark
            testHarness.processWatermark1(new Watermark(4L));
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }
| |
    /** Verify that an IDLE status is forwarded downstream only once both inputs are idle. */
    @Test
    public void testIdlenessForwarding() throws Exception {
        final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        KeySelector<Long, Integer> dummyKeySelector = l -> 0;
        try (KeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        testOperator,
                        dummyKeySelector,
                        dummyKeySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            // Only after the second input also goes idle is a single IDLE status emitted.
            testHarness.processWatermarkStatus1(WatermarkStatus.IDLE);
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);
            expectedOutput.add(WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }
| |
| /** Extracts the result values form the test harness and clear the output queue. */ |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) { |
| List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords(); |
| List<T> result = new ArrayList<>(); |
| for (Object in : streamRecords) { |
| if (in instanceof StreamRecord) { |
| result.add((T) ((StreamRecord) in).getValue()); |
| } |
| } |
| testHarness.getOutput().clear(); |
| return result; |
| } |
| |
    /** {@link KeySelector} for tests: keys {@code Tuple2} elements by their first field. */
    protected static class TestKeySelector
            implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer getKey(Tuple2<Integer, String> value) throws Exception {
            return value.f0;
        }
    }
| |
    /**
     * Two-input operator that registers an event-time timer at each incoming element's value and
     * emits the timer's timestamp when it fires. Used to observe watermark and idleness handling.
     */
    private static class WatermarkTestingOperator extends AbstractStreamOperator<Long>
            implements TwoInputStreamOperator<Long, Long, Long>,
                    Triggerable<Integer, VoidNamespace> {

        private transient InternalTimerService<VoidNamespace> timerService;

        @Override
        public void open() throws Exception {
            super.open();

            this.timerService =
                    getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        @Override
        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            // Emit the firing timestamp so tests can see exactly which timers triggered.
            output.collect(new StreamRecord<>(timer.getTimestamp()));
        }

        @Override
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer)
                throws Exception {}

        @Override
        public void processElement1(StreamRecord<Long> element) throws Exception {
            timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
        }

        @Override
        public void processElement2(StreamRecord<Long> element) throws Exception {
            timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
        }
    }
| |
    /**
     * Testing operator that can respond to commands by either setting/deleting state, emitting
     * state or setting timers.
     *
     * <p>Input elements are {@code (key, "COMMAND[:ARG]")} tuples. Supported commands:
     * SET_STATE, DELETE_STATE, SET_EVENT_TIME_TIMER, SET_PROC_TIME_TIMER, EMIT_STATE.
     */
    private static class TestOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
                    Triggerable<Integer, VoidNamespace> {

        private static final long serialVersionUID = 1L;

        private transient InternalTimerService<VoidNamespace> timerService;

        private final ValueStateDescriptor<String> stateDescriptor =
                new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);

        @Override
        public void open() throws Exception {
            super.open();

            this.timerService =
                    getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        @Override
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            // Commands are encoded as "NAME" or "NAME:ARG" in the second tuple field.
            String[] command = element.getValue().f1.split(":");
            switch (command[0]) {
                case "SET_STATE":
                    // Store the argument in keyed value state (scoped to the element's key).
                    getPartitionedState(stateDescriptor).update(command[1]);
                    break;
                case "DELETE_STATE":
                    getPartitionedState(stateDescriptor).clear();
                    break;
                case "SET_EVENT_TIME_TIMER":
                    // Register an event-time timer for the current key at the given timestamp.
                    timerService.registerEventTimeTimer(
                            VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                case "SET_PROC_TIME_TIMER":
                    timerService.registerProcessingTimeTimer(
                            VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                case "EMIT_STATE":
                    // Emit the current key's state as "ON_ELEMENT:<key>:<state>".
                    String stateValue = getPartitionedState(stateDescriptor).value();
                    output.collect(
                            new StreamRecord<>(
                                    "ON_ELEMENT:" + element.getValue().f0 + ":" + stateValue));
                    break;
                default:
                    throw new IllegalArgumentException();
            }
        }

        @Override
        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            // Emits the state of the key the timer was registered for.
            String stateValue = getPartitionedState(stateDescriptor).value();
            output.collect(new StreamRecord<>("ON_EVENT_TIME:" + stateValue));
        }

        @Override
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            // Emits the state of the key the timer was registered for.
            String stateValue = getPartitionedState(stateDescriptor).value();
            output.collect(new StreamRecord<>("ON_PROC_TIME:" + stateValue));
        }
    }
| |
| /** Operator that writes arbitrary bytes to raw keyed state on snapshots. */ |
| private static class CustomRawKeyedStateTestOperator extends AbstractStreamOperator<String> |
| implements OneInputStreamOperator<String, String> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private final byte[] snapshotBytes; |
| private final List<Integer> keyGroupsToWrite; |
| |
| private Map<Integer, byte[]> restoredRawKeyedState; |
| |
| CustomRawKeyedStateTestOperator(byte[] snapshotBytes, List<Integer> keyGroupsToWrite) { |
| this.snapshotBytes = Arrays.copyOf(snapshotBytes, snapshotBytes.length); |
| this.keyGroupsToWrite = Preconditions.checkNotNull(keyGroupsToWrite); |
| } |
| |
| @Override |
| public void processElement(StreamRecord<String> element) throws Exception { |
| // do nothing |
| } |
| |
| @Override |
| protected boolean isUsingCustomRawKeyedState() { |
| return true; |
| } |
| |
| @Override |
| public void snapshotState(StateSnapshotContext context) throws Exception { |
| super.snapshotState(context); |
| KeyedStateCheckpointOutputStream rawKeyedStateStream = |
| context.getRawKeyedOperatorStateOutput(); |
| for (int keyGroupId : keyGroupsToWrite) { |
| rawKeyedStateStream.startNewKeyGroup(keyGroupId); |
| rawKeyedStateStream.write(snapshotBytes); |
| } |
| rawKeyedStateStream.close(); |
| } |
| |
| @Override |
| public void initializeState(StateInitializationContext context) throws Exception { |
| super.initializeState(context); |
| |
| restoredRawKeyedState = new HashMap<>(); |
| for (KeyGroupStatePartitionStreamProvider streamProvider : |
| context.getRawKeyedStateInputs()) { |
| byte[] readBuffer = new byte[snapshotBytes.length]; |
| int ignored = streamProvider.getStream().read(readBuffer); |
| restoredRawKeyedState.put(streamProvider.getKeyGroupId(), readBuffer); |
| } |
| } |
| } |
| |
| private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) { |
| Random rand = new Random(System.currentTimeMillis()); |
| int result = rand.nextInt(); |
| while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) { |
| result = rand.nextInt(); |
| } |
| return result; |
| } |
| |
| private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith( |
| byte[] testSnapshotData, List<Integer> writtenKeyGroups) { |
| return new TypeSafeMatcher<Map<Integer, byte[]>>() { |
| @Override |
| protected boolean matchesSafely(Map<Integer, byte[]> restored) { |
| if (restored.size() != writtenKeyGroups.size()) { |
| return false; |
| } |
| |
| for (int writtenKeyGroupId : writtenKeyGroups) { |
| if (!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| @Override |
| public void describeTo(Description description) { |
| description.appendText( |
| "Key groups: " |
| + writtenKeyGroups |
| + " with snapshot data " |
| + Arrays.toString(testSnapshotData)); |
| } |
| }; |
| } |
| } |