| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
| __all__ = ['MemorySize', 'SlotSharingGroup'] |
| |
| from typing import Optional |
| |
| from pyflink.java_gateway import get_gateway |
| |
| |
class MemorySize(object):
    """
    MemorySize is a representation of a number of bytes, viewable in different units.

    Instances wrap a Java ``org.apache.flink.configuration.MemorySize`` object and delegate
    unit conversion, equality, hashing and ordering to it.
    """

    def __init__(self, j_memory_size=None, bytes_size: Optional[int] = None):
        """
        Wraps an existing Java MemorySize, or creates a new one from a number of bytes.

        :param j_memory_size: The Java MemorySize object to wrap. When it is not None,
                              ``bytes_size`` is ignored.
        :param bytes_size: The size in bytes passed to the Java MemorySize constructor when
                           ``j_memory_size`` is None.
        """
        if j_memory_size is None:
            # Only touch the gateway when there is no Java object to wrap.
            j_memory_size = get_gateway().jvm \
                .org.apache.flink.configuration.MemorySize(bytes_size)
        self._j_memory_size = j_memory_size

    @staticmethod
    def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
        """
        Creates a MemorySize through the Java factory method ``MemorySize.ofMebiBytes``.

        :param mebi_bytes: The size in Mebibytes.
        :return: The MemorySize wrapping the created Java object.
        """
        return MemorySize(
            get_gateway().jvm.org.apache.flink.configuration.MemorySize.ofMebiBytes(mebi_bytes))

    def get_bytes(self) -> int:
        """
        Gets the memory size in bytes.

        :return: The memory size in bytes.
        """
        return self._j_memory_size.getBytes()

    def get_kibi_bytes(self) -> int:
        """
        Gets the memory size in Kibibytes (= 1024 bytes).

        :return: The memory size in Kibibytes.
        """
        return self._j_memory_size.getKibiBytes()

    def get_mebi_bytes(self) -> int:
        """
        Gets the memory size in Mebibytes (= 1024 Kibibytes).

        :return: The memory size in Mebibytes.
        """
        return self._j_memory_size.getMebiBytes()

    def get_gibi_bytes(self) -> int:
        """
        Gets the memory size in Gibibytes (= 1024 Mebibytes).

        :return: The memory size in Gibibytes.
        """
        return self._j_memory_size.getGibiBytes()

    def get_tebi_bytes(self) -> int:
        """
        Gets the memory size in Tebibytes (= 1024 Gibibytes).

        :return: The memory size in Tebibytes.
        """
        return self._j_memory_size.getTebiBytes()

    def get_java_memory_size(self):
        """
        Gets the Java MemorySize object.

        :return: The Java MemorySize object.
        """
        return self._j_memory_size

    def _compare_to(self, other) -> int:
        """
        Compares this MemorySize with another one via the Java ``compareTo`` method.

        :param other: The MemorySize to compare with.
        :return: The raw ``compareTo`` result: negative, zero or positive.
        :raises TypeError: If ``other`` is not a MemorySize.
        """
        if not isinstance(other, MemorySize):
            # Wrap in a 1-tuple so that a tuple-valued ``other`` cannot break the formatting.
            raise TypeError("Does not support comparison with non-MemorySize %s" % (other,))
        return self._j_memory_size.compareTo(other._j_memory_size)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self._j_memory_size == other._j_memory_size

    def __hash__(self):
        return self._j_memory_size.hashCode()

    def __lt__(self, other: 'MemorySize'):
        """
        Checks whether this MemorySize is strictly smaller than ``other``.

        :raises TypeError: If ``other`` is not a MemorySize.
        """
        # compareTo only promises a negative number for "less than", not exactly -1.
        return self._compare_to(other) < 0

    def __le__(self, other: 'MemorySize'):
        """
        Checks whether this MemorySize is smaller than or equal to ``other``.

        :raises TypeError: If ``other`` is not a MemorySize.
        """
        return self._compare_to(other) <= 0

    def __str__(self):
        return self._j_memory_size.toString()
| |
| |
class SlotSharingGroup(object):
    """
    Describes the name and the different resource components of a slot sharing group.

    Instances wrap a Java SlotSharingGroup object and are normally created through
    :func:`SlotSharingGroup.builder`.
    """

    def __init__(self, j_slot_sharing_group):
        """
        Wraps the given Java SlotSharingGroup object.

        :param j_slot_sharing_group: The Java SlotSharingGroup object.
        """
        self._j_slot_sharing_group = j_slot_sharing_group

    def get_name(self) -> str:
        """
        Gets the name of this SlotSharingGroup.

        :return: The name of the SlotSharingGroup.
        """
        return self._j_slot_sharing_group.getName()

    def get_managed_memory(self) -> Optional[MemorySize]:
        """
        Gets the task managed memory for this SlotSharingGroup.

        :return: The task managed memory of the SlotSharingGroup, or None if the Java side
                 reports no value.
        """
        j_optional = self._j_slot_sharing_group.getManagedMemory()
        if not j_optional.isPresent():
            return None
        return MemorySize(j_optional.get())

    def get_task_heap_memory(self) -> Optional[MemorySize]:
        """
        Gets the task heap memory for this SlotSharingGroup.

        :return: The task heap memory of the SlotSharingGroup, or None if the Java side
                 reports no value.
        """
        j_optional = self._j_slot_sharing_group.getTaskHeapMemory()
        if not j_optional.isPresent():
            return None
        return MemorySize(j_optional.get())

    def get_task_off_heap_memory(self) -> Optional[MemorySize]:
        """
        Gets the task off-heap memory for this SlotSharingGroup.

        :return: The task off-heap memory of the SlotSharingGroup, or None if the Java side
                 reports no value.
        """
        j_optional = self._j_slot_sharing_group.getTaskOffHeapMemory()
        if not j_optional.isPresent():
            return None
        return MemorySize(j_optional.get())

    def get_cpu_cores(self) -> Optional[float]:
        """
        Gets the CPU cores for this SlotSharingGroup.

        :return: The CPU cores of the SlotSharingGroup, or None if the Java side reports no
                 value.
        """
        j_optional = self._j_slot_sharing_group.getCpuCores()
        if not j_optional.isPresent():
            return None
        return j_optional.get()

    def get_external_resources(self) -> dict:
        """
        Gets the external resource from this SlotSharingGroup.

        :return: User specified resources of the SlotSharingGroup, copied into a Python dict.
        """
        return dict(self._j_slot_sharing_group.getExternalResources())

    def get_java_slot_sharing_group(self):
        """
        Gets the Java SlotSharingGroup object.

        :return: The Java SlotSharingGroup object.
        """
        return self._j_slot_sharing_group

    @staticmethod
    def builder(name: str) -> 'SlotSharingGroup.Builder':
        """
        Gets the Builder with the given name for this SlotSharingGroup.

        :param name: The name of the SlotSharingGroup.
        :return: The builder for the SlotSharingGroup.
        """
        j_builder = get_gateway().jvm.org.apache.flink.api.common.operators \
            .SlotSharingGroup.newBuilder(name)
        return SlotSharingGroup.Builder(j_builder)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and \
            self._j_slot_sharing_group == other._j_slot_sharing_group

    def __hash__(self):
        return self._j_slot_sharing_group.hashCode()

    class Builder(object):
        """
        Builder for the SlotSharingGroup.

        Every setter forwards to the wrapped Java builder and returns this object so that
        calls can be chained.
        """

        def __init__(self, j_builder):
            """
            Wraps the given Java builder object.

            :param j_builder: The Java SlotSharingGroup builder.
            """
            self._j_builder = j_builder

        def set_cpu_cores(self, cpu_cores: float) -> 'SlotSharingGroup.Builder':
            """
            Sets the CPU cores for this SlotSharingGroup.

            :param cpu_cores: The CPU cores of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setCpuCores(cpu_cores)
            return self

        def set_task_heap_memory(self, task_heap_memory: MemorySize) -> 'SlotSharingGroup.Builder':
            """
            Sets the task heap memory for this SlotSharingGroup.

            :param task_heap_memory: The task heap memory of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setTaskHeapMemory(task_heap_memory.get_java_memory_size())
            return self

        def set_task_heap_memory_mb(self, task_heap_memory_mb: int) -> 'SlotSharingGroup.Builder':
            """
            Sets the task heap memory for this SlotSharingGroup in MB.

            :param task_heap_memory_mb: The task heap memory of the SlotSharingGroup in MB.
            :return: This object.
            """
            self._j_builder.setTaskHeapMemoryMB(task_heap_memory_mb)
            return self

        def set_task_off_heap_memory(self, task_off_heap_memory: MemorySize) \
                -> 'SlotSharingGroup.Builder':
            """
            Sets the task off-heap memory for this SlotSharingGroup.

            :param task_off_heap_memory: The task off-heap memory of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setTaskOffHeapMemory(task_off_heap_memory.get_java_memory_size())
            return self

        def set_task_off_heap_memory_mb(self, task_off_heap_memory_mb: int) \
                -> 'SlotSharingGroup.Builder':
            """
            Sets the task off-heap memory for this SlotSharingGroup in MB.

            :param task_off_heap_memory_mb: The task off-heap memory of the SlotSharingGroup in MB.
            :return: This object.
            """
            self._j_builder.setTaskOffHeapMemoryMB(task_off_heap_memory_mb)
            return self

        def set_managed_memory(self, managed_memory: MemorySize) -> 'SlotSharingGroup.Builder':
            """
            Sets the task managed memory for this SlotSharingGroup.

            :param managed_memory: The task managed memory of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setManagedMemory(managed_memory.get_java_memory_size())
            return self

        def set_managed_memory_mb(self, managed_memory_mb: int) -> 'SlotSharingGroup.Builder':
            """
            Sets the task managed memory for this SlotSharingGroup in MB.

            :param managed_memory_mb: The task managed memory of the SlotSharingGroup in MB.
            :return: This object.
            """
            self._j_builder.setManagedMemoryMB(managed_memory_mb)
            return self

        def set_external_resource(self, name: str, value: float) -> 'SlotSharingGroup.Builder':
            """
            Adds the given external resource. The old value with the same resource name will be
            replaced if present.

            :param name: The resource name of the given external resource.
            :param value: The value of the given external resource.
            :return: This object.
            """
            self._j_builder.setExternalResource(name, value)
            return self

        def build(self) -> 'SlotSharingGroup':
            """
            Builds the SlotSharingGroup.

            :return: The SlotSharingGroup object.
            """
            return SlotSharingGroup(j_slot_sharing_group=self._j_builder.build())