[FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground

This closes #4.
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@
 
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-playground-clickcountjob</artifactId>
-	<version>1-FLINK-1.9_2.11</version>
+	<version>2-FLINK-1.9_2.11</version>
 
 	<name>flink-playground-clickcountjob</name>
 	<packaging>jar</packaging>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.playgrounds.ops.clickcount;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@
  * <p>The Job can be configured via the command line:</p>
  * * "--checkpointing": enables checkpointing
  * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
  * * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@
 
 	public static final String CHECKPOINTING_OPTION = "checkpointing";
 	public static final String EVENT_TIME_OPTION = "event-time";
+	public static final String BACKPRESSURE_OPTION = "backpressure";
 
 	public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
 
@@ -66,6 +70,8 @@
 
 		configureEnvironment(params, env);
 
+		boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
 		String inputTopic = params.get("input-topic", "input");
 		String outputTopic = params.get("output-topic", "output");
 		String brokers = params.get("bootstrap.servers", "localhost:9092");
@@ -73,19 +79,32 @@
 		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
 
-		env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+		DataStream<ClickEvent> clicks =
+				env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
 			.name("ClickEvent Source")
 			.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
 				@Override
 				public long extractTimestamp(final ClickEvent element) {
 					return element.getTimestamp().getTime();
 				}
-			})
+			});
+
+		if (inflictBackpressure) {
+			// Force a network shuffle so that the backpressure will affect the buffer pools
+			clicks = clicks
+				.keyBy(ClickEvent::getPage)
+				.map(new BackpressureMap())
+				.name("Backpressure");
+		}
+
+		DataStream<ClickEventStatistics> statistics = clicks
 			.keyBy(ClickEvent::getPage)
 			.timeWindow(WINDOW_SIZE)
 			.aggregate(new CountingAggregator(),
 				new ClickEventStatisticsCollector())
-			.name("ClickEvent Counter")
+			.name("ClickEvent Counter");
+
+		statistics
 			.addSink(new FlinkKafkaProducer<>(
 				outputTopic,
 				new ClickEventStatisticsSerializationSchema(outputTopic),
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
new file mode 100644
index 0000000..ee68573
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+import java.time.LocalTime;
+
+/**
+ * This MapFunction causes severe backpressure during even-numbered minutes.
+ * E.g., from 10:12:00 to 10:12:59 it will only process 10 events/sec,
+ * but from 10:13:00 to 10:13:59 events will pass through unimpeded.
+ */
+public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {
+
+	private boolean causeBackpressure() {
+		return ((LocalTime.now().getMinute() % 2) == 0);
+	}
+
+	@Override
+	public ClickEvent map(ClickEvent event) throws Exception {
+		if (causeBackpressure()) {
+			Thread.sleep(100);
+		}
+
+		return event;
+	}
+
+}
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index 9ed71c5..7907092 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@
 services:
   client:
     build: ../docker/ops-playground-image
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
@@ -30,7 +30,7 @@
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka