title: "Deploy Python pipelines on Kubernetes using the Flink runner"
date: "2024-06-20T13:56:15+10:00"
categories:
The Apache Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. With the operator, we can simplify the deployment and management of Apache Beam pipelines.
In this post, we develop an Apache Beam pipeline using the Python SDK and deploy it on an Apache Flink cluster by using the Apache Flink runner. We first deploy an Apache Kafka cluster on a minikube cluster, because the pipeline uses Kafka topics for its data source and sink. Then, we develop the pipeline as a Python package and add the package to a custom Docker image so that Python user code can be executed externally. For deployment, we create a Flink session cluster using the Flink Kubernetes Operator, and deploy the pipeline using a Kubernetes job. Finally, we check the output of the application by sending messages to the input Kafka topic using a Python producer application.
{{< toc >}}
We develop an Apache Beam pipeline using the Python SDK and deploy it on an Apache Flink cluster using the Apache Flink runner. Although the Flink cluster is created by the Flink Kubernetes Operator, we need two additional components to run the pipeline on the Flink runner: the job service and the SDK harness. Roughly speaking, the job service converts details about a Python pipeline into a format that the Flink runner can understand, while the SDK harness executes the Python user code. The Python SDK provides convenience wrappers to manage these components, and you can use them by specifying FlinkRunner in the pipeline options, for example, --runner=FlinkRunner. The job service is managed automatically, and we run our own SDK harness as a sidecar container for simplicity. Also, we need the Java IO Expansion Service, because the pipeline uses Apache Kafka topics for its data source and sink, and the Kafka Connector I/O is developed in Java. Simply put, the expansion service is used to serialize data for the Java SDK.
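To make the relationship between these options concrete, here is a minimal sketch of how the Flink runner and an external SDK harness are wired together via Beam pipeline options in the Python SDK. The host names and ports below are illustrative placeholders only; the actual values used in this post appear in the Kubernetes job manifest later.

{{< highlight python >}}
# A minimal sketch (not this post's pipeline): wiring the Flink runner and an
# external SDK harness together via Beam pipeline options.
# The addresses below are illustrative placeholders.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [
        "--runner=FlinkRunner",                  # delegate execution to the Apache Flink runner
        "--flink_master=localhost:8081",         # REST address of an existing Flink cluster
        "--environment_type=EXTERNAL",           # run Python user code in an external SDK harness
        "--environment_config=localhost:50000",  # address of the SDK harness worker pool
        "--streaming",
    ]
)

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(["hello", "beam"]) | beam.Map(print)
{{< /highlight >}}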
An Apache Kafka cluster is deployed using the Strimzi Operator on a minikube cluster. We install Strimzi version 0.39.0 and Kubernetes version 1.25.3. After the minikube CLI and Docker are installed, you can create a minikube cluster by specifying the Kubernetes version. You can find the source code for this blog post in the GitHub repository.
{{< highlight bash >}}
minikube start --cpus='max' --memory=20480 \
  --addons=metrics-server --kubernetes-version=v1.25.3
{{< /highlight >}}
The GitHub repository keeps the manifest files that you can use to deploy the Strimzi operator, Kafka cluster, and Kafka management application. To use a different version of the operator, download the relevant manifest file by specifying the version. By default, the manifest file assumes that the resources are deployed in the myproject namespace. Because we deploy them in the default namespace instead, we change the resource namespace using sed.
To deploy the operator, use the kubectl create command.
{{< highlight bash >}}
STRIMZI_VERSION="0.39.0"
DOWNLOAD_URL=https://github.com/strimzi/strimzi-kafka-operator/releases/download/$STRIMZI_VERSION/strimzi-cluster-operator-$STRIMZI_VERSION.yaml
curl -L -o kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml \
  ${DOWNLOAD_URL}
sed -i 's/namespace: .*/namespace: default/' kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

kubectl create -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml
{{< /highlight >}}
Verify that the Strimzi Operator runs as a Kubernetes deployment.
{{< highlight bash >}} kubectl get deploy,rs,po
{{< /highlight >}}
We deploy a Kafka cluster with a single broker and Zookeeper node. It has both internal and external listeners on ports 9092 and 29092, respectively. The external listener is used to access the Kafka cluster outside the minikube cluster. Also, the cluster is configured to allow automatic creation of topics (auto.create.topics.enable: "true"), and the default number of partitions is set to 3 (num.partitions: 3).
{{< highlight yaml >}}
# kafka/manifests/kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: demo-cluster
spec:
  kafka:
    version: 3.5.2
    replicas: 1
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: external
        port: 29092
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 20Gi
          deleteClaim: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.5"
      auto.create.topics.enable: "true"
      num.partitions: 3
  zookeeper:
    replicas: 1
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
{{< /highlight >}}
Deploy the Kafka cluster using the kubectl create command.
{{< highlight bash >}} kubectl create -f kafka/manifests/kafka-cluster.yaml {{< /highlight >}}
The Kafka and Zookeeper nodes are managed by the StrimziPodSet custom resource. Deploying the cluster also creates multiple Kubernetes services. In this post, we use the following services:

- demo-cluster-kafka-bootstrap - to access the Kafka brokers from the client and management apps
- demo-cluster-zookeeper-client - to access the Zookeeper node from the management app
- demo-cluster-kafka-external-bootstrap - to access the Kafka brokers from the producer app

{{< highlight bash >}}
kubectl get po,strimzipodsets.core.strimzi.io,svc -l app.kubernetes.io/instance=demo-cluster
{{< /highlight >}}
UI for Apache Kafka (kafka-ui) is a free and open-source Kafka management application. It's deployed as a Kubernetes Deployment. The Deployment is configured to have a single instance, and the Kafka cluster access details are specified as environment variables.
{{< highlight yaml >}}
# kafka/manifests/kafka-ui.yaml
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
spec:
  type: ClusterIP
  ports:
    - port: 8080
      targetPort: 8080
  selector:
    app: kafka-ui
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      containers:
        - image: provectuslabs/kafka-ui:v0.7.1
          name: kafka-ui-container
          ports:
            - containerPort: 8080
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: demo-cluster
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: KAFKA_CLUSTERS_0_ZOOKEEPER
              value: demo-cluster-zookeeper-client:2181
          resources:
            requests:
              memory: 256Mi
              cpu: 250m
            limits:
              memory: 512Mi
              cpu: 500m
{{< /highlight >}}
Deploy the Kafka management app (kafka-ui) using the kubectl create command.
{{< highlight bash >}} kubectl create -f kafka/manifests/kafka-ui.yaml
kubectl get all -l app=kafka-ui
{{< /highlight >}}
We use kubectl port-forward to connect to the kafka-ui server running in the minikube cluster on port 8080.
{{< highlight bash >}} kubectl port-forward svc/kafka-ui 8080 {{< /highlight >}}
We develop an Apache Beam pipeline as a Python package and add it to a custom Docker image, which is used to execute Python user code (SDK harness). We also build another custom Docker image, which adds the Java SDK of Apache Beam to the official Flink base image. This image is used to deploy a Flink cluster and to execute Java user code of the Kafka Connector I/O.
The application first reads text messages from an input Kafka topic. Next, it extracts words by splitting the messages (ReadWordsFromKafka). Then, the elements (words) are added to a fixed time window of 5 seconds, and their average length is calculated (CalculateAvgWordLen). Finally, we include the window start and end timestamps, and send the updated element to an output Kafka topic (WriteWordLenToKafka).
We create a custom Java IO Expansion Service (get_expansion_service) and add it to the ReadFromKafka and WriteToKafka transforms of the Kafka Connector I/O. Although the Kafka I/O provides a function to create that service, it did not work for me (or I do not understand how to make use of it yet). Instead, I created a custom service, as illustrated in Building Big Data Pipelines with Apache Beam by Jan Lukavský. The expansion service Jar file (beam-sdks-java-io-expansion-service.jar) must exist in the Kubernetes job that executes the pipeline, while the Java SDK (/opt/apache/beam/boot) must exist in the runner worker.
{{< highlight python >}}
# beam/word_len/word_len.py
import json
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.transforms.external import JavaJarExpansionService


def get_expansion_service(
    jar="/opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar", args=None
):
    if args is None:
        args = [
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
            "--experiments=use_deprecated_read",
        ]
    return JavaJarExpansionService(jar, ["{{PORT}}"] + args)


class WordAccum(typing.NamedTuple):
    length: int
    count: int


beam.coders.registry.register_coder(WordAccum, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple, verbose: bool = False):
    if verbose:
        print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


def create_message(element: typing.Tuple[str, str, float]):
    msg = json.dumps(dict(zip(["window_start", "window_end", "avg_len"], element)))
    print(msg)
    return "".encode("utf-8"), msg.encode("utf-8")


class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return WordAccum(length=0, count=0)

    def add_input(self, mutable_accumulator: WordAccum, element: str):
        length, count = tuple(mutable_accumulator)
        return WordAccum(length=length + len(element), count=count + 1)

    def merge_accumulators(self, accumulators: typing.List[WordAccum]):
        lengths, counts = zip(*accumulators)
        return WordAccum(length=sum(lengths), count=sum(counts))

    def extract_output(self, accumulator: WordAccum):
        length, count = tuple(accumulator)
        return length / count if count else float("NaN")

    def get_accumulator_coder(self):
        return beam.coders.registry.get_coder(WordAccum)


class AddWindowTS(beam.DoFn):
    def process(self, avg_len: float, win_param=beam.DoFn.WindowParam):
        yield (
            win_param.start.to_rfc3339(),
            win_param.end.to_rfc3339(),
            avg_len,
        )


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        verbose: bool = False,
        expansion_service: typing.Any = None,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.boostrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = expansion_service

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.boostrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "Tokenize" >> beam.FlatMap(tokenize)
        )


class CalculateAvgWordLen(beam.PTransform):
    def expand(self, input: pvalue.PCollection):
        return (
            input
            | "Windowing" >> beam.WindowInto(FixedWindows(size=5))
            | "GetAvgWordLength" >> beam.CombineGlobally(AverageFn()).without_defaults()
        )


class WriteWordLenToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        expansion_service: typing.Any = None,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.boostrap_servers = bootstrap_servers
        self.topic = topic
        self.expansion_service = expansion_service

    def expand(self, input: pvalue.PCollection):
        return (
            input
            | "AddWindowTS" >> beam.ParDo(AddWindowTS())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteToKafka"
            >> kafka.WriteToKafka(
                producer_config={"bootstrap.servers": self.boostrap_servers},
                topic=self.topic,
                expansion_service=self.expansion_service,
            )
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--deploy",
        dest="deploy",
        action="store_true",
        default=False,
        help="Flag to indicate whether to deploy to a cluster",
    )
    parser.add_argument(
        "--bootstrap_servers",
        dest="bootstrap",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument(
        "--input_topic",
        dest="input",
        default="input-topic",
        help="Kafka input topic name",
    )
    parser.add_argument(
        "--output_topic",
        dest="output",
        default="output-topic-beam",
        help="Kafka output topic name",
    )
    parser.add_argument(
        "--group_id",
        dest="group",
        default="beam-word-len",
        help="Kafka output group ID",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    print(known_args)
    print(pipeline_args)

    # We use the save_main_session option because one or more DoFn elements in this
    # workflow rely on global context, that is, a module imported at the module level.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    expansion_service = None
    if known_args.deploy is True:
        expansion_service = get_expansion_service()

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadWordsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap,
                topics=[known_args.input],
                group_id=known_args.group,
                expansion_service=expansion_service,
            )
            | "CalculateAvgWordLen" >> CalculateAvgWordLen()
            | "WriteWordLenToKafka"
            >> WriteWordLenToKafka(
                bootstrap_servers=known_args.bootstrap,
                topic=known_args.output,
                expansion_service=expansion_service,
            )
        )

        logging.getLogger().setLevel(logging.DEBUG)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
{{< /highlight >}}
The pipeline script is added to a Python package under a folder named word_len. A simple module named run is created, because it is executed as a module, for example, python -m .... When I ran the pipeline as a script, I encountered an error. This packaging method is for demonstration only. For a recommended way of packaging a pipeline, see Managing Python Pipeline Dependencies.
{{< highlight python >}}
# beam/word_len/run.py
from . import *

run()
{{< /highlight >}}
Overall, the pipeline package uses the following structure.
{{< highlight bash >}}
tree beam/word_len

beam/word_len
├── __init__.py
├── run.py
└── word_len.py
{{< /highlight >}}
As discussed previously, we build a custom Docker image (beam-python-example:1.16) and use it to deploy a Flink cluster and to run the Java user code of the Kafka Connector I/O.
{{< highlight docker >}}
# beam/Dockerfile
FROM flink:1.16

COPY --from=apache/beam_java11_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/
{{< /highlight >}}
We also build a custom Docker image (beam-python-harness:2.56.0) to run Python user code (SDK harness). Starting from the Python SDK Docker image, it first installs the Java Development Kit (JDK) and downloads the Java IO Expansion Service Jar file. Then, the Beam pipeline packages are copied to the /app folder, and that folder is added to the PYTHONPATH environment variable so that the packages can be found.
{{< highlight docker >}}
# beam/Dockerfile-python-harness
FROM apache/beam_python3.10_sdk:2.56.0

ARG BEAM_VERSION
ENV BEAM_VERSION=${BEAM_VERSION:-2.56.0}
ENV REPO_BASE_URL=https://repo1.maven.org/maven2/org/apache/beam

RUN apt-get update && apt-get install -y default-jdk

RUN mkdir -p /opt/apache/beam/jars \
  && wget ${REPO_BASE_URL}/beam-sdks-java-io-expansion-service/${BEAM_VERSION}/beam-sdks-java-io-expansion-service-${BEAM_VERSION}.jar \
  --progress=bar:force:noscroll -O /opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar

COPY word_len /app/word_len
COPY word_count /app/word_count

ENV PYTHONPATH="$PYTHONPATH:/app"
{{< /highlight >}}
Because the custom images need to be accessible in the minikube cluster, we point the terminal's docker-cli to minikube's Docker engine. Then, we can build the images using the docker build command.
{{< highlight bash >}}
eval $(minikube docker-env)
docker build -t beam-python-example:1.16 beam/
docker build -t beam-python-harness:2.56.0 -f beam/Dockerfile-python-harness beam/
{{< /highlight >}}
The Beam pipeline is executed on a Flink session cluster, which is deployed by the Flink Kubernetes Operator. The application deployment mode, where the Beam pipeline is deployed as a Flink job, doesn't seem to work (or I don't understand how to do so yet) due to either a job submission timeout error or a failure to upload the job artifact. After the pipeline is deployed, we check the output of the application by sending text messages to the input Kafka topic.
To make it possible to add the webhook component, first install the certificate manager on the minikube cluster. Then, install the operator using a Helm chart. We install version 1.8.0 in this post.
{{< highlight bash >}}
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

helm list
{{< /highlight >}}
Next, create a Flink session cluster. In the manifest file, configure common properties, such as the Docker image, Flink version, cluster configuration, and pod template. These properties are applied to both the Flink job manager and task manager. In addition, specify the replicas and resources for each of them. We add a sidecar container to the task manager pod; this SDK harness container executes the Python user code, as specified in the job configuration shown later.
{{< highlight yaml >}}
# beam/word_len_cluster.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-len-cluster
spec:
  image: beam-python-example:1.16
  imagePullPolicy: Never
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "10"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
      volumes:
        - name: flink-logs
          emptyDir: {}
  jobManager:
    resource:
      memory: "2048Mi"
      cpu: 2
  taskManager:
    replicas: 1
    resource:
      memory: "2048Mi"
      cpu: 2
    podTemplate:
      spec:
        containers:
          - name: python-harness
            image: beam-python-harness:2.56.0
            args: ["-worker_pool"]
            ports:
              - containerPort: 50000
                name: harness-port
{{< /highlight >}}
The pipeline is deployed using a Kubernetes job, and the custom SDK harness image is used to execute the pipeline as a module. The first two arguments are application-specific; the remaining arguments are pipeline options. For more information about the pipeline arguments, see the pipeline options source and the Flink Runner documentation. To execute Python user code in the sidecar container, we set the environment type to EXTERNAL and the environment config to localhost:50000.
{{< highlight yaml >}}
# beam/word_len_job.yml
apiVersion: batch/v1
kind: Job
metadata:
  name: word-len-job
spec:
  template:
    metadata:
      labels:
        app: word-len-job
    spec:
      containers:
        - name: beam-word-len-job
          image: beam-python-harness:2.56.0
          command: ["python"]
          args:
            - "-m"
            - "word_len.run"
            - "--deploy"
            - "--bootstrap_servers=demo-cluster-kafka-bootstrap:9092"
            - "--runner=FlinkRunner"
            - "--flink_master=word-len-cluster-rest:8081"
            - "--job_name=beam-word-len"
            - "--streaming"
            - "--parallelism=3"
            - "--flink_submit_uber_jar"
            - "--environment_type=EXTERNAL"
            - "--environment_config=localhost:50000"
            - "--checkpointing_interval=10000"
      restartPolicy: Never
{{< /highlight >}}
Deploy the session cluster and job using the kubectl create command. The session cluster is created by the FlinkDeployment custom resource, which manages the job manager deployment, task manager pod, and associated services. We can follow the progress of the job submission by checking the log of the job's pod.
{{< highlight bash >}} kubectl create -f beam/word_len_cluster.yml
kubectl create -f beam/word_len_job.yml
kubectl logs word-len-job-p5rph -f
{{< /highlight >}}
After the deployment completes, we can see the following Flink session cluster and job related resources.
{{< highlight bash >}} kubectl get all -l app=word-len-cluster
kubectl get all -l app=word-len-job
{{< /highlight >}}
You can access the Flink web UI using the kubectl port-forward command on port 8081. The job graph shows two tasks. The first task adds word elements into a fixed time window. The second task sends the average word length records to the output topic.
{{< highlight bash >}} kubectl port-forward svc/word-len-cluster-rest 8081 {{< /highlight >}}
The Kafka I/O automatically creates a topic if it doesn't exist, and we can see the input topic is created on kafka-ui.
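If you prefer to verify this from the command line rather than kafka-ui, the topic list can also be pulled with a few lines of kafka-python (the same client library used by the producer below). This is a convenience check only; it assumes the external listener has been port-forwarded to localhost:29092, as shown later in this post.

{{< highlight python >}}
# List topics through the port-forwarded external listener (localhost:29092).
# Assumes kafka-python is installed and the port-forward to
# demo-cluster-kafka-external-bootstrap is active.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:29092")
print(sorted(consumer.topics()))  # expect to see 'input-topic' once the job is running
consumer.close()
{{< /highlight >}}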
A simple Python Kafka producer is created to check the output of the application. By default, the producer app sends random text generated by the Faker package to the input Kafka topic every second.
{{< highlight python >}}
# kafka/client/producer.py
import os
import time

from faker import Faker
from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: int = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e


if __name__ == "__main__":
    producer = TextProducer(
        os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"),
        os.getenv("TOPIC_NAME", "input-topic"),
    )
    fake = Faker()

    num_events = 0
    while True:
        num_events += 1
        text = fake.text()
        producer.send_to_kafka(text)
        if num_events % 5 == 0:
            print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
        time.sleep(int(os.getenv("DELAY_SECONDS", "1")))
{{< /highlight >}}
Expose the Kafka bootstrap server on port 29092 using the kubectl port-forward command. Execute the Python script to start the producer app.
{{< highlight bash >}} kubectl port-forward svc/demo-cluster-kafka-external-bootstrap 29092
python kafka/client/producer.py {{< /highlight >}}
We can see the output topic (output-topic-beam) is created on kafka-ui.
Also, we can check that the output messages are created as expected in the Topics tab.
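If you want to inspect the output records without kafka-ui, a short kafka-python consumer can print them from the output topic. This is a sketch that assumes the same port-forward to localhost:29092 is still active and that the output topic name is the default (output-topic-beam).

{{< highlight python >}}
# Print average word length records from the output topic.
# Assumes the port-forward to localhost:29092 is active; press Ctrl+C to stop.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "output-topic-beam",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.value)  # e.g. {"window_start": ..., "window_end": ..., "avg_len": ...}
{{< /highlight >}}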
Delete the Kubernetes resources and the minikube cluster using the following steps.
{{< highlight bash >}}
kubectl delete -f beam/word_len_cluster.yml
kubectl delete -f beam/word_len_job.yml
helm uninstall flink-kubernetes-operator
helm repo remove flink-operator-repo
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

STRIMZI_VERSION="0.39.0"
kubectl delete -f kafka/manifests/kafka-cluster.yaml
kubectl delete -f kafka/manifests/kafka-ui.yaml
kubectl delete -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

minikube delete
{{< /highlight >}}