[FLINK-18194][walkthroughs] Add table api walkthrough skeleton code
This closes #13
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
index e10c576..250f2c5 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
@@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/** A basic data generator for continuously writing data into a Kafka topic. */
public class DataGenerator {
private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
index e0fa817..33d68bd 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
@@ -18,20 +18,18 @@
package org.apache.flink.playground.datagen;
-import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
-import java.util.Iterator;
import java.util.Properties;
-import java.util.Random;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
+import org.apache.flink.playground.datagen.model.Transaction;
+import org.apache.flink.playground.datagen.model.TransactionSerializer;
+import org.apache.flink.playground.datagen.model.TransactionSupplier;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+/** Generates CSV transaction records at a rate */
public class Producer implements Runnable, AutoCloseable {
private static final DateTimeFormatter formatter =
@@ -51,36 +49,20 @@
@Override
public void run() {
- KafkaProducer<Long, String> producer = new KafkaProducer<>(getProperties());
+ KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());
Throttler throttler = new Throttler(100);
- Random generator = new Random();
-
- Iterator<Long> accounts =
- Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
- .flatMap(UnaryOperator.identity())
- .iterator();
-
- Iterator<LocalDateTime> timestamps =
- Stream.iterate(
- LocalDateTime.of(2000, 1, 1, 1, 0),
- time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
- .iterator();
+ TransactionSupplier transactions = new TransactionSupplier();
while (isRunning) {
- Long account = accounts.next();
- LocalDateTime timestamp = timestamps.next();
- long millis = timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ Transaction transaction = transactions.get();
- String transaction =
- String.format(
- "%s, %s, %s",
- account.toString(), generator.nextInt(1000), timestamp.format(formatter));
+ long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
- ProducerRecord<Long, String> record =
- new ProducerRecord<>(topic, null, millis, account, transaction);
+ ProducerRecord<Long, Transaction> record =
+ new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
producer.send(record);
try {
@@ -104,7 +86,7 @@
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionSerializer.class);
return props;
}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
index f704c80..548d300 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
@@ -18,6 +18,7 @@
package org.apache.flink.playground.datagen;
+/** A data throttler that controls the rate at which data is written out to Kafka. */
final class Throttler {
private final long throttleBatchSize;
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
new file mode 100644
index 0000000..42fcb41
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.LocalDateTime;
+
+/** A simple financial transaction. */
+public class Transaction {
+ public long accountId;
+
+ public int amount;
+
+ public LocalDateTime timestamp;
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
new file mode 100644
index 0000000..b57b5f0
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+
+/** Serializes a {@link Transaction} into a CSV record. */
+public class TransactionSerializer implements Serializer<Transaction> {
+
+ private static final DateTimeFormatter formatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
+
+ @Override
+ public void configure(Map<String, ?> map, boolean b) {}
+
+ @Override
+ public byte[] serialize(String s, Transaction transaction) {
+ String csv =
+ String.format(
+ "%s, %s, %s",
+ transaction.accountId, transaction.amount, transaction.timestamp.format(formatter));
+
+ return csv.getBytes();
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
new file mode 100644
index 0000000..6ddf9a6
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.LocalDateTime;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/** A supplier that generates an arbitrary transaction. */
+public class TransactionSupplier implements Supplier<Transaction> {
+
+ private final Random generator = new Random();
+
+ private final Iterator<Long> accounts =
+ Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
+ .flatMap(UnaryOperator.identity())
+ .iterator();
+
+ private final Iterator<LocalDateTime> timestamps =
+ Stream.iterate(
+ LocalDateTime.of(2000, 1, 1, 1, 0),
+ time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
+ .iterator();
+
+ @Override
+ public Transaction get() {
+ Transaction transaction = new Transaction();
+ transaction.accountId = accounts.next();
+ transaction.amount = generator.nextInt(1000);
+ transaction.timestamp = timestamps.next();
+
+ return transaction;
+ }
+}
diff --git a/table-walkthrough/Dockerfile b/table-walkthrough/Dockerfile
new file mode 100644
index 0000000..86cf2df
--- /dev/null
+++ b/table-walkthrough/Dockerfile
@@ -0,0 +1,45 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM maven:3.6-jdk-8-slim AS builder
+
+COPY ./pom.xml /opt/pom.xml
+COPY ./src /opt/src
+RUN cd /opt; mvn clean install -Dmaven.test.skip
+
+FROM flink:1.11-SNAPSHOT-scala_2.11
+
+# Download connector libraries for snapshot version
+RUN wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/1.11-SNAPSHOT/flink-sql-connector-kafka_2.11-1.11-20200610.034108-152.jar; \
+ wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-jdbc_2.11/1.11-SNAPSHOT/flink-connector-jdbc_2.11-1.11-20200610.033814-8.jar; \
+ wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-csv/1.11-SNAPSHOT/flink-csv-1.11-20200610.033438-153.jar; \
+ wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
+
+
+# Download connector libraries
+#RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
+# wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-jdbc_2.11/${FLINK_VERSION}/flink-jdbc_2.11-${FLINK_VERSION}.jar; \
+# wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
+# wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
+
+COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar
+
+RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
+ echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
+ echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
+ echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
diff --git a/table-walkthrough/docker-compose.yml b/table-walkthrough/docker-compose.yml
new file mode 100644
index 0000000..2388af4
--- /dev/null
+++ b/table-walkthrough/docker-compose.yml
@@ -0,0 +1,91 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2.1'
+services:
+ jobmanager:
+ image: apache/flink-table-walkthrough:1-FLINK-1.11-scala_2.11
+ build: .
+ hostname: "jobmanager"
+ expose:
+ - "6123"
+ ports:
+ - "8082:8081"
+ command: standalone-job
+ environment:
+ - JOB_MANAGER_RPC_ADDRESS=jobmanager
+ depends_on:
+ - kafka
+ - mysql
+ taskmanager:
+ image: apache/flink-playground-walkthrough:1-FLINK-1.11-scala_2.11
+ build: .
+ expose:
+ - "6121"
+ - "6122"
+ depends_on:
+ - jobmanager
+ command: taskmanager
+ links:
+ - jobmanager:jobmanager
+ environment:
+ - JOB_MANAGER_RPC_ADDRESS=jobmanager
+ zookeeper:
+ image: wurstmeister/zookeeper:3.4.6
+ ports:
+ - "2181:2181"
+ kafka:
+ image: wurstmeister/kafka:2.12-2.2.1
+ ports:
+ - "9092:9092"
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: "kafka"
+ KAFKA_ADVERTISED_PORT: "9092"
+ HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_CREATE_TOPICS: "kafka:1:1"
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ data-generator:
+ image: apache/data-generator:1
+ build: ../docker/data-generator
+ depends_on:
+ - kafka
+ mysql:
+ image: mysql:8.0.19
+ command: --default-authentication-plugin=mysql_native_password --secure_file_priv=/data
+ environment:
+ MYSQL_USER: "sql-demo"
+ MYSQL_PASSWORD: "demo-sql"
+ MYSQL_DATABASE: "sql-demo"
+ MYSQL_RANDOM_ROOT_PASSWORD: "yes"
+ volumes:
+ - ../docker/mysql-spend-report-init:/docker-entrypoint-initdb.d
+ - ./data:/data
+ grafana:
+ image: grafana/grafana
+ ports:
+ - "3000:3000"
+ depends_on:
+ - mysql
+ volumes:
+ - ../docker/grafana-spend-report-init/provisioning/:/etc/grafana/provisioning/
+ - ../docker/grafana-spend-report-init/dashboard.json:/etc/grafana/dashboard.json
+ - ../docker/grafana-spend-report-init/grafana.ini:/etc/grafana/grafana.ini
diff --git a/table-walkthrough/pom.xml b/table-walkthrough/pom.xml
new file mode 100644
index 0000000..dda85c8
--- /dev/null
+++ b/table-walkthrough/pom.xml
@@ -0,0 +1,251 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.flink</groupId>
+ <artifactId>spend-report</artifactId>
+ <version>1.0.0</version>
+ <packaging>jar</packaging>
+
+ <name>Flink Walkthrough Table Java</name>
+ <url>https://flink.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>1.11-SNAPSHOT</flink.version>
+ <java.version>1.8</java.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Add logging framework, to produce console output when running in the IDE. -->
+ <!-- These dependencies are excluded from the application JAR by default. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.playgrounds.spendreport.SpendReport</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+
+ <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <versionRange>[3.0.0,)</versionRange>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ <goal>compile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.13</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <excludes>
+ <!-- Additional files like .gitignore etc.-->
+ <exclude>**/.*/**</exclude>
+ <exclude>**/*.prefs</exclude>
+ <exclude>**/*.log</exclude>
+ <!-- Administrative files in the main trunk. -->
+ <exclude>**/README.md</exclude>
+ <exclude>**/CODE_OF_CONDUCT.md</exclude>
+ <exclude>.github/**</exclude>
+ <!-- IDE files. -->
+ <exclude>**/*.iml</exclude>
+ <!-- Generated content -->
+ <exclude>**/target/**</exclude>
+ <exclude>**/dependency-reduced-pom.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
diff --git a/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
new file mode 100644
index 0000000..1a8cb83
--- /dev/null
+++ b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+public class SpendReport {
+
+ public static Table report(Table transactions) {
+ throw new UnimplementedException();
+ }
+
+ public static void main(String[] args) throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+ TableEnvironment tEnv = TableEnvironment.create(settings);
+
+ tEnv.executeSql("CREATE TABLE transactions (\n" +
+ " account_id BIGINT,\n" +
+ " amount BIGINT,\n" +
+ " transaction_time TIMESTAMP(3),\n" +
+ " WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+ ") WITH (\n" +
+ " 'connector' = 'kafka',\n" +
+ " 'topic' = 'transactions',\n" +
+ " 'properties.bootstrap.servers' = 'kafka:9092',\n" +
+ " 'format' = 'csv'\n" +
+ ")");
+
+ tEnv.executeSql("CREATE TABLE spend_report (\n" +
+ " account_id BIGINT,\n" +
+ " log_ts TIMESTAMP(3),\n" +
+ " amount BIGINT\n," +
+ " PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+ ") WITH (\n" +
+ " 'connector' = 'jdbc',\n" +
+ " 'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+ " 'table-name' = 'spend_report',\n" +
+ " 'driver' = 'com.mysql.jdbc.Driver',\n" +
+ " 'username' = 'sql-demo',\n" +
+ " 'password' = 'demo-sql'\n" +
+ ")");
+
+ Table transactions = tEnv.from("transactions");
+ report(transactions).executeInsert("spend_report");
+ }
+}
diff --git a/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
new file mode 100644
index 0000000..e1c986f
--- /dev/null
+++ b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+public class UnimplementedException extends RuntimeException {
+
+ public UnimplementedException() {
+ super("This method has not yet been implemented");
+ }
+}
diff --git a/table-walkthrough/src/main/resources/log4j.properties b/table-walkthrough/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e26ea17
--- /dev/null
+++ b/table-walkthrough/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java b/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
new file mode 100644
index 0000000..208f6e9
--- /dev/null
+++ b/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A unit test of the spend report.
+ * If this test passes then the business
+ * logic is correct.
+ */
+public class SpendReportTest {
+
+ private static final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
+
+ @Test
+ public void testReport() {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tEnv = TableEnvironment.create(settings);
+
+ Table transactions =
+ tEnv.fromValues(
+ DataTypes.ROW(
+ DataTypes.FIELD("account_id", DataTypes.BIGINT()),
+ DataTypes.FIELD("amount", DataTypes.BIGINT()),
+ DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
+ Row.of(1, 188, DATE_TIME.plusMinutes(12)),
+ Row.of(2, 374, DATE_TIME.plusMinutes(47)),
+ Row.of(3, 112, DATE_TIME.plusMinutes(36)),
+ Row.of(4, 478, DATE_TIME.plusMinutes(3)),
+ Row.of(5, 208, DATE_TIME.plusMinutes(8)),
+ Row.of(1, 379, DATE_TIME.plusMinutes(53)),
+ Row.of(2, 351, DATE_TIME.plusMinutes(32)),
+ Row.of(3, 320, DATE_TIME.plusMinutes(31)),
+ Row.of(4, 259, DATE_TIME.plusMinutes(19)),
+ Row.of(5, 273, DATE_TIME.plusMinutes(42)));
+
+ try {
+ TableResult results = SpendReport.report(transactions).execute();
+
+ MatcherAssert.assertThat(
+ materialize(results),
+ Matchers.containsInAnyOrder(
+ Row.of(1L, DATE_TIME, 567L),
+ Row.of(2L, DATE_TIME, 725L),
+ Row.of(3L, DATE_TIME, 432L),
+ Row.of(4L, DATE_TIME, 737L),
+ Row.of(5L, DATE_TIME, 481L)));
+ } catch (UnimplementedException e) {
+ Assume.assumeNoException("The walkthrough has not been implemented", e);
+ }
+ }
+
+ private static List<Row> materialize(TableResult results) {
+ try (CloseableIterator<Row> resultIterator = results.collect()) {
+ return StreamSupport
+ .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to materialize results", e);
+ }
+ }
+}
diff --git a/table-walkthrough/src/test/resources/log4j.properties b/table-walkthrough/src/test/resources/log4j.properties
new file mode 100644
index 0000000..5cae07b
--- /dev/null
+++ b/table-walkthrough/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.streaming.api.operators.collect.CollectResultFetcher=OFF
+log4j.logger.org.apache.flink.streaming.api.operators.collect.CollectSinkFunction=OFF
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n