[FLINK-13863] Update playgrounds to Flink 1.9.0

* Update Operations Playground (example job, dockerfile, docker-compose.yaml)
* Update README.md
diff --git a/README.md b/README.md
index c9881a3..2beaf5d 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
 
 * The **Flink Operations Playground** in the (`operations-playground` folder) let's you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example 
 Flink job. The playground is presented in detail in the
-["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
 
 * The interactive SQL playground is still under development and will be added shortly.
 
diff --git a/docker/ops-playground-image/Dockerfile b/docker/ops-playground-image/Dockerfile
index 8b64428..59b40a0 100644
--- a/docker/ops-playground-image/Dockerfile
+++ b/docker/ops-playground-image/Dockerfile
@@ -32,7 +32,7 @@
 # Build Operations Playground Image
 ###############################################################################
 
-FROM flink:1.8.1-scala_2.11
+FROM flink:1.9.0-scala_2.11
 
 WORKDIR /opt/flink/bin
 
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index f1f9b89..3d17fcd 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@
 
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-playground-clickcountjob</artifactId>
-	<version>1-FLINK-1.8_2.11</version>
+	<version>1-FLINK-1.9_2.11</version>
 
 	<name>flink-playground-clickcountjob</name>
 	<packaging>jar</packaging>
@@ -44,7 +44,7 @@
 
     <properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>1.8.1</flink.version>
+		<flink.version>1.9.0</flink.version>
 		<java.version>1.8</java.version>
 		<scala.binary.version>2.11</scala.binary.version>
 		<maven.compiler.source>${java.version}</maven.compiler.source>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 9f609e9..0316bc6 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -86,10 +86,11 @@
 			.aggregate(new CountingAggregator(),
 				new ClickEventStatisticsCollector())
 			.name("ClickEvent Counter")
-			.addSink(new FlinkKafkaProducer<ClickEventStatistics>(
+			.addSink(new FlinkKafkaProducer<>(
 				outputTopic,
-				new ClickEventStatisticsSerializationSchema(),
-				kafkaProps))
+				new ClickEventStatisticsSerializationSchema(outputTopic),
+				kafkaProps,
+				FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
 			.name("ClickEventStatistics Sink");
 
 		env.execute("Click Event Count");
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
index 6a5c394..a789d83 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.playgrounds.ops.clickcount;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
-
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -67,12 +66,13 @@
 		KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
 
 		ClickIterator clickIterator = new ClickIterator();
-		SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema();
 
 		while (true) {
 
-			byte[] message = clickSerializer.serialize(clickIterator.next());
-			ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message);
+			ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize(
+					clickIterator.next(),
+					null);
+
 			producer.send(record);
 
 			Thread.sleep(DELAY);
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
index fab05d1..eb64a87 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
@@ -17,28 +17,37 @@
 
 package org.apache.flink.playgrounds.ops.clickcount.records;
 
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
 /**
- * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON.
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON.
  *
  */
-public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> {
+public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> {
 
 	private static final ObjectMapper objectMapper = new ObjectMapper();
+	private String topic;
 
-	public ClickEventSerializationSchema() {
-		super();
+	public ClickEventSerializationSchema(){
+	}
+
+	public ClickEventSerializationSchema(String topic) {
+		this.topic = topic;
 	}
 
 	@Override
-	public byte[] serialize(ClickEvent message) {
+	public ProducerRecord<byte[], byte[]> serialize(
+			final ClickEvent message, @Nullable final Long timestamp) {
 		try {
 			//if topic is null, default topic will be used
-			return objectMapper.writeValueAsBytes(message);
+			return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
 		} catch (JsonProcessingException e) {
 			throw new IllegalArgumentException("Could not serialize record: " + message, e);
 		}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
index 40a0dbd..b24807e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -18,23 +18,37 @@
 package org.apache.flink.playgrounds.ops.clickcount.records;
 
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
 /**
- * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
  *
  */
-public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
+public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {
 
 	private static final ObjectMapper objectMapper = new ObjectMapper();
+	private String topic;
+
+	public ClickEventStatisticsSerializationSchema(){
+	}
+
+	public ClickEventStatisticsSerializationSchema(String topic) {
+		this.topic = topic;
+	}
 
 	@Override
-	public byte[] serialize(ClickEventStatistics message) {
+	public ProducerRecord<byte[], byte[]> serialize(
+			final ClickEventStatistics message, @Nullable final Long timestamp) {
 		try {
 			//if topic is null, default topic will be used
-			return objectMapper.writeValueAsBytes(message);
+			return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
 		} catch (JsonProcessingException e) {
 			throw new IllegalArgumentException("Could not serialize record: " + message, e);
 		}
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index d498070..9ed71c5 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@
 services:
   client:
     build: ../docker/ops-playground-image
-    image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
     command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
@@ -30,12 +30,12 @@
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
     command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka
   jobmanager:
-    image: flink:1.8-scala_2.11
+    image: flink:1.9-scala_2.11
     command: "jobmanager.sh start-foreground"
     ports:
       - 8081:8081
@@ -46,7 +46,7 @@
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   taskmanager:
-    image: flink:1.8-scala_2.11
+    image: flink:1.9-scala_2.11
     depends_on:
       - jobmanager
     command: "taskmanager.sh start-foreground"