The TaskManager is responsible for executing the individual tasks of a Flink job. Each TaskManager has at least one task slot and manages a subset of the computing resources (CPU, memory, etc.) of the underlying machine. How many tasks a TaskManager can hold is limited by both its number of slots and its resources. This document describes how Flink calculates TaskManager resources. It helps users understand how many resources the Flink ResourceManager allocates for each TaskManager from resource management frameworks such as YARN, Mesos, or Kubernetes, and how much of those resources is available to user jobs.
TaskManager resources are calculated in completely different ways in session mode and in per-job mode.
Note: Setting the resources of any operator in your job enables resource matching.
In session mode, use the following options to set the CPU resources and the number of task slots of each TaskManager.
{% highlight yaml %}
# Number of CPU cores of each TaskManager
taskmanager.cpu.core: 1
# Number of task slots of each TaskManager
taskmanager.numberOfTaskSlots: 10
{% endhighlight %}
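Because the number of tasks a TaskManager can hold is limited by both its slot count and its resources, the bound can be illustrated with the example values above. The following is a minimal sketch, not Flink code: it assumes, purely for illustration, that every task needs a fixed number of CPU cores and that each slot holds one task (slot sharing is ignored). The class and method names are hypothetical.

```java
/**
 * Hypothetical model of how many tasks fit into one TaskManager.
 * Not a Flink API: it only shows that the slot count and the CPU
 * resources both cap the number of tasks (slot sharing is ignored).
 */
final class TaskManagerCapacity {

    private TaskManagerCapacity() {}

    /**
     * @param numberOfTaskSlots value of taskmanager.numberOfTaskSlots
     * @param cpuCores          value of taskmanager.cpu.core
     * @param coresPerTask      assumed CPU demand of a single task
     * @return the number of tasks allowed by both limits
     */
    static int maxTasks(int numberOfTaskSlots, double cpuCores, double coresPerTask) {
        if (numberOfTaskSlots < 0 || cpuCores < 0 || coresPerTask <= 0) {
            throw new IllegalArgumentException("invalid resource values");
        }
        // How many tasks the CPU budget alone would allow.
        int byCpu = (int) Math.floor(cpuCores / coresPerTask);
        // The tighter of the two limits wins.
        return Math.min(numberOfTaskSlots, byCpu);
    }

    public static void main(String[] args) {
        // Example configuration: 1 CPU core and 10 slots.
        System.out.println(maxTasks(10, 1.0, 0.25));   // CPU-bound: prints 4
        System.out.println(maxTasks(10, 1.0, 0.0625)); // slot-bound: prints 10
    }
}
```

With generous per-task CPU demand the cores run out first; with small demand the slot count becomes the limit.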
As shown in the figure above, the memory of each TaskManager consists of three main parts: JVM heap memory, JVM direct memory, and native memory.
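The composition can be sketched as a simple sum of the three parts. This is a minimal illustration that models only the parts named above; the class name and the MB values are hypothetical, and the real calculation may include components this sketch ignores. The mapping to JVM flags is likewise an assumption: the heap size bounds `-Xmx`, the direct size bounds `-XX:MaxDirectMemorySize`, and native memory has no JVM flag.

```java
/** Hypothetical TaskManager memory layout with the three parts named above. */
final class TaskManagerMemory {
    final long jvmHeapMb;
    final long jvmDirectMb;
    final long nativeMb;

    TaskManagerMemory(long jvmHeapMb, long jvmDirectMb, long nativeMb) {
        if (jvmHeapMb < 0 || jvmDirectMb < 0 || nativeMb < 0) {
            throw new IllegalArgumentException("memory sizes must be non-negative");
        }
        this.jvmHeapMb = jvmHeapMb;
        this.jvmDirectMb = jvmDirectMb;
        this.nativeMb = nativeMb;
    }

    /** Total memory of the TaskManager: heap + direct + native. */
    long totalMb() {
        return jvmHeapMb + jvmDirectMb + nativeMb;
    }

    /** Memory outside the JVM heap: direct + native. */
    long offHeapMb() {
        return jvmDirectMb + nativeMb;
    }

    /** Assumed JVM flags; native memory is not capped by any JVM flag. */
    String jvmArgs() {
        return "-Xmx" + jvmHeapMb + "m -XX:MaxDirectMemorySize=" + jvmDirectMb + "m";
    }

    public static void main(String[] args) {
        TaskManagerMemory mem = new TaskManagerMemory(1024, 256, 128);
        System.out.println("total MB: " + mem.totalMb());      // prints "total MB: 1408"
        System.out.println("off-heap MB: " + mem.offHeapMb()); // prints "off-heap MB: 384"
        System.out.println(mem.jvmArgs());
    }
}
```

Separating the parts shows why the total container or pod size must cover more than the JVM heap alone.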
Use the following options to set the total and fine-grained memory resources of each TaskManager.
| Option | Description |
| --- | --- |
| `-tm` | Total memory of each TaskManager container (YARN) or pod (Kubernetes), specified when executing `yarn-session.sh` or `kubernetes-session.sh`. |
In per-job mode, if resource matching is disabled, the resources of all operators are set to UNKNOWN, and the resources of each TaskManager are calculated in the same way as in session mode.
If resource matching is enabled, you can set the resources of each operator through `ResourceSpec`, including CPU cores, heap memory, direct memory, native memory, and managed memory. The JobManager requests slots with the corresponding resources from the ResourceManager. The ResourceManager combines the slot requests into YARN container or Kubernetes pod requests and sends them to the resource management framework.
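To make the five resource dimensions concrete, the sketch below models them in a small hypothetical value class (not Flink's `ResourceSpec`). It derives a slot request by summing the specs of the operators assumed to share that slot. Both the summation rule and the units (cores and MB for every memory dimension) are assumptions made for illustration. The example values mirror the `ResourceSpec` code in this section.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the five dimensions a ResourceSpec carries. */
final class OperatorResources {
    final double cpuCores;
    final int heapMemoryMb;
    final int directMemoryMb;
    final int nativeMemoryMb;
    final int managedMemoryMb; // unit assumed to be MB

    OperatorResources(double cpuCores, int heapMemoryMb, int directMemoryMb,
                      int nativeMemoryMb, int managedMemoryMb) {
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        this.directMemoryMb = directMemoryMb;
        this.nativeMemoryMb = nativeMemoryMb;
        this.managedMemoryMb = managedMemoryMb;
    }

    /** Component-wise sum of two specs. */
    OperatorResources merge(OperatorResources other) {
        return new OperatorResources(
                cpuCores + other.cpuCores,
                heapMemoryMb + other.heapMemoryMb,
                directMemoryMb + other.directMemoryMb,
                nativeMemoryMb + other.nativeMemoryMb,
                managedMemoryMb + other.managedMemoryMb);
    }

    /** Sum of all memory dimensions. */
    int totalMemoryMb() {
        return heapMemoryMb + directMemoryMb + nativeMemoryMb + managedMemoryMb;
    }

    /** Assumed slot request: the sum of the operators placed in the slot. */
    static OperatorResources slotRequest(List<OperatorResources> operators) {
        OperatorResources total = new OperatorResources(0, 0, 0, 0, 0);
        for (OperatorResources op : operators) {
            total = total.merge(op);
        }
        return total;
    }

    public static void main(String[] args) {
        OperatorResources spec = new OperatorResources(1, 1024, 128, 64, 10);
        // Three operators (e.g. source, flatMap, sum) sharing one slot.
        OperatorResources slot = slotRequest(Arrays.asList(spec, spec, spec));
        System.out.println(slot.cpuCores + " cores, " + slot.totalMemoryMb() + " MB"); // prints "3.0 cores, 3678 MB"
    }
}
```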
Use the following two options to control how slot requests are combined into a container or pod.
{% highlight yaml %}
taskmanager.multi-slots.max.memory.mb: 8192
taskmanager.multi-slots.max.cpu.core: 1
{% endhighlight %}
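One way to read these two options is as upper bounds on the total memory and CPU of the slots combined into a single container or pod. Under that assumption, the combination step can be sketched as a first-fit packing. The packing strategy, class names, and request sizes below are hypothetical, and Flink's ResourceManager may combine requests differently. The sketch also shows why the number of slots per TaskManager is not fixed in advance: it depends on how many requests fit under the limits.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical first-fit packing of slot requests into containers, treating
 * taskmanager.multi-slots.max.memory.mb and taskmanager.multi-slots.max.cpu.core
 * as per-container upper bounds (an assumption for this sketch).
 */
final class SlotPacker {

    /** One slot request: CPU cores and total memory in MB. */
    static final class SlotRequest {
        final double cpuCores;
        final int memoryMb;

        SlotRequest(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** A container (or pod) request that holds one or more slots. */
    static final class Container {
        final List<SlotRequest> slots = new ArrayList<>();
        double cpuCores;
        int memoryMb;
    }

    private SlotPacker() {}

    static List<Container> pack(List<SlotRequest> requests, int maxMemoryMb, double maxCpuCores) {
        List<Container> containers = new ArrayList<>();
        for (SlotRequest r : requests) {
            Container target = null;
            // First fit: reuse the first container that still has room for this slot.
            for (Container c : containers) {
                if (c.memoryMb + r.memoryMb <= maxMemoryMb && c.cpuCores + r.cpuCores <= maxCpuCores) {
                    target = c;
                    break;
                }
            }
            // Otherwise open a new container; a request above the limits ends up alone.
            if (target == null) {
                target = new Container();
                containers.add(target);
            }
            target.slots.add(r);
            target.memoryMb += r.memoryMb;
            target.cpuCores += r.cpuCores;
        }
        return containers;
    }

    public static void main(String[] args) {
        List<SlotRequest> requests = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            requests.add(new SlotRequest(0.25, 3000));
        }
        // Limits taken from the example configuration: 8192 MB and 1 CPU core.
        for (Container c : pack(requests, 8192, 1.0)) {
            // prints 2, 2 and 1 slot(s): memory is the binding limit here
            System.out.println(c.slots.size() + " slot(s), " + c.memoryMb + " MB, " + c.cpuCores + " cores");
        }
    }
}
```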
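Use the following code to set the resources of operators.

{% highlight java %}
// 1 CPU core, 1024 MB heap, 128 MB direct and 64 MB native memory;
// managed memory is attached as an extended resource.
ResourceSpec resourceSpec = ResourceSpec.newBuilder()
    .setCpuCores(1)
    .setHeapMemoryInMB(1024)
    .setDirectMemoryInMB(128)
    .setNativeMemoryInMB(64)
    .addExtendedResource(new CommonExtendedResource(ResourceSpec.MANAGED_MEMORY_NAME, 10))
    .build();

// `text` is declared as a DataStream, so it is cast to SingleOutputStreamOperator
// to call setResources(). The same spec is applied to the operator producing
// `text`, to the flatMap, and to the keyed sum.
((SingleOutputStreamOperator) text).setResources(resourceSpec)
    .flatMap(new Tokenizer()).setResources(resourceSpec)
    .keyBy(0).sum(1).setResources(resourceSpec);
{% endhighlight %}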
Note: In per-job mode, the number of slots of each TaskManager is calculated dynamically, so the configuration option `taskmanager.numberOfTaskSlots` does not take effect.
{% top %}