[hotfix][operations] Use FLIP-27 Kafka Source
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index e56a39e..489fd19 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
@@ -33,7 +34,6 @@
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -84,17 +84,17 @@
 		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
 
-		WatermarkStrategy<ClickEvent> strategy = WatermarkStrategy
+		KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
+				.setTopics(inputTopic)
+				.setValueOnlyDeserializer(new ClickEventDeserializationSchema())
+				.setProperties(kafkaProps)
+				.build();
+
+		WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
 				.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
 				.withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());
 
-		DataStream<ClickEvent> clicks =
-				env.addSource(new FlinkKafkaConsumer<>(
-						inputTopic,
-						new ClickEventDeserializationSchema(),
-						kafkaProps)
-					.assignTimestampsAndWatermarks(strategy))
-			.name("ClickEvent Source");
+		DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");
 
 		if (inflictBackpressure) {
 			// Force a network shuffle so that the backpressure will affect the buffer pools