[hotfix] Update ClickEventCount to use modern API
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 534ef9f..e56a39e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -17,6 +17,7 @@
package org.apache.flink.playgrounds.ops.clickcount;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
@@ -25,17 +26,20 @@
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -48,7 +52,7 @@
*
* <p>The Job can be configured via the command line:</p>
* * "--checkpointing": enables checkpointing
- * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--event-time": use an event time window assigner
* * "--backpressure": insert an operator that causes periodic backpressure
* * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
* * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
@@ -80,15 +84,17 @@
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
+ WatermarkStrategy<ClickEvent> strategy = WatermarkStrategy
+ .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
+ .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());
+
DataStream<ClickEvent> clicks =
- env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
- .name("ClickEvent Source")
- .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
- @Override
- public long extractTimestamp(final ClickEvent element) {
- return element.getTimestamp().getTime();
- }
- });
+ env.addSource(new FlinkKafkaConsumer<>(
+ inputTopic,
+ new ClickEventDeserializationSchema(),
+ kafkaProps)
+ .assignTimestampsAndWatermarks(strategy))
+ .name("ClickEvent Source");
if (inflictBackpressure) {
// Force a network shuffle so that the backpressure will affect the buffer pools
@@ -98,9 +104,13 @@
.name("Backpressure");
}
+ WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
+ TumblingEventTimeWindows.of(WINDOW_SIZE) :
+ TumblingProcessingTimeWindows.of(WINDOW_SIZE);
+
DataStream<ClickEventStatistics> statistics = clicks
.keyBy(ClickEvent::getPage)
- .timeWindow(WINDOW_SIZE)
+ .window(assigner)
.aggregate(new CountingAggregator(),
new ClickEventStatisticsCollector())
.name("ClickEvent Counter");
@@ -121,17 +131,12 @@
final StreamExecutionEnvironment env) {
boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION);
- boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION);
boolean enableChaining = params.has(OPERATOR_CHAINING_OPTION);
if (checkpointingEnabled) {
env.enableCheckpointing(1000);
}
- if (eventTimeSemantics) {
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- }
-
if(!enableChaining){
//disabling Operator chaining to make it easier to follow the Job in the WebUI
env.disableOperatorChaining();