[FLINK-27507] update operations-walkthrough playground for Flink 1.14
* [FLINK-27507] update operations-walkthrough playground for Flink 1.14
* [FLINK-27507] removed docker socket mount
diff --git a/README.md b/README.md
index f825a08..84937d4 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
* The **Flink Operations Playground** (in the `operations-playground` folder) lets you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example
Flink job. The playground is presented in detail in
-["Flink Operations Playground"](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/flink-operations-playground), which is part of the _Try Flink_ section of the Flink documentation.
+["Flink Operations Playground"](https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/try-flink/flink-operations-playground), which is part of the _Try Flink_ section of the Flink documentation.
* The **Table Walkthrough** (in the `table-walkthrough` folder) shows to use the Table API to build an analytics pipeline that reads streaming data from Kafka and writes results to MySQL, along with a real-time dashboard in Grafana. The walkthrough is presented in detail in ["Real Time Reporting with the Table API"](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/table_api), which is part of the _Try Flink_ section of the Flink documentation.
diff --git a/docker/ops-playground-image/Dockerfile b/docker/ops-playground-image/Dockerfile
index e7c89aa..3673167 100644
--- a/docker/ops-playground-image/Dockerfile
+++ b/docker/ops-playground-image/Dockerfile
@@ -20,7 +20,7 @@
# Build Click Count Job
###############################################################################
-FROM maven:3.6-jdk-8-slim AS builder
+FROM maven:3.8-jdk-8-slim AS builder
# Get Click Count job and compile it
COPY ./java/flink-playground-clickcountjob /opt/flink-playground-clickcountjob
@@ -32,7 +32,7 @@
# Build Operations Playground Image
###############################################################################
-FROM apache/flink:1.13.1-scala_2.12-java8
+FROM apache/flink:1.14.4-scala_2.12-java8
WORKDIR /opt/flink/bin
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index a2e99ee..b62376d 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-playground-clickcountjob</artifactId>
- <version>1-FLINK-1.13_2.12</version>
+ <version>1-FLINK-1.14_2.12</version>
<name>flink-playground-clickcountjob</name>
<packaging>jar</packaging>
@@ -44,7 +44,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.13.1</flink.version>
+ <flink.version>1.14.4</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 489fd19..359ef2e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -19,6 +19,9 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
@@ -34,7 +37,6 @@
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -115,13 +117,18 @@
new ClickEventStatisticsCollector())
.name("ClickEvent Counter");
- statistics
- .addSink(new FlinkKafkaProducer<>(
- outputTopic,
- new ClickEventStatisticsSerializationSchema(outputTopic),
- kafkaProps,
- FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
- .name("ClickEventStatistics Sink");
+ statistics.sinkTo(
+ KafkaSink.<ClickEventStatistics>builder()
+ .setBootstrapServers(kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .setKafkaProducerConfig(kafkaProps)
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(outputTopic)
+ .setValueSerializationSchema(new ClickEventStatisticsSerializationSchema())
+ .build())
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build())
+ .name("ClickEventStatistics Sink");
env.execute("Click Event Count");
}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
index b24807e..2152bb9 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -17,40 +17,27 @@
package org.apache.flink.playgrounds.ops.clickcount.records;
-
-import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
-
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import javax.annotation.Nullable;
+import java.io.IOException;
/**
- * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
*
*/
-public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {
-
+public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
private static final ObjectMapper objectMapper = new ObjectMapper();
- private String topic;
-
- public ClickEventStatisticsSerializationSchema(){
- }
-
- public ClickEventStatisticsSerializationSchema(String topic) {
- this.topic = topic;
- }
@Override
- public ProducerRecord<byte[], byte[]> serialize(
- final ClickEventStatistics message, @Nullable final Long timestamp) {
+ public byte[] serialize(ClickEventStatistics event) {
try {
//if topic is null, default topic will be used
- return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
+ return objectMapper.writeValueAsBytes(event);
} catch (JsonProcessingException e) {
- throw new IllegalArgumentException("Could not serialize record: " + message, e);
+ throw new IllegalArgumentException("Could not serialize record: " + event, e);
}
}
}
diff --git a/operations-playground/README.md b/operations-playground/README.md
index 9f29949..a2d60cf 100644
--- a/operations-playground/README.md
+++ b/operations-playground/README.md
@@ -61,4 +61,4 @@
## Further instructions
The playground setup and more detailed instructions are presented in the
-["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.13/try-flink/flink-operations-playground.html) of Flink's documentation.
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.14/try-flink/flink-operations-playground.html) of Flink's documentation.
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index af07458..4f003d3 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@
services:
client:
build: ../docker/ops-playground-image
- image: apache/flink-ops-playground:1-FLINK-1.13-scala_2.12
+ image: apache/flink-ops-playground:1-FLINK-1.14-scala_2.12
command: "flink run -d /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
@@ -30,12 +30,12 @@
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
- image: apache/flink-ops-playground:1-FLINK-1.13-scala_2.12
+ image: apache/flink-ops-playground:1-FLINK-1.14-scala_2.12
command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka
jobmanager:
- image: apache/flink:1.13.1-scala_2.12-java8
+ image: apache/flink:1.14.4-scala_2.12-java8
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
@@ -46,7 +46,7 @@
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
- image: apache/flink:1.13.1-scala_2.12-java8
+ image: apache/flink:1.14.4-scala_2.12-java8
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"
@@ -59,7 +59,7 @@
zookeeper:
image: wurstmeister/zookeeper:3.4.6
kafka:
- image: wurstmeister/kafka:2.12-2.2.1
+ image: wurstmeister/kafka:2.13-2.8.1
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
diff --git a/table-walkthrough/docker-compose.yml b/table-walkthrough/docker-compose.yml
index b4b641d..4f8ff6d 100644
--- a/table-walkthrough/docker-compose.yml
+++ b/table-walkthrough/docker-compose.yml
@@ -61,8 +61,6 @@
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "kafka:1:1"
- volumes:
- - /var/run/docker.sock:/var/run/docker.sock
data-generator:
image: apache/data-generator:1
build: ../docker/data-generator