
pyflink-walkthrough

Background

In this playground, you will learn how to build and run an end-to-end PyFlink pipeline for data analytics, covering the following steps:

  • Reading data from a Kafka source;
  • Transforming data using a UDF;
  • Performing a simple aggregation over the source data;
  • Writing the results to Elasticsearch and visualizing them in Kibana.

The environment is based on Docker Compose, so the only requirement is that you have Docker installed on your machine.

Kafka

You will be using Kafka to store sample input data about payment transactions. A simple data generator, generate_source_data.py, is provided to continuously write new records to the payment_msg Kafka topic. Each record is structured as follows (a minimal sketch of such a generator follows the field list below):

{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 28306.44976403719, "payPlatform": 0, "provinceId": 4}

  • createTime: The creation time of the transaction.
  • orderId: The id of the current transaction.
  • payAmount: The amount being paid with this transaction.
  • payPlatform: The platform used to create this payment: pc or mobile.
  • provinceId: The id of the province for the user.
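
For reference, a generator that produces such records could look like the following minimal sketch. This is an illustrative assumption using the kafka-python client, not the actual contents of generate_source_data.py:

import json
import random
import time
from datetime import datetime

from kafka import KafkaProducer  # kafka-python client (assumed dependency)

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

while True:
    record = {
        'createTime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'orderId': int(time.time() * 1000),      # unique-ish transaction id
        'payAmount': random.uniform(1, 100000),  # transaction amount
        'payPlatform': random.randint(0, 1),     # 0 = pc, 1 = mobile
        'provinceId': random.randint(0, 5),      # mapped to a province name later
    }
    producer.send('payment_msg', record)
    time.sleep(0.5)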

PyFlink

The transaction data will be processed with PyFlink using the Python script payment_msg_proccessing.py. This script first maps the provinceId in each input record to its corresponding province name using a Python UDF, and then computes the sum of the transaction amounts for each province. The following code snippet shows the main processing logic in payment_msg_proccessing.py.


(t_env.from_path("payment_msg")                                        # the Kafka source table created earlier
    .select("province_id_to_name(provinceId) as province, payAmount")  # map provinceId to a province name via the UDF
    .group_by("province")                                              # group the selected fields by province
    .select("province, sum(payAmount) as pay_amount")                  # sum up payAmount for each province
    .execute_insert("es_sink"))                                        # write the result to the Elasticsearch sink
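
The UDF used above maps a province id to a province name. A minimal sketch of such a UDF is shown below; the province names, decorator arguments, and registration call are assumptions for illustration, while the real definition lives in payment_msg_proccessing.py:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Illustrative mapping from provinceId to a name (assumed values).
provinces = ("beijing", "shanghai", "hangzhou", "shenzhen", "jiangxi", "chongqing")

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.STRING())
def province_id_to_name(province_id):
    return provinces[province_id]

# Register the function so the query can reference it by name:
t_env.register_function("province_id_to_name", province_id_to_name)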

Elasticsearch

Elasticsearch is used to store the results and to provide an efficient query service.
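
The es_sink table referenced by the query in the previous section is registered with DDL along the following lines. This is a sketch: the connector version, index name, and options are assumptions, and the actual DDL is in payment_msg_proccessing.py:

t_env.execute_sql("""
    CREATE TABLE es_sink (
        province STRING,
        pay_amount DOUBLE,
        PRIMARY KEY (province) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',        -- assumed connector version
        'hosts' = 'http://elasticsearch:9200',  -- service name on the Docker network
        'index' = 'platform_pay_amount',        -- assumed index name
        'format' = 'json'
    )
""")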

Kibana

Kibana is an open source data visualization dashboard for Elasticsearch. You will use it to visualize the total transaction payment amount and the proportion for each province in this PyFlink pipeline through a dashboard.

Setup

As mentioned, the environment for this walkthrough is based on Docker Compose. It uses a custom image to spin up Flink (JobManager and TaskManager), Kafka and ZooKeeper, the data generator, and Elasticsearch and Kibana containers.

You can find the docker-compose.yml file of the pyflink-walkthrough in the pyflink-walkthrough root directory.

Building the Docker image

First, build the Docker image by running:

$ cd pyflink-walkthrough
$ docker-compose build

Starting the Playground

Once the Docker image build is complete, run the following command to start the playground:

$ docker-compose up -d

One way of checking whether the playground started successfully is to access some of the exposed services:

  1. Visit the Flink Web UI at http://localhost:8081.
  2. Visit Elasticsearch at http://localhost:9200.
  3. Visit Kibana at http://localhost:5601.

Note: you may need to wait around 1 minute before all the services come up.
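
If you prefer to script this check, a small Python sketch like the following polls the three endpoints listed above (this helper is illustrative and not part of the playground):

import urllib.request

services = {
    "Flink Web UI": "http://localhost:8081",
    "Elasticsearch": "http://localhost:9200",
    "Kibana": "http://localhost:5601",
}

for name, url in services.items():
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            print(f"{name}: up (HTTP {resp.getcode()})")
    except Exception as exc:
        print(f"{name}: not ready yet ({exc})")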

Checking the Kafka service

You can use the following command to read data from the Kafka topic and check whether it is being generated correctly:

$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic payment_msg
{"createTime":"2020-07-27 09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
{"createTime":"2020-07-27 09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
{"createTime":"2020-07-27 09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
{"createTime":"2020-07-27 09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
{"createTime":"2020-07-27 09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}

You can also create a new topic by executing the following command:

$ docker-compose exec kafka kafka-topics.sh --bootstrap-server kafka:9092 --create --topic <YOUR-TOPIC-NAME> --partitions 8 --replication-factor 1

Running the PyFlink job

  1. Submit the PyFlink job.
$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d

Navigate to the Flink Web UI after the job is submitted successfully. There should be a job in the running job list. Click the job to get more details. You should see that the StreamGraph of payment_msg_proccessing consists of two nodes, each with a parallelism of 1. There is also a table at the bottom of the page that shows some metrics for each node (e.g. bytes received/sent, records received/sent). Note that Flink's metrics only report bytes and records communicated within the Flink cluster, and so will always report 0 bytes and 0 records received by sources, and 0 bytes and 0 records sent to sinks - so don't be confused that nothing is reported as being read from Kafka, or written to Elasticsearch.


  2. Navigate to the Kibana UI, open the menu by clicking the menu button in the upper left corner, and choose the Dashboard item to open the dashboard page. Select the pre-created dashboard payment_dashboard. It contains a vertical bar chart and a pie chart showing the total payment amount and the proportion for each province.


  3. Stop the PyFlink job:

Visit the Flink Web UI at http://localhost:8081/#/overview, select the job, and click Cancel Job in the upper right corner.


Stopping the Playground

To stop the playground, run the following command:

$ docker-compose down

Further Explorations

If you would like to explore this example more deeply, you can edit payment_msg_proccessing.py or create new PyFlink projects that perform more complex processing. You can do this locally under the pyflink-walkthrough directory, since it is mounted into the jobmanager Docker container.

Ideas:

  • Add your own Kafka source table;
  • Create a new index for the Elasticsearch sink;
  • Count the number of transactions, grouped by a 1-minute tumbling window and payPlatform (see the sketch after this list).
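
As a starting point for the last idea, a tumbling-window count could look roughly like the sketch below. It uses the Table API expression DSL and assumes that createTime is declared as an event-time (rowtime) attribute with a watermark in the source table, and that my_sink is a hypothetical sink table you would create:

from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

t_env.from_path("payment_msg") \
    .window(Tumble.over(lit(1).minutes).on(col("createTime")).alias("w")) \
    .group_by(col("w"), col("payPlatform")) \
    .select(col("payPlatform"),
            col("w").start.alias("window_start"),
            col("orderId").count.alias("transaction_count")) \
    .execute_insert("my_sink")  # hypothetical sink table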

After making a modification, you can submit the new job by executing the same command mentioned in Running the PyFlink job:

$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d

Furthermore, you can also create new Kibana dashboards to visualize other aspects of the data in Elasticsearch.