[FLINK-30441] Upgrade the PyFlink walkthrough to Flink 1.16
diff --git a/pyflink-walkthrough/Dockerfile b/pyflink-walkthrough/Dockerfile
index c0364b2..c93b29a 100644
--- a/pyflink-walkthrough/Dockerfile
+++ b/pyflink-walkthrough/Dockerfile
@@ -20,15 +20,15 @@
# Build PyFlink Playground Image
###############################################################################
-FROM apache/flink:1.15.2-scala_2.12-java8
-ARG FLINK_VERSION=1.15.2
+FROM apache/flink:1.16.0-scala_2.12-java8
+ARG FLINK_VERSION=1.16.0
# Install python3.7 and pyflink
-# Pyflink does not yet function with python3.9, and this image is build on
-# debian bullseye which ships with that version, so build python3.7 here.
+# PyFlink does not yet work with Python 3.9, and this image is built on
+# Debian bullseye, which ships with that version, so build Python 3.7 here.
RUN set -ex; \
apt-get update && \
- apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
+ apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
tar -xvf Python-3.7.9.tgz && \
cd Python-3.7.9 && \
@@ -42,7 +42,7 @@
apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
python -m pip install --upgrade pip; \
- pip install apache-flink==1.15.2; \
+ pip install apache-flink==${FLINK_VERSION}; \
pip install kafka-python;
# Download connector libraries
diff --git a/pyflink-walkthrough/README.md b/pyflink-walkthrough/README.md
index 6543baf..4b8e76e 100644
--- a/pyflink-walkthrough/README.md
+++ b/pyflink-walkthrough/README.md
@@ -5,14 +5,15 @@
In this playground, you will learn how to build and run an end-to-end PyFlink pipeline for data analytics, covering the following steps:
* Reading data from a Kafka source;
-* Creating data using a [UDF](https://ci.apache.org/projects/flink/flink-docs-release-1.14/dev/python/table-api-users-guide/udfs/python_udfs.html);
+* Creating data using a [UDF](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/python/table/udfs/python_udfs/) (a short sketch follows this list);
* Performing a simple aggregation over the source data;
* Writing the results to Elasticsearch and visualizing them in Kibana.
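+
+As a taste of the UDF step, a minimal sketch of such a function is shown below. The province
+names are illustrative; see [payment_msg_proccessing.py](payment_msg_proccessing.py) for the real one:
+
+```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+@udf(result_type=DataTypes.STRING())
+def province_id_to_name(province_id):
+    # Map the numeric provinceId from a payment record to a readable name.
+    provinces = ("beijing", "shanghai", "hangzhou", "shenzhen")  # illustrative subset
+    return provinces[province_id]
+```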
The environment is based on Docker Compose, so the only requirement is that you have [Docker](https://docs.docker.com/get-docker/)
-installed in your machine.
+installed on your machine.
-### Kafka
+### Apache Kafka
+
You will be using Kafka to store sample input data about payment transactions. A simple data generator [generate_source_data.py](generator/generate_source_data.py) is provided to
continuously write new records to the `payment_msg` Kafka topic. Each record is structured as follows:
@@ -41,13 +42,13 @@
```
-### ElasticSearch
+### Elasticsearch
-ElasticSearch is used to store the results and to provide an efficient query service.
+Elasticsearch is used to store the results and to provide an efficient query service.
### Kibana
-Kibana is an open source data visualization dashboard for ElasticSearch. You will use it to visualize
+Kibana is an open source data visualization dashboard for Elasticsearch. You will use it to visualize
-the total transaction paymentAmount and proportion for each provinces in this PyFlink pipeline through a dashboard.
+the total transaction paymentAmount and its proportion for each province in this PyFlink pipeline through a dashboard.
## Setup
@@ -85,6 +86,7 @@
### Checking the Kafka service
You can use the following command to read data from the Kafka topic and check whether it's generated correctly:
+
```shell script
$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic payment_msg
{"createTime":"2020-07-27 09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
@@ -93,7 +95,9 @@
{"createTime":"2020-07-27 09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
{"createTime":"2020-07-27 09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
```
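+
+If you prefer checking the topic from Python, here is a minimal sketch using the kafka-python
+package that the playground image already installs (run it from inside one of the containers so
+that the `kafka:9092` address resolves):
+
+```python
+import json
+from kafka import KafkaConsumer
+
+# Consume a handful of records from the payment_msg topic and print them.
+consumer = KafkaConsumer(
+    'payment_msg',
+    bootstrap_servers='kafka:9092',
+    value_deserializer=lambda v: json.loads(v.decode('utf-8')))
+for i, record in enumerate(consumer):
+    print(record.value)
+    if i >= 4:  # stop after five records
+        break
+```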
+
You can also create a new topic by executing the following command:
+
```shell script
$ docker-compose exec kafka kafka-topics.sh --bootstrap-server kafka:9092 --create --topic <YOUR-TOPIC-NAME> --partitions 8 --replication-factor 1
```
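+
+To verify that the topic exists, you can list all topics with the standard Kafka CLI:
+
+```shell script
+$ docker-compose exec kafka kafka-topics.sh --bootstrap-server kafka:9092 --list
+```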
@@ -101,9 +105,11 @@
## Running the PyFlink job
1. Submit the PyFlink job.
+
```shell script
$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d
```
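+
+You can also confirm the submission from the command line with the standard Flink CLI:
+
+```shell script
+$ docker-compose exec jobmanager ./bin/flink list
+```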
+
Navigate to the [Flink Web UI](http://localhost:8081) after the job is submitted successfully. There should be a job in the running job list.
-Click the job to get more details. You should see that the `StreamGraph` of the `payment_msg_proccessing` consists of two nodes, each with a parallelism of 1.
-There is also a table in the bottom of the page that shows some metrics for each node (e.g. bytes received/sent, records received/sent). Note that Flink's metrics only
+Click the job to get more details. You should see that the `StreamGraph` of the `payment_msg_proccessing` job consists of two nodes, each with a parallelism of 1.
+There is also a table at the bottom of the page that shows some metrics for each node (e.g. bytes received/sent, records received/sent). Note that Flink's metrics only
@@ -147,7 +153,8 @@
-* Count the number of transactions, grouped by a 1 minute [tumbling window](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#tumble-tumbling-windows) and `payPlatform`.
+* Count the number of transactions, grouped by a 1-minute [tumbling window](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/tableapi/#tumble-tumbling-windows) and `payPlatform`; a sketch of this variation follows the command below.
After making a modification, you can submit the new job by executing the same command mentioned at
-[Running the PyFlink Job](#running-the-pyflink-job)
+[Running the PyFlink Job](#running-the-pyflink-job):
+
```shell script
$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d
```
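+
+For example, the tumbling-window variation could look like the sketch below. It assumes the
+source table exposes an event-time attribute (called `rowtime` here) and that you have created a
+matching sink table named `platform_sink`; both names are illustrative:
+
+```python
+from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
+
+t_env.from_path("payment_msg") \
+    .window(Tumble.over(lit(1).minutes).on(col('rowtime')).alias('w')) \
+    .group_by(col('w'), col('payPlatform')) \
+    .select(col('w').start.alias('window_start'),
+            col('payPlatform'),
+            col('orderId').count.alias('tx_count')) \
+    .execute_insert("platform_sink")
+```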
diff --git a/pyflink-walkthrough/docker-compose.yml b/pyflink-walkthrough/docker-compose.yml
index 3defab8..fe89ff6 100644
--- a/pyflink-walkthrough/docker-compose.yml
+++ b/pyflink-walkthrough/docker-compose.yml
@@ -20,7 +20,7 @@
services:
jobmanager:
build: .
- image: pyflink/pyflink:1.15.2-scala_2.12
+ image: pyflink/pyflink:1.16.0-scala_2.12
volumes:
- .:/opt/pyflink-walkthrough
hostname: "jobmanager"
@@ -32,7 +32,7 @@
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
- image: pyflink/pyflink:1.15.2-scala_2.12
+ image: pyflink/pyflink:1.16.0-scala_2.12
volumes:
- .:/opt/pyflink-walkthrough
expose:
@@ -47,6 +47,10 @@
- JOB_MANAGER_RPC_ADDRESS=jobmanager
zookeeper:
image: wurstmeister/zookeeper:3.4.6
+ ulimits:
+ nofile:
+ soft: 65536
+ hard: 65536
ports:
- "2181:2181"
kafka:
diff --git a/pyflink-walkthrough/payment_msg_proccessing.py b/pyflink-walkthrough/payment_msg_proccessing.py
index 759175a..bc2a1c7 100644
--- a/pyflink-walkthrough/payment_msg_proccessing.py
+++ b/pyflink-walkthrough/payment_msg_proccessing.py
@@ -18,6 +18,7 @@
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
+from pyflink.table.expressions import call, col
from pyflink.table.udf import udf
@@ -73,9 +74,9 @@
t_env.register_function('province_id_to_name', province_id_to_name)
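+    # Flink 1.16 removes the string-based Table API expressions, so the query
+    # below is built with the expression DSL (col/call) instead.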
t_env.from_path("payment_msg") \
- .select("province_id_to_name(provinceId) as province, payAmount") \
- .group_by("province") \
- .select("province, sum(payAmount) as pay_amount") \
+ .select(call('province_id_to_name', col('provinceId')).alias("province"), col('payAmount')) \
+ .group_by(col('province')) \
+    .select(col('province'), call('sum', col('payAmount')).alias("pay_amount")) \
.execute_insert("es_sink")