Kubernetes is a container orchestration system.
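A basic Flink cluster deployment in Kubernetes has three components:

- a Deployment for a single JobManager,
- a Deployment for a pool of TaskManagers, and
- a Service exposing the JobManager's ports.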
Using the [resource definitions found below](#simple-kubernetes-flink-cluster-resources), launch the cluster with the `kubectl` command:

{% highlight bash %}
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
kubectl create -f jobmanager-service.yaml
{% endhighlight %}
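You can then access the Flink UI via `kubectl proxy`:

1. Run `kubectl proxy` in a terminal.
2. Navigate to [http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy](http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy) in your browser.

Again, use `kubectl` to delete the cluster:

{% highlight bash %}
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f taskmanager-deployment.yaml
{% endhighlight %}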
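A Kubernetes session creates and deletes pods, so verify that your Kubernetes credentials allow you to list, create, update and delete pods. Check each action with:

{% highlight bash %}
kubectl auth can-i <list|create|update|delete> pods
{% endhighlight %}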
Use the following commands to build an image that contains the user jar and push it to your registry:

{% highlight bash %}
cd flink-container/docker
./build.sh --job-jar examples/streaming/WordCount.jar --from-local-dist --image-name <ImageName>
docker push <ImageName>
{% endhighlight %}
Use the following command to start a session:

{% highlight bash %}
./bin/kubernetes-session.sh
{% endhighlight %}
This command will show you the following overview:

{% highlight bash %}
Usage:
   Required
     -ms,--master                  Kubernetes cluster master url
     -n,--pods                     Number of kubernetes pods to allocate (=Number of Task Managers)

   Optional
     -D <property=value>           use value for given property
     -d,--detached                 If present, runs the job in detached mode
     -h,--help                     Help for the kubernetes session CLI.
     -i,--image                    Container image to use for Flink containers. Individual container types (e.g. jobmanager or taskmanager) can also be configured to use different images if desired, by setting the container type-specific image name.
     -jm,--jobManagerMemory        Memory for JobManager Container [in MB]
     -nm,--name                    Set a custom name for the flink cluster on kubernetes
     -ns,--namespace               Specify kubernetes namespace.
     -s,--slots                    Number of slots per TaskManager
     -sa,--serviceaddress          The exposed address of kubernetes service to submit job and view dashboard.
     -tm,--taskManagerMemory       Memory per TaskManager Container [in MB]
{% endhighlight %}
Example: Issue the following command to allocate 4 TaskManagers, with 8 GB of memory and 32 processing slots each:

{% highlight bash %}
./bin/kubernetes-session.sh -ms https://k8s-master:port -n 4 -tm 8192 -s 32 -i flink-k8s:latest
{% endhighlight %}
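Before starting the session, it can help to check that the Kubernetes cluster has room for what the example requests (4 TaskManagers with 8192 MB and 32 slots each). The sketch below just multiplies the flags out: `-n` × `-tm` for TaskManager memory and `-n` × `-s` for slots. The function names are made up for illustration, and the totals leave out the JobManager container (`-jm`).

```bash
#!/usr/bin/env bash
# Back-of-the-envelope totals for a session started with
#   ./bin/kubernetes-session.sh ... -n 4 -tm 8192 -s 32
# Function names are illustrative; the JobManager (-jm) is not included.

# Total TaskManager memory in MB: number of pods (-n) x memory per pod (-tm).
total_tm_memory_mb() {
  echo $(( $1 * $2 ))
}

# Total task slots: number of TaskManagers (-n) x slots per TaskManager (-s).
total_slots() {
  echo $(( $1 * $2 ))
}

echo "TaskManager memory: $(total_tm_memory_mb 4 8192) MB"  # 32768 MB
echo "Task slots: $(total_slots 4 32)"                        # 128
```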
The BlobServer and the TaskManagers must use fixed (non-random) ports. Configure them with the following options, either in `flink-conf.yaml` or as `-D` flags when starting the session:

{% highlight yaml %}
blob.server.port: 7788
taskmanager.rpc.port: 7789
{% endhighlight %}
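Because the same options can be given either in `flink-conf.yaml` or as `-D` flags, a small converter keeps the two forms in sync. This is a sketch that assumes a flat `key: value` file (no nested YAML); the function name `conf_to_dynamic_props` is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: read flat "key: value" lines (flink-conf.yaml style)
# on stdin and print one "-D key=value" flag per line.
# Blank lines and comment lines are skipped; nested YAML is not supported.
conf_to_dynamic_props() {
  awk '
    /^[ \t]*$/ || /^[ \t]*#/ { next }          # skip blank and comment lines
    {
      key = $0; sub(/:.*/, "", key)             # text before the first colon
      gsub(/^[ \t]+|[ \t]+$/, "", key)          # trim the key
      val = $0; sub(/^[^:]*:[ \t]*/, "", val)   # text after "key:" and spaces
      sub(/[ \t]+$/, "", val)                   # trim trailing whitespace
      printf "-D %s=%s\n", key, val
    }'
}

printf 'blob.server.port: 7788\ntaskmanager.rpc.port: 7789\n' | conf_to_dynamic_props
# -D blob.server.port=7788
# -D taskmanager.rpc.port=7789
```

The output can be spliced into the session command, for example `./bin/kubernetes-session.sh -ms https://k8s-master:port -n 4 $(conf_to_dynamic_props < ports.yaml)`, where `ports.yaml` is a placeholder file name. The unquoted substitution relies on word splitting, so values must not contain spaces.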
Once Flink is deployed in your Kubernetes cluster, the client shows you the connection details of the JobManager.
In detached mode, the Flink client exits after submitting the service to the Kubernetes cluster. To stop the Kubernetes session, use the Kubernetes utilities (`kubectl delete service <ServiceName>`). You can also start another client, attach it to the session, and stop the session from there.
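There are several ways to expose a Service onto an external (outside of your cluster) IP address. The exposure type can be changed with `kubernetes.service.exposed.type`:

- `NODE_PORT` (a Kubernetes `NodePort` Service): exposes the Service on each Node's IP at a static port (the NodePort). `<NodeIP>:<NodePort>` can be used to contact the JobManager Service.
- `LOAD_BALANCER` (a Kubernetes `LoadBalancer` Service): exposes the Service externally using a cloud provider's load balancer. Use `kubectl get services/<ServiceName>` to get the `EXTERNAL-IP` for the ServiceAddress argument.

See "Publishing Services" in the Kubernetes documentation for more information.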
Use the following command to submit a job to the session:

{% highlight bash %}
./bin/flink run -m kubernetes-cluster -knm <ClusterId> -ksa <ServiceAddress> examples/streaming/WordCount.jar
{% endhighlight %}

- `ClusterId`: the cluster name set with `-nm` when starting a session. If you do not specify a name, the Flink client generates a UUID for your session cluster.
- `ServiceAddress`: the address of the `<ClusterId>-service` Service. Depending on the exposure type, this is `localhost`, `<NodeIP>:<NodePort>` or the `EXTERNAL-IP`.

Use the following command to attach to a session:

{% highlight bash %}
./bin/kubernetes-session.sh -ms https://k8s-master:port -nm <ClusterId> -sa <ServiceAddress>
{% endhighlight %}
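The value passed as the ServiceAddress (`-sa` for the session CLI, `-ksa` for `./bin/flink run`) depends on how the Service is exposed. The helper below is only a sketch of that rule: its name and argument order are hypothetical, it takes the Kubernetes Service type (`ClusterIP`, `NodePort`, `LoadBalancer`) rather than a Flink configuration value, and the `localhost` case assumes the Service is reached through a local proxy or port-forward.

```bash
#!/usr/bin/env bash
# Hypothetical helper: pick the ServiceAddress for -sa / -ksa.
#   service_address ClusterIP                     -> localhost (via a local proxy/port-forward)
#   service_address NodePort <NodeIP> <NodePort>  -> <NodeIP>:<NodePort>
#   service_address LoadBalancer <EXTERNAL-IP>    -> <EXTERNAL-IP>
# For LoadBalancer, the external IP can be read with, for example:
#   kubectl get services/<ServiceName> -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
# (some cloud providers populate .hostname instead of .ip)
service_address() {
  case "$1" in
    ClusterIP)    printf '%s\n' "localhost" ;;
    NodePort)     printf '%s\n' "$2:$3" ;;
    LoadBalancer) printf '%s\n' "$2" ;;
    *) printf 'unsupported service type: %s\n' "$1" >&2; return 1 ;;
  esac
}

service_address NodePort 10.0.0.5 30081   # prints 10.0.0.5:30081
```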
The documentation above describes how to start a Flink cluster within a Kubernetes environment. It is also possible to launch a dedicated Flink cluster for each individual job, which gives better isolation.
Example:

{% highlight bash %}
./bin/flink run -m kubernetes-cluster -kms https://k8s-master:port -kn 4 -ki flink-k8s:latest examples/streaming/WordCount.jar
{% endhighlight %}
The command line options of the Kubernetes session are also available with the `./bin/flink` tool. They are prefixed with `k` (short options) or `kubernetes` (long options).
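The prefixing rule can be written down as a tiny function, which is handy when turning a session command into a per-job `./bin/flink run` command. This is a sketch derived only from the rule stated above (the function name is hypothetical); check the resulting names against `./bin/flink run -h` for your Flink version.

```bash
#!/usr/bin/env bash
# Hypothetical helper applying the rule above: short session options get a
# "k" prefix, long options get a "kubernetes" prefix; other arguments pass through.
to_flink_run_option() {
  case "$1" in
    --*) printf '%s\n' "--kubernetes${1#--}" ;;  # e.g. --namespace -> --kubernetesnamespace
    -*)  printf '%s\n' "-k${1#-}" ;;             # e.g. -nm -> -knm
    *)   printf '%s\n' "$1" ;;                   # not an option: unchanged
  esac
}

for opt in -ms -n -i -nm -sa; do
  printf '%s -> %s\n' "$opt" "$(to_flink_run_option "$opt")"
done
# -ms -> -kms
# -n -> -kn
# -i -> -ki
# -nm -> -knm
# -sa -> -ksa
```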
Note: In attached mode, the argument `-kn` (number of TaskManagers) is required, and `kubernetes.service.exposed.type` must be either `NODE_PORT` or `LOAD_BALANCER`.
Note: You can use a different configuration directory per job by setting the environment variable `FLINK_CONF_DIR`. To use this, copy the `conf` directory from the Flink distribution and modify, for example, the logging settings on a per-job basis.
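A minimal sketch of that workflow: copy the distribution's `conf` directory once per job and set `FLINK_CONF_DIR` to the copy only for that job's `./bin/flink` invocation. The helper name and the directory paths are illustrative assumptions.

```bash
#!/usr/bin/env bash
# Hypothetical helper: create a per-job copy of a Flink conf directory.
#   prepare_job_conf <source conf dir> <per-job conf dir>
# Prints the per-job directory so the caller can capture it.
prepare_job_conf() {
  local src_conf="$1" job_conf="$2"
  mkdir -p "$job_conf" || return 1
  cp -R "$src_conf"/. "$job_conf"/ || return 1   # copy all files, including hidden ones
  printf '%s\n' "$job_conf"
}

# Example usage (not executed here; FLINK_HOME and the target path are placeholders):
#   JOB_CONF=$(prepare_job_conf "$FLINK_HOME/conf" /tmp/wordcount-conf)
#   # ...adjust the logging settings inside "$JOB_CONF" for this job only...
#   FLINK_CONF_DIR="$JOB_CONF" ./bin/flink run -m kubernetes-cluster -kms https://k8s-master:port \
#     -kn 4 -ki flink-k8s:latest examples/streaming/WordCount.jar
```

Prefixing the command with `FLINK_CONF_DIR=...` scopes the variable to that single invocation, so other jobs started from the same shell keep using the default configuration.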
Note: It is also possible to "fire and forget" a Flink job to the Kubernetes cluster in detached mode. Use `-m kubernetes-cluster` to select the Kubernetes cluster and `-d` for detached mode. The `-kn` argument has no effect; resources are allocated on demand. In this case, your application will not get any accumulator results or exceptions from the `ExecutionEnvironment.execute()` call!
Note: If you want to access the JobManager UI or retrieve the logs after the job has finished, set `kubernetes.destroy-perjob-cluster.after-job-finished=false`; the Flink cluster is then not destroyed when the job finishes.
Use the following command to retrieve the logs of the JobManager and TaskManager pods:

{% highlight bash %}
kubectl logs pod/<PodName>
{% endhighlight %}
Namespaces in Kubernetes are a way to divide cluster resources between multiple users (via resource quota). They are similar to the queue concept in a YARN cluster. Flink on Kubernetes can use namespaces to launch Flink clusters. The namespace can be specified with the `-ns` argument when starting a Flink cluster.
ResourceQuota provides constraints that limit aggregate resource consumption per namespace. It can limit the quantity of objects of each type that can be created in a namespace, as well as the total amount of compute resources that may be consumed by resources in that namespace.
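As an illustration, the sketch below prints a ResourceQuota manifest for a namespace in which Flink clusters are started with `-ns`. The quota name `flink-quota`, the namespace and the limits are example values, not something Flink creates or requires. Once a namespace has a quota on `requests.cpu` or `requests.memory`, Kubernetes may reject pods that do not specify those requests (unless a LimitRange supplies defaults).

```bash
#!/usr/bin/env bash
# Sketch: print a ResourceQuota manifest for one namespace.
#   resource_quota_yaml <namespace> <cpu> <memory> <max pods>
# The quota name and all values are examples only.
resource_quota_yaml() {
  cat <<EOF
apiVersion: v1
kind: ResourceQuota
metadata:
  name: flink-quota
  namespace: $1
spec:
  hard:
    requests.cpu: "$2"
    requests.memory: $3
    pods: "$4"
EOF
}

resource_quota_yaml flink-jobs 16 64Gi 20
# To create it (requires access to the cluster):
#   resource_quota_yaml flink-jobs 16 64Gi 20 | kubectl apply -f -
```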
Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure the RBAC roles and service accounts that the Flink JobManager uses to access the Kubernetes API server within the Kubernetes cluster.
Every namespace has a default service account. However, the `default` service account may not have permission to create or delete pods within the Kubernetes cluster, so you may need to specify another service account that has the right role bound. Use the configuration option `kubernetes.jobmanager.service-account` to set the service account. The following option makes the JobManager pod use the `flink` service account to create and delete TaskManager pods:
{% highlight bash %}
-D kubernetes.jobmanager.service-account=flink
{% endhighlight %}
If the `flink` service account does not exist, use the following commands to create it and set up the role binding:
{% highlight bash %}
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding --clusterrole=edit --serviceaccount=default:flink --namespace=default
{% endhighlight %}
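If you prefer to keep these objects in version control, the same ServiceAccount and ClusterRoleBinding (bound to the built-in `edit` ClusterRole) can be written as manifests. The sketch below prints them for a given namespace, defaulting to `default` as in the commands above; the function name is hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: declarative equivalent of "kubectl create serviceaccount flink" and
# "kubectl create clusterrolebinding flink-role-binding --clusterrole=edit ...".
#   flink_rbac_yaml [namespace]   (defaults to "default")
flink_rbac_yaml() {
  local namespace="${1:-default}"
  cat <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: ${namespace}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
- kind: ServiceAccount
  name: flink
  namespace: ${namespace}
EOF
}

flink_rbac_yaml default
# Apply with (requires access to the cluster):
#   flink_rbac_yaml default | kubectl apply -f -
```

A ClusterRoleBinding is cluster-scoped, so only the ServiceAccount and the subject entry carry a namespace.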
See "Using RBAC Authorization" in the Kubernetes documentation for more information.
An early version of a [Flink Helm chart](https://github.com/docker-flink/examples) is available on GitHub.
`jobmanager-deployment.yaml`

{% highlight yaml %}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
{% endhighlight %}
`taskmanager-deployment.yaml`

{% highlight yaml %}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        args:
        - taskmanager
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
{% endhighlight %}
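`jobmanager-service.yaml`

{% highlight yaml %}
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
{% endhighlight %}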
This section briefly describes how Flink and Kubernetes interact.
When starting a Kubernetes session, the Flink client first (step 1) contacts the Kubernetes ApiServer to submit the cluster description, including the ConfigMap spec, the JobManager Service spec, the JobManager Replica Controller spec and the Owner Reference.

In the next step (step 2), the Kubernetes Master creates the required components. The Kubelet pulls the image, prepares and mounts the volume, and then executes the start command.

Once the Flink JobManager pod is launched, the ResourceManager allocates (step 3) the specified number of TaskManagers. The JobManager generates a new configuration for the TaskManagers in which the JobManager address is set to the Service name. This allows the TaskManagers to connect back to the JobManager after a failover.

After all TaskManagers are launched and registered with the ResourceManager and JobManager, the session is ready to accept jobs. {% top %}