[FLINK-18194][walkthroughs] Add transactions data generator
diff --git a/docker/data-generator/Dockerfile b/docker/data-generator/Dockerfile
new file mode 100644
index 0000000..5434ade
--- /dev/null
+++ b/docker/data-generator/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM maven:3.6-jdk-8-slim AS builder
+
+# Get data producer code and compile it
+COPY ./src /opt/data-producer/src
+COPY ./pom.xml /opt/data-producer/pom.xml
+
+RUN cd /opt/data-producer; \
+    mvn clean install
+
+FROM openjdk:8-jre
+
+COPY --from=builder /opt/data-producer/target/data-generator-*.jar /opt/data-generator.jar
+
+RUN cd /opt
+
+COPY docker-entrypoint.sh /
+
+ENTRYPOINT ["/docker-entrypoint.sh"]
diff --git a/docker/data-generator/docker-entrypoint.sh b/docker/data-generator/docker-entrypoint.sh
new file mode 100755
index 0000000..9233baa
--- /dev/null
+++ b/docker/data-generator/docker-entrypoint.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+java -classpath /opt/data-generator.jar org.apache.flink.playground.datagen.DataGenerator
diff --git a/docker/data-generator/pom.xml b/docker/data-generator/pom.xml
new file mode 100644
index 0000000..95ced48
--- /dev/null
+++ b/docker/data-generator/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>data-generator</artifactId>
+    <version>1.0.0</version>
+
+    <url>http://flink.apache.org</url>
+    <inceptionYear>2014</inceptionYear>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <kafka.version>2.2.0</kafka.version>
+        <avro.version>1.8.2</avro.version>
+        <java.version>1.8</java.version>
+        <java.version>1.8</java.version>
+        <spotless-maven-plugin.version>1.20.0</spotless-maven-plugin.version>
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Shade plugin to include all dependencies -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.13</version>
+                <inherited>false</inherited>
+                <executions>
+                    <execution>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <excludes>
+                        <!-- Additional files like .gitignore etc.-->
+                        <exclude>**/.*/**</exclude>
+                        <exclude>**/*.prefs</exclude>
+                        <exclude>**/*.log</exclude>
+                        <!-- Administrative files in the main trunk. -->
+                        <exclude>**/README.md</exclude>
+                        <exclude>**/CODE_OF_CONDUCT.md</exclude>
+                        <exclude>.github/**</exclude>
+                        <!-- IDE files. -->
+                        <exclude>**/*.iml</exclude>
+                        <!-- Generated content -->
+                        <exclude>**/target/**</exclude>
+                        <exclude>**/dependency-reduced-pom.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <!-- Java code style -->
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless-maven-plugin.version}</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.7</version>
+                            <style>GOOGLE</style>
+                        </googleJavaFormat>
+                        <removeUnusedImports/>
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
new file mode 100644
index 0000000..e10c576
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataGenerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
+
+  private static final String KAFKA = "kafka:9092";
+
+  private static final String TOPIC = "transactions";
+
+  public static void main(String[] args) {
+    Producer producer = new Producer(KAFKA, TOPIC);
+
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  LOG.info("Shutting down");
+                  producer.close();
+                }));
+
+    producer.run();
+  }
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
new file mode 100644
index 0000000..e0fa817
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class Producer implements Runnable, AutoCloseable {
+
+  private static final DateTimeFormatter formatter =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
+
+  private volatile boolean isRunning;
+
+  private final String brokers;
+
+  private final String topic;
+
+  public Producer(String brokers, String topic) {
+    this.brokers = brokers;
+    this.topic = topic;
+    this.isRunning = true;
+  }
+
+  @Override
+  public void run() {
+    KafkaProducer<Long, String> producer = new KafkaProducer<>(getProperties());
+
+    Throttler throttler = new Throttler(100);
+
+    Random generator = new Random();
+
+    Iterator<Long> accounts =
+        Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
+            .flatMap(UnaryOperator.identity())
+            .iterator();
+
+    Iterator<LocalDateTime> timestamps =
+        Stream.iterate(
+                LocalDateTime.of(2000, 1, 1, 1, 0),
+                time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
+            .iterator();
+
+    while (isRunning) {
+
+      Long account = accounts.next();
+      LocalDateTime timestamp = timestamps.next();
+      long millis = timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+
+      String transaction =
+          String.format(
+              "%s, %s, %s",
+              account.toString(), generator.nextInt(1000), timestamp.format(formatter));
+
+      ProducerRecord<Long, String> record =
+          new ProducerRecord<>(topic, null, millis, account, transaction);
+      producer.send(record);
+
+      try {
+        throttler.throttle();
+      } catch (InterruptedException e) {
+        isRunning = false;
+      }
+    }
+
+    producer.close();
+  }
+
+  @Override
+  public void close() {
+    isRunning = false;
+  }
+
+  private Properties getProperties() {
+    final Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 0);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    return props;
+  }
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
new file mode 100644
index 0000000..f704c80
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen;
+
+final class Throttler {
+
+  private final long throttleBatchSize;
+  private final long nanosPerBatch;
+
+  private long endOfNextBatchNanos;
+  private int currentBatch;
+
+  Throttler(long maxRecordsPerSecond) {
+    if (maxRecordsPerSecond == -1) {
+      // unlimited speed
+      throttleBatchSize = -1;
+      nanosPerBatch = 0;
+      endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
+      currentBatch = 0;
+      return;
+    }
+    final float ratePerSubtask = (float) maxRecordsPerSecond;
+
+    if (ratePerSubtask >= 10000) {
+      // high rates: all throttling in intervals of 2ms
+      throttleBatchSize = (int) ratePerSubtask / 500;
+      nanosPerBatch = 2_000_000L;
+    } else {
+      throttleBatchSize = ((int) (ratePerSubtask / 20)) + 1;
+      nanosPerBatch = ((int) (1_000_000_000L / ratePerSubtask)) * throttleBatchSize;
+    }
+    this.endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
+    this.currentBatch = 0;
+  }
+
+  void throttle() throws InterruptedException {
+    if (throttleBatchSize == -1) {
+      return;
+    }
+    if (++currentBatch != throttleBatchSize) {
+      return;
+    }
+    currentBatch = 0;
+
+    final long now = System.nanoTime();
+    final int millisRemaining = (int) ((endOfNextBatchNanos - now) / 1_000_000);
+
+    if (millisRemaining > 0) {
+      endOfNextBatchNanos += nanosPerBatch;
+      Thread.sleep(millisRemaining);
+    } else {
+      endOfNextBatchNanos = now + nanosPerBatch;
+    }
+  }
+}