Flink Kubernetes Operator Python Example

Overview

This is an end-to-end example of running Flink Python jobs using the Flink Kubernetes Operator.

What's in this example?

  1. A Python script for a simple streaming job
  2. A Dockerfile to build a custom image with PyFlink and the Python demo
  3. An example YAML for submitting the Python job through the operator

How does it work?

Flink supports Python jobs in application mode by using the org.apache.flink.client.python.PythonDriver class as the entry class. With the Flink Kubernetes Operator, we can reuse this class to run Python jobs as well.

The class is packaged in flink-python_${scala_version}-${flink_version}.jar, which is included in the default Flink image, so we do not need to build a new job jar. Instead, we set the entryClass of the job CRD to org.apache.flink.client.python.PythonDriver. After the job YAML is applied, the launched JobManager pod runs the main() method of PythonDriver, which parses the arguments declared in the args field of the job CRD.
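As a rough illustration of the job fields involved, the sketch below builds a FlinkDeployment manifest as a Python dict and prints it as JSON (kubectl apply also accepts JSON). Only entryClass and the args field come from the text above; python_deployment is a hypothetical helper, and the resource sizes, flinkVersion, serviceAccount, and the jar path in the usage part are assumptions to adapt to your image and cluster.

```python
import json

# Entry class named in the text; every other value here is illustrative.
PYTHON_DRIVER = "org.apache.flink.client.python.PythonDriver"


def python_deployment(name, image, jar_uri, args, parallelism=1):
    """Return a FlinkDeployment manifest (as a dict) that runs a Python job.

    jar_uri must point at the flink-python jar inside the image; it is a
    parameter because the exact path and version depend on the image.
    """
    return {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkDeployment",
        "metadata": {"name": name},
        "spec": {
            "image": image,
            "flinkVersion": "v1_15",  # assumption: match your Flink version
            "serviceAccount": "flink",  # assumption: account set up for the operator
            "jobManager": {"resource": {"memory": "2048m", "cpu": 1}},
            "taskManager": {"resource": {"memory": "2048m", "cpu": 1}},
            "job": {
                "jarURI": jar_uri,
                "entryClass": PYTHON_DRIVER,  # PythonDriver instead of a job jar's main class
                "args": list(args),  # parsed by PythonDriver.main()
                "parallelism": parallelism,
                "upgradeMode": "stateless",
            },
        },
    }


if __name__ == "__main__":
    manifest = python_deployment(
        name="python-example",
        image="flink-python-example:latest",
        # hypothetical path and version: check the opt/ directory of your image
        jar_uri="local:///opt/flink/opt/flink-python_2.12-1.15.3.jar",
        args=["-py", "/opt/flink/usrlib/pythonjob/python_demo.py"],
    )
    print(json.dumps(manifest, indent=2))
```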

Note that in the args field, users must specify either the -py option or the -pym option. In addition, the order of elements in the args field matters: because of how the arguments are currently parsed, Flink-specific options (e.g. -pyfs, -py) must be placed first and user-defined arguments must be placed at the end. Check the Flink documentation for more details about PyFlink arguments.
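A small hypothetical helper, python_driver_args, that assembles the args list in the required order: Flink-specific options first, then the -py or -pym entry point, then user-defined arguments. Requiring exactly one of -py and -pym is a choice made by this sketch; the usage part reuses the paths from the working example in this section.

```python
def python_driver_args(flink_options, user_args, py=None, pym=None):
    """Assemble a PythonDriver args list with Flink options before user args.

    flink_options: flat list of Flink-specific options and their values,
        e.g. ["-pyfs", "/path/to/file.py", "-pyclientexec", "/usr/local/bin/python3"].
    user_args: arguments intended for the Python program itself.
    Exactly one of py (script path) or pym (module name) must be given.
    """
    if (py is None) == (pym is None):
        raise ValueError("specify exactly one of -py or -pym")
    entry = ["-py", py] if py is not None else ["-pym", pym]
    # Flink-specific options (including the entry point) first, user args last.
    return [*flink_options, *entry, *user_args]


if __name__ == "__main__":
    script = "/opt/flink/usrlib/pythonjob/python_demo.py"
    print(python_driver_args(
        ["-pyfs", script, "-pyclientexec", "/usr/local/bin/python3"],
        ["-myarg", "123"],
        py=script,
    ))
```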

A working example would be:

args: ["-pyfs", "/opt/flink/usrlib/pythonjob/python_demo.py", "-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pythonjob/python_demo.py", "-myarg", "123"]

But the following throws an exception:

args: ["-myarg", "123", "-pyfs", "/opt/flink/usrlib/pythonjob/python_demo.py", "-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pythonjob/python_demo.py"]
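Why does the order matter? Below is a simplified model, assuming the driver stops option parsing at the first token it does not recognize (the behaviour of a "stop at non-option" command-line parser) and hands the remaining tokens to the program. In the failing list, -myarg is the first token, so parsing stops immediately and -py is never seen as a Flink option. FLINK_VALUE_OPTIONS is an assumed subset of PyFlink's value-taking options, and split_driver_args is a hypothetical function, not Flink's actual parser.

```python
# Assumed subset of PyFlink options that each take one value.
FLINK_VALUE_OPTIONS = {"-py", "-pym", "-pyfs", "-pyarch", "-pyclientexec", "-pyexec", "-pyreq"}


def split_driver_args(args):
    """Split args into (flink_options, program_args), stopping at the first unknown token.

    Raises ValueError when neither -py nor -pym ends up among the Flink options.
    """
    flink_options = {}
    i = 0
    # Consume recognised option/value pairs from the front only.
    while i < len(args) and args[i] in FLINK_VALUE_OPTIONS:
        if i + 1 >= len(args):
            raise ValueError(f"missing value for {args[i]}")
        flink_options[args[i]] = args[i + 1]
        i += 2
    # Everything from the first unrecognised token onward goes to the program.
    program_args = list(args[i:])
    if "-py" not in flink_options and "-pym" not in flink_options:
        raise ValueError("entry point not specified: use -py or -pym")
    return flink_options, program_args
```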

Usage

The following steps assume that you have the Flink Kubernetes Operator installed and running in your environment.

Step 1: Put your Python script files under the flink-python-example directory and add them to the Dockerfile.
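User-defined arguments placed at the end of job.args (such as -myarg 123 above) are meant for the Python script itself. A minimal sketch of how a script like python_demo.py might read them with argparse, assuming the driver forwards them as the script's command-line arguments; parse_user_args is a hypothetical helper, and the PyFlink job body is left out.

```python
import argparse
import sys


def parse_user_args(argv):
    """Parse the user-defined arguments that follow the Flink options in job.args."""
    parser = argparse.ArgumentParser(description="PyFlink demo job arguments")
    # argparse accepts single-dash, multi-letter options such as -myarg;
    # the parsed value is stored as the attribute "myarg".
    parser.add_argument("-myarg", type=int, default=0)
    # parse_known_args ignores extra arguments this script does not declare.
    args, _unknown = parser.parse_known_args(argv)
    return args


if __name__ == "__main__":
    opts = parse_user_args(sys.argv[1:])
    print(f"myarg = {opts.myarg}")
    # ... build and execute the PyFlink streaming job here ...
```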

Step 2: Build the Docker image

Check the Flink documentation for more details about building a PyFlink image. Note that PyFlink 1.15.3 is supported only on the x86 architecture.

# Uncomment when building for local minikube env:
# eval $(minikube docker-env)

DOCKER_BUILDKIT=1 docker build . -t flink-python-example:latest

This step creates an image, based on an official Flink base image, that includes the Python scripts.

Step 3: Create the FlinkDeployment YAML and Submit

Edit the included python-example.yaml so that the job.args section points to the Python script that you wish to execute, then submit it.

kubectl apply -f python-example.yaml

The above image can be reused for different Python scripts as long as the scripts are accessible from the JobManager pod (e.g. by using a pod template with mounted storage).
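One way to make extra scripts available, sketched under assumptions: add a pod template that mounts a PersistentVolumeClaim into the main Flink container, then point job.args at a script on that volume. with_script_volume, the volume name, the claim name, and the mount path are all hypothetical; pick a mount path that does not hide the scripts baked into the image. Setting spec.podTemplate applies the template to both JobManager and TaskManager pods.

```python
import copy


def with_script_volume(deployment, claim_name, mount_path):
    """Return a copy of a FlinkDeployment dict whose pods mount a PVC at mount_path.

    Any existing spec.podTemplate is replaced, not merged.
    """
    result = copy.deepcopy(deployment)  # leave the caller's dict untouched
    result.setdefault("spec", {})["podTemplate"] = {
        "apiVersion": "v1",
        "kind": "Pod",
        "spec": {
            "containers": [
                {
                    # The operator merges this entry into the main Flink container.
                    "name": "flink-main-container",
                    "volumeMounts": [{"name": "python-scripts", "mountPath": mount_path}],
                }
            ],
            "volumes": [
                {"name": "python-scripts", "persistentVolumeClaim": {"claimName": claim_name}}
            ],
        },
    }
    return result


if __name__ == "__main__":
    base = {"apiVersion": "flink.apache.org/v1beta1", "kind": "FlinkDeployment",
            "spec": {"image": "flink-python-example:latest"}}
    print(with_script_volume(base, "python-scripts-pvc", "/opt/flink/usrlib/shared"))
```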