[FLINK-13863] Update playgrounds to Flink 1.9.0
* Update Operations Playground (example job, dockerfile, docker-compose.yaml)
* Update README.md
diff --git a/README.md b/README.md
index c9881a3..2beaf5d 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
* The **Flink Operations Playground** in the (`operations-playground` folder) let's you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example
Flink job. The playground is presented in detail in the
-["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
* The interactive SQL playground is still under development and will be added shortly.
diff --git a/docker/ops-playground-image/Dockerfile b/docker/ops-playground-image/Dockerfile
index 8b64428..59b40a0 100644
--- a/docker/ops-playground-image/Dockerfile
+++ b/docker/ops-playground-image/Dockerfile
@@ -32,7 +32,7 @@
# Build Operations Playground Image
###############################################################################
-FROM flink:1.8.1-scala_2.11
+FROM flink:1.9.0-scala_2.11
WORKDIR /opt/flink/bin
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index f1f9b89..3d17fcd 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-playground-clickcountjob</artifactId>
- <version>1-FLINK-1.8_2.11</version>
+ <version>1-FLINK-1.9_2.11</version>
<name>flink-playground-clickcountjob</name>
<packaging>jar</packaging>
@@ -44,7 +44,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.8.1</flink.version>
+ <flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 9f609e9..0316bc6 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -86,10 +86,11 @@
.aggregate(new CountingAggregator(),
new ClickEventStatisticsCollector())
.name("ClickEvent Counter")
- .addSink(new FlinkKafkaProducer<ClickEventStatistics>(
+ .addSink(new FlinkKafkaProducer<>(
outputTopic,
- new ClickEventStatisticsSerializationSchema(),
- kafkaProps))
+ new ClickEventStatisticsSerializationSchema(outputTopic),
+ kafkaProps,
+ FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
.name("ClickEventStatistics Sink");
env.execute("Click Event Count");
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
index 6a5c394..a789d83 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
@@ -18,11 +18,10 @@
package org.apache.flink.playgrounds.ops.clickcount;
-import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
-
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema;
+
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -67,12 +66,13 @@
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
ClickIterator clickIterator = new ClickIterator();
- SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema();
while (true) {
- byte[] message = clickSerializer.serialize(clickIterator.next());
- ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message);
+ ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize(
+ clickIterator.next(),
+ null);
+
producer.send(record);
Thread.sleep(DELAY);
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
index fab05d1..eb64a87 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
@@ -17,28 +17,37 @@
package org.apache.flink.playgrounds.ops.clickcount.records;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
/**
- * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON.
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON.
*
*/
-public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> {
+public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> {
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private String topic;
- public ClickEventSerializationSchema() {
- super();
+ public ClickEventSerializationSchema(){
+ }
+
+ public ClickEventSerializationSchema(String topic) {
+ this.topic = topic;
}
@Override
- public byte[] serialize(ClickEvent message) {
+ public ProducerRecord<byte[], byte[]> serialize(
+ final ClickEvent message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
- return objectMapper.writeValueAsBytes(message);
+ return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
index 40a0dbd..b24807e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -18,23 +18,37 @@
package org.apache.flink.playgrounds.ops.clickcount.records;
-import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
/**
- * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
*
*/
-public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
+public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private String topic;
+
+ public ClickEventStatisticsSerializationSchema(){
+ }
+
+ public ClickEventStatisticsSerializationSchema(String topic) {
+ this.topic = topic;
+ }
@Override
- public byte[] serialize(ClickEventStatistics message) {
+ public ProducerRecord<byte[], byte[]> serialize(
+ final ClickEventStatistics message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
- return objectMapper.writeValueAsBytes(message);
+ return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index d498070..9ed71c5 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@
services:
client:
build: ../docker/ops-playground-image
- image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+ image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
@@ -30,12 +30,12 @@
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
- image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+ image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka
jobmanager:
- image: flink:1.8-scala_2.11
+ image: flink:1.9-scala_2.11
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
@@ -46,7 +46,7 @@
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
- image: flink:1.8-scala_2.11
+ image: flink:1.9-scala_2.11
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"