[hotfix] Improve autoscaling example
diff --git a/examples/autoscaling/autoscaling.yaml b/examples/autoscaling/autoscaling.yaml
index 482e86b..509db5d 100644
--- a/examples/autoscaling/autoscaling.yaml
+++ b/examples/autoscaling/autoscaling.yaml
@@ -28,9 +28,8 @@
kubernetes.operator.job.autoscaler.scaling.sources.enabled: "false"
kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
kubernetes.operator.job.autoscaler.metrics.window: "3m"
- pipeline.max-parallelism: "8"
-
- taskmanager.numberOfTaskSlots: "2"
+ pipeline.max-parallelism: "24"
+ taskmanager.numberOfTaskSlots: "4"
state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
@@ -61,3 +60,4 @@
jarURI: local:///opt/flink/usrlib/autoscaling.jar
parallelism: 1
upgradeMode: last-state
+ args: ["10"]
diff --git a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
index c74fd52..e76b8ec 100644
--- a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
+++ b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
@@ -18,23 +18,38 @@
package autoscaling;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/** Autoscaling Example. */
public class AutoscalingExample {
public static void main(String[] args) throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Long> stream = env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE);
+ long numIterations = Long.parseLong(args[0]);
+ DataStream<Long> stream =
+ env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).filter(i -> System.nanoTime() > 1);
stream =
stream.shuffle()
.map(
- i -> {
- // Add sleep to artificially slow down processing
- // Thread.sleep(sleep);
- return i;
+ new RichMapFunction<Long, Long>() {
+ @Override
+ public Long map(Long i) throws Exception {
+ long end = 0;
+ for (int j = 0; j < numIterations; j++) {
+ end = System.nanoTime();
+ }
+ return end;
+ }
});
- stream.print();
+ stream.addSink(
+ new SinkFunction<Long>() {
+ @Override
+ public void invoke(Long value, Context context) throws Exception {
+ // Do nothing
+ }
+ });
env.execute("Autoscaling Example");
}
}