[FLINK-18194][walkthroughs] Add transactions data generator
diff --git a/docker/data-generator/Dockerfile b/docker/data-generator/Dockerfile
new file mode 100644
index 0000000..5434ade
--- /dev/null
+++ b/docker/data-generator/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM maven:3.6-jdk-8-slim AS builder
+
+# Get data producer code and compile it
+COPY ./src /opt/data-producer/src
+COPY ./pom.xml /opt/data-producer/pom.xml
+
+RUN cd /opt/data-producer; \
+ mvn clean install
+
+FROM openjdk:8-jre
+
+COPY --from=builder /opt/data-producer/target/data-generator-*.jar /opt/data-generator.jar
+
+RUN cd /opt
+
+COPY docker-entrypoint.sh /
+
+ENTRYPOINT ["/docker-entrypoint.sh"]
diff --git a/docker/data-generator/docker-entrypoint.sh b/docker/data-generator/docker-entrypoint.sh
new file mode 100755
index 0000000..9233baa
--- /dev/null
+++ b/docker/data-generator/docker-entrypoint.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+java -classpath /opt/data-generator.jar org.apache.flink.playground.datagen.DataGenerator
diff --git a/docker/data-generator/pom.xml b/docker/data-generator/pom.xml
new file mode 100644
index 0000000..95ced48
--- /dev/null
+++ b/docker/data-generator/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.flink</groupId>
+ <artifactId>data-generator</artifactId>
+ <version>1.0.0</version>
+
+ <url>http://flink.apache.org</url>
+ <inceptionYear>2014</inceptionYear>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <kafka.version>2.2.0</kafka.version>
+ <avro.version>1.8.2</avro.version>
+ <java.version>1.8</java.version>
+ <java.version>1.8</java.version>
+ <spotless-maven-plugin.version>1.20.0</spotless-maven-plugin.version>
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <!-- Shade plugin to include all dependencies -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.13</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <excludes>
+ <!-- Additional files like .gitignore etc.-->
+ <exclude>**/.*/**</exclude>
+ <exclude>**/*.prefs</exclude>
+ <exclude>**/*.log</exclude>
+ <!-- Administrative files in the main trunk. -->
+ <exclude>**/README.md</exclude>
+ <exclude>**/CODE_OF_CONDUCT.md</exclude>
+ <exclude>.github/**</exclude>
+ <!-- IDE files. -->
+ <exclude>**/*.iml</exclude>
+ <!-- Generated content -->
+ <exclude>**/target/**</exclude>
+ <exclude>**/dependency-reduced-pom.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <!-- Java code style -->
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless-maven-plugin.version}</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.7</version>
+ <style>GOOGLE</style>
+ </googleJavaFormat>
+ <removeUnusedImports/>
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <id>spotless-check</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
new file mode 100644
index 0000000..e10c576
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
+
+ private static final String KAFKA = "kafka:9092";
+
+ private static final String TOPIC = "transactions";
+
+ public static void main(String[] args) {
+ Producer producer = new Producer(KAFKA, TOPIC);
+
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ LOG.info("Shutting down");
+ producer.close();
+ }));
+
+ producer.run();
+ }
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
new file mode 100644
index 0000000..e0fa817
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class Producer implements Runnable, AutoCloseable {
+
+ private static final DateTimeFormatter formatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
+
+ private volatile boolean isRunning;
+
+ private final String brokers;
+
+ private final String topic;
+
+ public Producer(String brokers, String topic) {
+ this.brokers = brokers;
+ this.topic = topic;
+ this.isRunning = true;
+ }
+
+ @Override
+ public void run() {
+ KafkaProducer<Long, String> producer = new KafkaProducer<>(getProperties());
+
+ Throttler throttler = new Throttler(100);
+
+ Random generator = new Random();
+
+ Iterator<Long> accounts =
+ Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
+ .flatMap(UnaryOperator.identity())
+ .iterator();
+
+ Iterator<LocalDateTime> timestamps =
+ Stream.iterate(
+ LocalDateTime.of(2000, 1, 1, 1, 0),
+ time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
+ .iterator();
+
+ while (isRunning) {
+
+ Long account = accounts.next();
+ LocalDateTime timestamp = timestamps.next();
+ long millis = timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+
+ String transaction =
+ String.format(
+ "%s, %s, %s",
+ account.toString(), generator.nextInt(1000), timestamp.format(formatter));
+
+ ProducerRecord<Long, String> record =
+ new ProducerRecord<>(topic, null, millis, account, transaction);
+ producer.send(record);
+
+ try {
+ throttler.throttle();
+ } catch (InterruptedException e) {
+ isRunning = false;
+ }
+ }
+
+ producer.close();
+ }
+
+ @Override
+ public void close() {
+ isRunning = false;
+ }
+
+ private Properties getProperties() {
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return props;
+ }
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
new file mode 100644
index 0000000..f704c80
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen;
+
+final class Throttler {
+
+ private final long throttleBatchSize;
+ private final long nanosPerBatch;
+
+ private long endOfNextBatchNanos;
+ private int currentBatch;
+
+ Throttler(long maxRecordsPerSecond) {
+ if (maxRecordsPerSecond == -1) {
+ // unlimited speed
+ throttleBatchSize = -1;
+ nanosPerBatch = 0;
+ endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
+ currentBatch = 0;
+ return;
+ }
+ final float ratePerSubtask = (float) maxRecordsPerSecond;
+
+ if (ratePerSubtask >= 10000) {
+ // high rates: all throttling in intervals of 2ms
+ throttleBatchSize = (int) ratePerSubtask / 500;
+ nanosPerBatch = 2_000_000L;
+ } else {
+ throttleBatchSize = ((int) (ratePerSubtask / 20)) + 1;
+ nanosPerBatch = ((int) (1_000_000_000L / ratePerSubtask)) * throttleBatchSize;
+ }
+ this.endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
+ this.currentBatch = 0;
+ }
+
+ void throttle() throws InterruptedException {
+ if (throttleBatchSize == -1) {
+ return;
+ }
+ if (++currentBatch != throttleBatchSize) {
+ return;
+ }
+ currentBatch = 0;
+
+ final long now = System.nanoTime();
+ final int millisRemaining = (int) ((endOfNextBatchNanos - now) / 1_000_000);
+
+ if (millisRemaining > 0) {
+ endOfNextBatchNanos += nanosPerBatch;
+ Thread.sleep(millisRemaining);
+ } else {
+ endOfNextBatchNanos = now + nanosPerBatch;
+ }
+ }
+}