| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.test.streaming.runtime; |
| |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.functions.MapFunction; |
| import org.apache.flink.api.common.functions.ReduceFunction; |
| import org.apache.flink.api.common.typeinfo.BasicTypeInfo; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.client.program.ClusterClient; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.TaskManagerOptions; |
| import org.apache.flink.core.testutils.MultiShotLatch; |
| import org.apache.flink.runtime.checkpoint.CheckpointException; |
| import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; |
| import org.apache.flink.runtime.client.JobStatusMessage; |
| import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; |
| import org.apache.flink.streaming.api.TimeCharacteristic; |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; |
| import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; |
| import org.apache.flink.streaming.api.functions.co.CoMapFunction; |
| import org.apache.flink.streaming.api.functions.sink.DiscardingSink; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction; |
| import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; |
| import org.apache.flink.streaming.api.operators.AbstractStreamOperator; |
| import org.apache.flink.streaming.api.operators.ChainingStrategy; |
| import org.apache.flink.streaming.api.operators.OneInputStreamOperator; |
| import org.apache.flink.streaming.api.watermark.Watermark; |
| import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; |
| import org.apache.flink.streaming.api.windowing.time.Time; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; |
| import org.apache.flink.test.util.MiniClusterWithClientResource; |
| import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.TestLogger; |
| |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests for timestamps, watermarks, and event-time sources. |
| */ |
| @SuppressWarnings("serial") |
| @Category(AlsoRunWithSchedulerNG.class) |
| public class TimestampITCase extends TestLogger { |
| |
	// Cluster size: two TaskManagers with three slots each; all tests below run
	// with the resulting total parallelism of 6.
	private static final int NUM_TASK_MANAGERS = 2;
	private static final int NUM_TASK_SLOTS = 3;
	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;

	// this is used in some tests to synchronize: sources block on the latch and
	// CustomOperator#processWatermark triggers it when a watermark arrives downstream
	static MultiShotLatch latch;

	// shared mini cluster for all tests in this class (started once per class)
	@ClassRule
	public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
		new MiniClusterResourceConfiguration.Builder()
			.setConfiguration(getConfiguration())
			.setNumberTaskManagers(NUM_TASK_MANAGERS)
			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
			.build());
| |
| private static Configuration getConfiguration() { |
| Configuration config = new Configuration(); |
| config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "12m"); |
| return config; |
| } |
| |
	@Before
	public void setupLatch() {
		// ensure that we get a fresh latch for each test; sources await it and
		// CustomOperator#processWatermark triggers it
		latch = new MultiShotLatch();
	}
| |
	/**
	 * These check whether custom timestamp emission works at sources and also whether timestamps
	 * arrive at operators throughout a topology.
	 *
	 * <p>This also checks whether watermarks keep propagating if a source closes early.
	 *
	 * <p>This only uses map to test the workings of watermarks in a complete, running topology. All
	 * tasks and stream operators have dedicated tests that test the watermark propagation
	 * behaviour.
	 */
	@Test
	public void testWatermarkPropagation() throws Exception {
		final int numWatermarks = 10;

		long initialTime = 0L;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(PARALLELISM);

		// source2 emits only half as many watermarks as source1 and hence finishes early
		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, numWatermarks));
		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, numWatermarks / 2));

		source1.union(source2)
				.map(new IdentityMap())
				.connect(source2).map(new IdentityCoMap())
				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
				.addSink(new DiscardingSink<Integer>());

		env.execute();

		// verify that all the watermarks arrived at the final custom operator
		for (int i = 0; i < PARALLELISM; i++) {
			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
			// other source stops emitting after that
			for (int j = 0; j < numWatermarks / 2; j++) {
				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
					System.err.println("All Watermarks: ");
					for (int k = 0; k <= numWatermarks / 2; k++) {
						System.err.println(CustomOperator.finalWatermarks[i].get(k));
					}

					fail("Wrong watermark.");
				}
			}

			// the job is finite, so the very last watermark must be the MAX watermark
			assertEquals(Watermark.MAX_WATERMARK,
					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size() - 1));
		}
	}
| |
	/**
	 * Verifies that stopping a job with a savepoint does NOT produce a final
	 * Long.MAX_VALUE watermark downstream, in contrast to a naturally finishing job.
	 */
	@Test
	public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {

		// for this test to work, we need to be sure that no other jobs are being executed
		final ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
		while (!getRunningJobs(clusterClient).isEmpty()) {
			Thread.sleep(100);
		}

		final int numWatermarks = 10;

		long initialTime = 0L;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(PARALLELISM);

		// infinite sources: they emit their watermarks and then idle until cancelled,
		// so the job only terminates through the stop-with-savepoint issued below
		DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, numWatermarks));
		DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, numWatermarks / 2));

		source1.union(source2)
				.map(new IdentityMap())
				.connect(source2).map(new IdentityCoMap())
				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
				.addSink(new DiscardingSink<Integer>());

		Thread t = new Thread("stopper") {
			@Override
			public void run() {
				try {
					// try until we get the running jobs
					List<JobID> running = getRunningJobs(clusterClient);
					while (running.isEmpty()) {
						Thread.sleep(10);
						running = getRunningJobs(clusterClient);
					}

					JobID id = running.get(0);

					// send stop until the job is stopped
					do {
						try {
							clusterClient.stopWithSavepoint(id, false, "test");
						}
						catch (Exception e) {
							// the savepoint may race with tasks that are not running yet;
							// that specific failure reason is expected and simply retried
							boolean ignoreException = ExceptionUtils.findThrowable(e, CheckpointException.class)
								.map(CheckpointException::getCheckpointFailureReason)
								.map(reason -> reason == CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING)
								.orElse(false);
							if (!ignoreException) {
								throw e;
							}
						}
						Thread.sleep(10);
					}
					while (!getRunningJobs(clusterClient).isEmpty());
				}
				catch (Throwable t) {
					t.printStackTrace();
				}
			}
		};
		t.start();

		env.execute();

		// verify that all the watermarks arrived at the final custom operator
		for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {

			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
			// other source stops emitting after that
			for (int j = 0; j < subtaskWatermarks.size(); j++) {
				if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
					System.err.println("All Watermarks: ");
					for (int k = 0; k <= numWatermarks / 2; k++) {
						System.err.println(subtaskWatermarks.get(k));
					}

					fail("Wrong watermark.");
				}
			}

			// if there are watermarks, the final one must not be the MAX watermark
			if (subtaskWatermarks.size() > 0) {
				assertNotEquals(Watermark.MAX_WATERMARK,
						subtaskWatermarks.get(subtaskWatermarks.size() - 1));
			}
		}
		t.join();
	}
| |
| /** |
| * These check whether timestamps are properly assigned at the sources and handled in |
| * network transmission and between chained operators when timestamps are enabled. |
| */ |
| @Test |
| public void testTimestampHandling() throws Exception { |
| final int numElements = 10; |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
| env.setParallelism(PARALLELISM); |
| |
| DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, numElements)); |
| DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, numElements)); |
| |
| source1 |
| .map(new IdentityMap()) |
| .connect(source2).map(new IdentityCoMap()) |
| .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()) |
| .addSink(new DiscardingSink<Integer>()); |
| |
| env.execute(); |
| } |
| |
| /** |
| * These check whether timestamps are properly ignored when they are disabled. |
| */ |
| @Test |
| public void testDisabledTimestamps() throws Exception { |
| final int numElements = 10; |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| env.setParallelism(PARALLELISM); |
| |
| DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(numElements)); |
| DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(numElements)); |
| |
| source1 |
| .map(new IdentityMap()) |
| .connect(source2).map(new IdentityCoMap()) |
| .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator()) |
| .addSink(new DiscardingSink<Integer>()); |
| |
| env.execute(); |
| } |
| |
	/**
	 * This tests whether timestamps are properly extracted in the timestamp
	 * extractor and whether watermarks are also correctly forwarded from this with the auto watermark
	 * interval.
	 */
	@Test
	public void testTimestampExtractorWithAutoInterval() throws Exception {
		final int numElements = 10;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(10);
		env.setParallelism(1);

		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
			@Override
			public void run(SourceContext<Integer> ctx) throws Exception {
				int index = 1;
				while (index <= numElements) {
					ctx.collect(index);
					// block until a watermark was observed downstream
					// (the latch is triggered by CustomOperator#processWatermark)
					latch.await();
					index++;
				}
			}

			@Override
			public void cancel() {}
		});

		DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
				new AscendingTimestampExtractor<Integer>() {
					@Override
					public long extractAscendingTimestamp(Integer element) {
						// the element value itself serves as the event timestamp
						return element;
					}
				});

		extractOp
				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
				.transform("Timestamp Check",
						BasicTypeInfo.INT_TYPE_INFO,
						new TimestampCheckingOperator());

		// verify that extractor picks up source parallelism
		Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());

		env.execute();

		// verify that we get NUM_ELEMENTS watermarks
		for (int j = 0; j < numElements; j++) {
			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
				long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
				Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
			}
		}

		// the input is finite, so it should have a MAX Watermark
		assertEquals(Watermark.MAX_WATERMARK,
				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
	}
| |
	/**
	 * This tests whether timestamps are properly extracted in the timestamp
	 * extractor and whether watermark are correctly forwarded from the custom watermark emit
	 * function.
	 */
	@Test
	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
		final int numElements = 10;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(10);
		env.setParallelism(1);

		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
			@Override
			public void run(SourceContext<Integer> ctx) throws Exception {
				int index = 1;
				while (index <= numElements) {
					ctx.collect(index);
					// block until a watermark was observed downstream
					// (the latch is triggered by CustomOperator#processWatermark)
					latch.await();
					index++;
				}
			}

			@Override
			public void cancel() {}
		});

		source1
			.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {

				@Override
				public long extractTimestamp(Integer element, long currentTimestamp) {
					return element;
				}

				@Override
				public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
					// punctuated watermark, one behind each element's timestamp
					return new Watermark(extractedTimestamp - 1);
				}
			})
			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
			.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());

		env.execute();

		// verify that we get NUM_ELEMENTS watermarks
		for (int j = 0; j < numElements; j++) {
			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
				Assert.fail("Wrong watermark.");
			}
		}

		// the input is finite, so it should have a MAX Watermark
		assertEquals(Watermark.MAX_WATERMARK,
				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
	}
| |
	/**
	 * This test verifies that the timestamp extractor does not emit decreasing watermarks.
	 */
	@Test
	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
		final int numElements = 10;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(1);
		env.setParallelism(1);

		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
			@Override
			public void run(SourceContext<Integer> ctx) throws Exception {
				int index = 1;
				while (index <= numElements) {
					ctx.collect(index);
					Thread.sleep(100);
					// deliberately emit an element with a SMALLER timestamp afterwards;
					// the resulting watermark must not go backwards downstream
					ctx.collect(index - 1);
					latch.await();
					index++;
				}
			}

			@Override
			public void cancel() {}
		});

		source1
			.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {

				@Override
				public long extractTimestamp(Integer element, long previousTimestamp) {
					return element;
				}

				@Override
				public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
					// punctuated watermark, one behind each element's timestamp
					return new Watermark(extractedTimestamp - 1);
				}
			})
			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
			.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());

		env.execute();

		// verify that we get NUM_ELEMENTS watermarks
		for (int j = 0; j < numElements; j++) {
			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
				Assert.fail("Wrong watermark.");
			}
		}
		// the input is finite, so it should have a MAX Watermark
		assertEquals(Watermark.MAX_WATERMARK,
				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
	}
| |
| /** |
| * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. |
| */ |
| @Test |
| public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception { |
| final int numElements = 10; |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
| env.getConfig().setAutoWatermarkInterval(1); |
| env.setParallelism(2); |
| |
| DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { |
| @Override |
| public void run(SourceContext<Integer> ctx) throws Exception { |
| int index = 1; |
| while (index <= numElements) { |
| ctx.collectWithTimestamp(index, index); |
| ctx.collectWithTimestamp(index - 1, index - 1); |
| index++; |
| ctx.emitWatermark(new Watermark(index - 2)); |
| } |
| |
| // emit the final Long.MAX_VALUE watermark, do it twice and verify that |
| // we only see one in the result |
| ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); |
| ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); |
| } |
| |
| @Override |
| public void cancel() {} |
| }); |
| |
| source1 |
| .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() { |
| |
| @Override |
| public long extractTimestamp(Integer element, long currentTimestamp) { |
| return element; |
| } |
| |
| @Override |
| public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) { |
| return null; |
| } |
| }) |
| .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); |
| |
| env.execute(); |
| |
| Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); |
| Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); |
| } |
| |
| /** |
| * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. |
| * |
| * <p>Same test as before, but using a different timestamp extractor. |
| */ |
| @Test |
| public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception { |
| final int numElements = 10; |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment |
| .getExecutionEnvironment(); |
| |
| env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
| env.getConfig().setAutoWatermarkInterval(10); |
| env.setParallelism(2); |
| |
| DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { |
| @Override |
| public void run(SourceContext<Integer> ctx) throws Exception { |
| int index = 1; |
| while (index <= numElements) { |
| ctx.collectWithTimestamp(index, index); |
| ctx.collectWithTimestamp(index - 1, index - 1); |
| index++; |
| ctx.emitWatermark(new Watermark(index - 2)); |
| } |
| |
| // emit the final Long.MAX_VALUE watermark, do it twice and verify that |
| // we only see one in the result |
| ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); |
| ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); |
| } |
| |
| @Override |
| public void cancel() {} |
| }); |
| |
| source1 |
| .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Integer>() { |
| |
| @Override |
| public long extractTimestamp(Integer element, long currentTimestamp) { |
| return element; |
| } |
| |
| @Override |
| public Watermark getCurrentWatermark() { |
| return null; |
| } |
| }) |
| .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); |
| |
| env.execute(); |
| |
| Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); |
| Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); |
| } |
| |
| /** |
| * This verifies that an event time source works when setting stream time characteristic to |
| * processing time. In this case, the watermarks should just be swallowed. |
| */ |
| @Test |
| public void testEventTimeSourceWithProcessingTime() throws Exception { |
| StreamExecutionEnvironment env = |
| StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| env.setParallelism(2); |
| env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| |
| DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10)); |
| |
| source1 |
| .map(new IdentityMap()) |
| .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false)); |
| |
| env.execute(); |
| |
| // verify that we don't get any watermarks, the source is used as watermark source in |
| // other tests, so it normally emits watermarks |
| Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0); |
| } |
| |
| @Test |
| public void testErrorOnEventTimeOverProcessingTime() { |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| env.setParallelism(2); |
| env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| |
| DataStream<Tuple2<String, Integer>> source1 = |
| env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2)); |
| |
| source1 |
| .keyBy(0) |
| .window(TumblingEventTimeWindows.of(Time.seconds(5))) |
| .reduce(new ReduceFunction<Tuple2<String, Integer>>() { |
| @Override |
| public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) { |
| return value1; |
| } |
| }) |
| .print(); |
| |
| try { |
| env.execute(); |
| fail("this should fail with an exception"); |
| } catch (Exception e) { |
| // expected |
| } |
| } |
| |
| @Test |
| public void testErrorOnEventTimeWithoutTimestamps() { |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| env.setParallelism(2); |
| env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
| |
| DataStream<Tuple2<String, Integer>> source1 = |
| env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2)); |
| |
| source1 |
| .keyBy(0) |
| .window(TumblingEventTimeWindows.of(Time.seconds(5))) |
| .reduce(new ReduceFunction<Tuple2<String, Integer>>() { |
| @Override |
| public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) { |
| return value1; |
| } |
| }) |
| .print(); |
| |
| try { |
| env.execute(); |
| fail("this should fail with an exception"); |
| } catch (Exception e) { |
| // expected |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Custom Operators and Functions |
| // ------------------------------------------------------------------------ |
| |
	/**
	 * Operator that records every watermark it sees, per subtask, and publishes the
	 * recorded lists in {@link #close()} so the tests can inspect them after the job
	 * finished. It also asserts that watermarks arrive strictly ascending and that,
	 * when enabled, every element's timestamp equals its value.
	 */
	@SuppressWarnings("unchecked")
	private static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {

		// watermarks seen by this subtask, in arrival order (initialized in open())
		List<Watermark> watermarks;
		// per-subtask watermark lists, written in close() and read by the test thread
		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
		private final boolean timestampsEnabled;

		public CustomOperator(boolean timestampsEnabled) {
			setChainingStrategy(ChainingStrategy.ALWAYS);
			this.timestampsEnabled = timestampsEnabled;
		}

		@Override
		public void processElement(StreamRecord<Integer> element) throws Exception {
			if (timestampsEnabled) {
				// the tests attach each element's value as its timestamp, so they must match
				if (element.getTimestamp() != element.getValue()) {
					Assert.fail("Timestamps are not properly handled.");
				}
			}
			output.collect(element);
		}

		@Override
		public void processWatermark(Watermark mark) throws Exception {
			super.processWatermark(mark);

			// watermarks must be strictly ascending
			for (Watermark previousMark: watermarks) {
				assertTrue(previousMark.getTimestamp() < mark.getTimestamp());
			}
			watermarks.add(mark);
			// unblock sources that wait for a watermark to arrive downstream
			latch.trigger();
			output.emitWatermark(mark);
		}

		@Override
		public void open() throws Exception {
			super.open();
			watermarks = new ArrayList<>();
		}

		@Override
		public void close() throws Exception {
			super.close();
			// publish this subtask's watermarks for the test to verify
			finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
		}
	}
| |
	/**
	 * Operator that fails the job if an element's timestamp does not equal its value
	 * (the convention used by all sources and extractors in this test class).
	 */
	private static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {

		public TimestampCheckingOperator() {
			setChainingStrategy(ChainingStrategy.ALWAYS);
		}

		@Override
		public void processElement(StreamRecord<Integer> element) throws Exception {
			if (element.getTimestamp() != element.getValue()) {
				Assert.fail("Timestamps are not properly handled.");
			}
			output.collect(element);
		}
	}
| |
	/**
	 * Operator that fails the job if any element carries a timestamp, used by tests
	 * that run with timestamps disabled.
	 */
	private static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {

		@Override
		public void processElement(StreamRecord<Integer> element) throws Exception {
			if (element.hasTimestamp()) {
				Assert.fail("Timestamps are not properly handled.");
			}
			output.collect(element);
		}
	}
| |
| private static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> { |
| @Override |
| public Integer map1(Integer value) throws Exception { |
| return value; |
| } |
| |
| @Override |
| public Integer map2(Integer value) throws Exception { |
| return value; |
| } |
| } |
| |
| private static class IdentityMap implements MapFunction<Integer, Integer> { |
| @Override |
| public Integer map(Integer value) throws Exception { |
| return value; |
| } |
| } |
| |
	/**
	 * Finite source that emits {@code numWatermarks} elements, attaching
	 * {@code initialTime + i} as the event timestamp of element {@code i} and
	 * emitting a watermark with the same value after each element.
	 */
	private static class MyTimestampSource implements SourceFunction<Integer> {

		private final long initialTime;
		private final int numWatermarks;

		public MyTimestampSource(long initialTime, int numWatermarks) {
			this.initialTime = initialTime;
			this.numWatermarks = numWatermarks;
		}

		@Override
		public void run(SourceContext<Integer> ctx) throws Exception {
			for (int i = 0; i < numWatermarks; i++) {
				ctx.collectWithTimestamp(i, initialTime + i);
				ctx.emitWatermark(new Watermark(initialTime + i));
			}
		}

		@Override
		public void cancel() {}
	}
| |
	/**
	 * Source that emits the same elements/watermarks as {@link MyTimestampSource} but
	 * then idles until it is cancelled, so the job never finishes on its own. Used by
	 * the stop-with-savepoint test.
	 */
	private static class MyTimestampSourceInfinite implements SourceFunction<Integer> {

		private final long initialTime;
		private final int numWatermarks;

		// volatile so the cancel() call from another thread is visible in run()
		private volatile boolean running = true;

		public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
			this.initialTime = initialTime;
			this.numWatermarks = numWatermarks;
		}

		@Override
		public void run(SourceContext<Integer> ctx) throws Exception {
			for (int i = 0; i < numWatermarks; i++) {
				ctx.collectWithTimestamp(i, initialTime + i);
				ctx.emitWatermark(new Watermark(initialTime + i));
			}

			// keep the source alive until the test stops/cancels the job
			while (running) {
				Thread.sleep(20);
			}
		}

		@Override
		public void cancel() {
			running = false;
		}
	}
| |
| private static class MyNonWatermarkingSource implements SourceFunction<Integer> { |
| |
| int numWatermarks; |
| |
| public MyNonWatermarkingSource(int numWatermarks) { |
| this.numWatermarks = numWatermarks; |
| } |
| |
| @Override |
| public void run(SourceContext<Integer> ctx) throws Exception { |
| for (int i = 0; i < numWatermarks; i++) { |
| ctx.collect(i); |
| } |
| } |
| |
| @Override |
| public void cancel() {} |
| } |
| |
| private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception { |
| Collection<JobStatusMessage> statusMessages = client.listJobs().get(); |
| return statusMessages.stream() |
| .filter(status -> !status.getJobState().isGloballyTerminalState()) |
| .map(JobStatusMessage::getJobId) |
| .collect(Collectors.toList()); |
| } |
| } |