[FLINK-18194][walkthroughs] Add table api walkthrough skeleton code

This closes #13
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
index e10c576..250f2c5 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
@@ -21,6 +21,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** A basic data generator for continuously writing data into a Kafka topic. */
 public class DataGenerator {
 
   private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
index e0fa817..33d68bd 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.playground.datagen;
 
-import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
-import java.util.Iterator;
 import java.util.Properties;
-import java.util.Random;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
+import org.apache.flink.playground.datagen.model.Transaction;
+import org.apache.flink.playground.datagen.model.TransactionSerializer;
+import org.apache.flink.playground.datagen.model.TransactionSupplier;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 
+/** Generates CSV transaction records at a rate */
 public class Producer implements Runnable, AutoCloseable {
 
   private static final DateTimeFormatter formatter =
@@ -51,36 +49,20 @@
 
   @Override
   public void run() {
-    KafkaProducer<Long, String> producer = new KafkaProducer<>(getProperties());
+    KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());
 
     Throttler throttler = new Throttler(100);
 
-    Random generator = new Random();
-
-    Iterator<Long> accounts =
-        Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
-            .flatMap(UnaryOperator.identity())
-            .iterator();
-
-    Iterator<LocalDateTime> timestamps =
-        Stream.iterate(
-                LocalDateTime.of(2000, 1, 1, 1, 0),
-                time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
-            .iterator();
+    TransactionSupplier transactions = new TransactionSupplier();
 
     while (isRunning) {
 
-      Long account = accounts.next();
-      LocalDateTime timestamp = timestamps.next();
-      long millis = timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+      Transaction transaction = transactions.get();
 
-      String transaction =
-          String.format(
-              "%s, %s, %s",
-              account.toString(), generator.nextInt(1000), timestamp.format(formatter));
+      long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
 
-      ProducerRecord<Long, String> record =
-          new ProducerRecord<>(topic, null, millis, account, transaction);
+      ProducerRecord<Long, Transaction> record =
+          new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
       producer.send(record);
 
       try {
@@ -104,7 +86,7 @@
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put(ProducerConfig.RETRIES_CONFIG, 0);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionSerializer.class);
 
     return props;
   }
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
index f704c80..548d300 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.playground.datagen;
 
+/** A data throttler that controls the rate at which data is written out to Kafka. */
 final class Throttler {
 
   private final long throttleBatchSize;
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
new file mode 100644
index 0000000..42fcb41
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.LocalDateTime;
+
+/** A simple financial transaction. */
+public class Transaction {
+  public long accountId;
+
+  public int amount;
+
+  public LocalDateTime timestamp;
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
new file mode 100644
index 0000000..b57b5f0
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+
+/** Serializes a {@link Transaction} into a CSV record. */
+public class TransactionSerializer implements Serializer<Transaction> {
+
+  private static final DateTimeFormatter formatter =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
+
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {}
+
+  @Override
+  public byte[] serialize(String s, Transaction transaction) {
+    String csv =
+        String.format(
+            "%s, %s, %s",
+            transaction.accountId, transaction.amount, transaction.timestamp.format(formatter));
+
+    return csv.getBytes();
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
new file mode 100644
index 0000000..6ddf9a6
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.LocalDateTime;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/** A supplier that generates an arbitrary transaction. */
+public class TransactionSupplier implements Supplier<Transaction> {
+
+  private final Random generator = new Random();
+
+  private final Iterator<Long> accounts =
+      Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
+          .flatMap(UnaryOperator.identity())
+          .iterator();
+
+  private final Iterator<LocalDateTime> timestamps =
+      Stream.iterate(
+              LocalDateTime.of(2000, 1, 1, 1, 0),
+              time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
+          .iterator();
+
+  @Override
+  public Transaction get() {
+    Transaction transaction = new Transaction();
+    transaction.accountId = accounts.next();
+    transaction.amount = generator.nextInt(1000);
+    transaction.timestamp = timestamps.next();
+
+    return transaction;
+  }
+}
diff --git a/table-walkthrough/Dockerfile b/table-walkthrough/Dockerfile
new file mode 100644
index 0000000..86cf2df
--- /dev/null
+++ b/table-walkthrough/Dockerfile
@@ -0,0 +1,45 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM maven:3.6-jdk-8-slim AS builder
+
+COPY ./pom.xml /opt/pom.xml
+COPY ./src /opt/src
+RUN cd /opt; mvn clean install -Dmaven.test.skip
+
+FROM flink:1.11-SNAPSHOT-scala_2.11
+
+# Download connector libraries for snapshot version
+RUN wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/1.11-SNAPSHOT/flink-sql-connector-kafka_2.11-1.11-20200610.034108-152.jar; \
+    wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-jdbc_2.11/1.11-SNAPSHOT/flink-connector-jdbc_2.11-1.11-20200610.033814-8.jar; \
+    wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-csv/1.11-SNAPSHOT/flink-csv-1.11-20200610.033438-153.jar; \
+    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
+
+
+# Download connector libraries
+#RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
+#    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-jdbc_2.11/${FLINK_VERSION}/flink-jdbc_2.11-${FLINK_VERSION}.jar; \
+#    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
+#    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
+
+COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar
+
+RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
+    echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
+    echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
+    echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
diff --git a/table-walkthrough/docker-compose.yml b/table-walkthrough/docker-compose.yml
new file mode 100644
index 0000000..2388af4
--- /dev/null
+++ b/table-walkthrough/docker-compose.yml
@@ -0,0 +1,91 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2.1'
+services:
+  jobmanager:
+    image: apache/flink-table-walkthrough:1-FLINK-1.11-scala_2.11
+    build: .
+    hostname: "jobmanager"
+    expose:
+      - "6123"
+    ports:
+      - "8082:8081"
+    command: standalone-job
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+    depends_on:
+      - kafka
+      - mysql
+  taskmanager:
+    image: apache/flink-playground-walkthrough:1-FLINK-1.11-scala_2.11
+    build: .
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    links:
+      - jobmanager:jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+  zookeeper:
+    image: wurstmeister/zookeeper:3.4.6
+    ports:
+      - "2181:2181"
+  kafka:
+    image: wurstmeister/kafka:2.12-2.2.1
+    ports:
+      - "9092:9092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: "kafka"
+      KAFKA_ADVERTISED_PORT: "9092"
+      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CREATE_TOPICS: "kafka:1:1"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+  data-generator:
+      image: apache/data-generator:1
+      build: ../docker/data-generator
+      depends_on:
+        - kafka
+  mysql:
+    image: mysql:8.0.19
+    command: --default-authentication-plugin=mysql_native_password --secure_file_priv=/data
+    environment:
+      MYSQL_USER: "sql-demo"
+      MYSQL_PASSWORD: "demo-sql"
+      MYSQL_DATABASE: "sql-demo"
+      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
+    volumes:
+      - ../docker/mysql-spend-report-init:/docker-entrypoint-initdb.d
+      - ./data:/data
+  grafana:
+    image: grafana/grafana
+    ports:
+      - "3000:3000"
+    depends_on:
+      - mysql
+    volumes:
+      - ../docker/grafana-spend-report-init/provisioning/:/etc/grafana/provisioning/
+      - ../docker/grafana-spend-report-init/dashboard.json:/etc/grafana/dashboard.json
+      - ../docker/grafana-spend-report-init/grafana.ini:/etc/grafana/grafana.ini
diff --git a/table-walkthrough/pom.xml b/table-walkthrough/pom.xml
new file mode 100644
index 0000000..dda85c8
--- /dev/null
+++ b/table-walkthrough/pom.xml
@@ -0,0 +1,251 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>spend-report</artifactId>
+	<version>1.0.0</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough Table Java</name>
+	<url>https://flink.apache.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>1.11-SNAPSHOT</flink.version>
+		<java.version>1.8</java.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>
+    </properties>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Add logging framework, to produce console output when running in the IDE. -->
+        <!-- These dependencies are excluded from the application JAR by default. -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.7</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.flink.playgrounds.spendreport.SpendReport</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+
+                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-shade-plugin</artifactId>
+                                        <versionRange>[3.0.0,)</versionRange>
+                                        <goals>
+                                            <goal>shade</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <version>0.13</version>
+                    <inherited>false</inherited>
+                    <executions>
+                        <execution>
+                            <phase>verify</phase>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                    <configuration>
+                        <excludes>
+                            <!-- Additional files like .gitignore etc.-->
+                            <exclude>**/.*/**</exclude>
+                            <exclude>**/*.prefs</exclude>
+                            <exclude>**/*.log</exclude>
+                            <!-- Administrative files in the main trunk. -->
+                            <exclude>**/README.md</exclude>
+                            <exclude>**/CODE_OF_CONDUCT.md</exclude>
+                            <exclude>.github/**</exclude>
+                            <!-- IDE files. -->
+                            <exclude>**/*.iml</exclude>
+                            <!-- Generated content -->
+                            <exclude>**/target/**</exclude>
+                            <exclude>**/dependency-reduced-pom.xml</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+</project>
diff --git a/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
new file mode 100644
index 0000000..1a8cb83
--- /dev/null
+++ b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+public class SpendReport {
+
+    public static Table report(Table transactions) {
+        throw new UnimplementedException();
+    }
+
+    public static void main(String[] args) throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        tEnv.executeSql("CREATE TABLE transactions (\n" +
+                "    account_id  BIGINT,\n" +
+                "    amount      BIGINT,\n" +
+                "    transaction_time TIMESTAMP(3),\n" +
+                "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+                ") WITH (\n" +
+                "    'connector' = 'kafka',\n" +
+                "    'topic'     = 'transactions',\n" +
+                "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+                "    'format'    = 'csv'\n" +
+                ")");
+
+        tEnv.executeSql("CREATE TABLE spend_report (\n" +
+                "    account_id BIGINT,\n" +
+                "    log_ts     TIMESTAMP(3),\n" +
+                "    amount     BIGINT\n," +
+                "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+                ") WITH (\n" +
+                "  'connector'  = 'jdbc',\n" +
+                "  'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+                "  'table-name' = 'spend_report',\n" +
+                "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
+                "  'username'   = 'sql-demo',\n" +
+                "  'password'   = 'demo-sql'\n" +
+                ")");
+
+        Table transactions = tEnv.from("transactions");
+        report(transactions).executeInsert("spend_report");
+    }
+}
diff --git a/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
new file mode 100644
index 0000000..e1c986f
--- /dev/null
+++ b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+public class UnimplementedException extends RuntimeException {
+
+    public UnimplementedException() {
+        super("This method has not yet been implemented");
+    }
+}
diff --git a/table-walkthrough/src/main/resources/log4j.properties b/table-walkthrough/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e26ea17
--- /dev/null
+++ b/table-walkthrough/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java b/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
new file mode 100644
index 0000000..208f6e9
--- /dev/null
+++ b/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A unit test of the spend report.
+ * If this test passes then the business
+ * logic is correct.
+ */
+public class SpendReportTest {
+
+    private static final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
+    
+    @Test
+    public void testReport() {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        Table transactions =
+                tEnv.fromValues(
+                        DataTypes.ROW(
+                                DataTypes.FIELD("account_id", DataTypes.BIGINT()),
+                                DataTypes.FIELD("amount", DataTypes.BIGINT()),
+                                DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
+                        Row.of(1, 188, DATE_TIME.plusMinutes(12)),
+                        Row.of(2, 374, DATE_TIME.plusMinutes(47)),
+                        Row.of(3, 112, DATE_TIME.plusMinutes(36)),
+                        Row.of(4, 478, DATE_TIME.plusMinutes(3)),
+                        Row.of(5, 208, DATE_TIME.plusMinutes(8)),
+                        Row.of(1, 379, DATE_TIME.plusMinutes(53)),
+                        Row.of(2, 351, DATE_TIME.plusMinutes(32)),
+                        Row.of(3, 320, DATE_TIME.plusMinutes(31)),
+                        Row.of(4, 259, DATE_TIME.plusMinutes(19)),
+                        Row.of(5, 273, DATE_TIME.plusMinutes(42)));
+
+        try {
+            TableResult results = SpendReport.report(transactions).execute();
+
+            MatcherAssert.assertThat(
+                    materialize(results),
+                    Matchers.containsInAnyOrder(
+                            Row.of(1L, DATE_TIME, 567L),
+                            Row.of(2L, DATE_TIME, 725L),
+                            Row.of(3L, DATE_TIME, 432L),
+                            Row.of(4L, DATE_TIME, 737L),
+                            Row.of(5L, DATE_TIME, 481L)));
+        } catch (UnimplementedException e) {
+            Assume.assumeNoException("The walkthrough has not been implemented", e);
+        }
+    }
+    
+    private static List<Row> materialize(TableResult results) {
+        try (CloseableIterator<Row> resultIterator = results.collect()) {
+            return StreamSupport
+                    .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to materialize results", e);
+        }
+    }
+}
diff --git a/table-walkthrough/src/test/resources/log4j.properties b/table-walkthrough/src/test/resources/log4j.properties
new file mode 100644
index 0000000..5cae07b
--- /dev/null
+++ b/table-walkthrough/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.streaming.api.operators.collect.CollectResultFetcher=OFF
+log4j.logger.org.apache.flink.streaming.api.operators.collect.CollectSinkFunction=OFF
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n