[hotfix] Update ClickEventCount to use modern API
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 534ef9f..e56a39e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.playgrounds.ops.clickcount;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
@@ -25,17 +26,20 @@
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -48,7 +52,7 @@
  *
  * <p>The Job can be configured via the command line:</p>
  * * "--checkpointing": enables checkpointing
- * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--event-time": use an event time window assigner
  * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
@@ -80,15 +84,17 @@
 		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
 
+		WatermarkStrategy<ClickEvent> strategy = WatermarkStrategy
+				.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
+				.withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());
+
 		DataStream<ClickEvent> clicks =
-				env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
-			.name("ClickEvent Source")
-			.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
-				@Override
-				public long extractTimestamp(final ClickEvent element) {
-					return element.getTimestamp().getTime();
-				}
-			});
+				env.addSource(new FlinkKafkaConsumer<>(
+						inputTopic,
+						new ClickEventDeserializationSchema(),
+						kafkaProps)
+					.assignTimestampsAndWatermarks(strategy))
+			.name("ClickEvent Source");
 
 		if (inflictBackpressure) {
 			// Force a network shuffle so that the backpressure will affect the buffer pools
@@ -98,9 +104,13 @@
 				.name("Backpressure");
 		}
 
+		WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
+				TumblingEventTimeWindows.of(WINDOW_SIZE) :
+				TumblingProcessingTimeWindows.of(WINDOW_SIZE);
+
 		DataStream<ClickEventStatistics> statistics = clicks
 			.keyBy(ClickEvent::getPage)
-			.timeWindow(WINDOW_SIZE)
+			.window(assigner)
 			.aggregate(new CountingAggregator(),
 				new ClickEventStatisticsCollector())
 			.name("ClickEvent Counter");
@@ -121,17 +131,12 @@
 			final StreamExecutionEnvironment env) {
 
 		boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION);
-		boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION);
 		boolean enableChaining = params.has(OPERATOR_CHAINING_OPTION);
 
 		if (checkpointingEnabled) {
 			env.enableCheckpointing(1000);
 		}
 
-		if (eventTimeSemantics) {
-			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		}
-
 		if(!enableChaining){
 			//disabling Operator chaining to make it easier to follow the Job in the WebUI
 			env.disableOperatorChaining();