[FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
This closes #4.
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-playground-clickcountjob</artifactId>
- <version>1-FLINK-1.9_2.11</version>
+ <version>2-FLINK-1.9_2.11</version>
<name>flink-playground-clickcountjob</name>
<packaging>jar</packaging>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
package org.apache.flink.playgrounds.ops.clickcount;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@
* <p>The Job can be configured via the command line:</p>
* * "--checkpointing": enables checkpointing
* * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
* * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
* * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
* * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@
public static final String CHECKPOINTING_OPTION = "checkpointing";
public static final String EVENT_TIME_OPTION = "event-time";
+ public static final String BACKPRESSURE_OPTION = "backpressure";
public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
@@ -66,6 +70,8 @@
configureEnvironment(params, env);
+ boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
String inputTopic = params.get("input-topic", "input");
String outputTopic = params.get("output-topic", "output");
String brokers = params.get("bootstrap.servers", "localhost:9092");
@@ -73,19 +79,32 @@
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
- env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+ DataStream<ClickEvent> clicks =
+ env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
.name("ClickEvent Source")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(final ClickEvent element) {
return element.getTimestamp().getTime();
}
- })
+ });
+
+ if (inflictBackpressure) {
+ // Force a network shuffle so that the backpressure will affect the buffer pools
+ clicks = clicks
+ .keyBy(ClickEvent::getPage)
+ .map(new BackpressureMap())
+ .name("Backpressure");
+ }
+
+ DataStream<ClickEventStatistics> statistics = clicks
.keyBy(ClickEvent::getPage)
.timeWindow(WINDOW_SIZE)
.aggregate(new CountingAggregator(),
new ClickEventStatisticsCollector())
- .name("ClickEvent Counter")
+ .name("ClickEvent Counter");
+
+ statistics
.addSink(new FlinkKafkaProducer<>(
outputTopic,
new ClickEventStatisticsSerializationSchema(outputTopic),
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
new file mode 100644
index 0000000..ee68573
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+import java.time.LocalTime;
+
+/**
+ * This MapFunction causes severe backpressure during even-numbered minutes of the local wall-clock time.
+ * E.g., from 10:12:00 to 10:12:59 every call to map() sleeps for 100 ms (at most ~10 events/sec per
+ * function instance), but from 10:13:00 to 10:13:59 events will pass through unimpeded.
+ */
+public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {
+
+ private boolean causeBackpressure() {
+ return ((LocalTime.now().getMinute() % 2) == 0); // true while the current minute-of-hour is even
+ }
+
+ @Override
+ public ClickEvent map(ClickEvent event) throws Exception {
+ if (causeBackpressure()) {
+ Thread.sleep(100); // block the calling thread for 100 ms per event; the event itself is not modified
+ }
+
+ return event;
+ }
+
+}
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index 9ed71c5..7907092 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@
services:
client:
build: ../docker/ops-playground-image
- image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+ image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
@@ -30,7 +30,7 @@
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
- image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+ image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka