blob: 5fb66da69c4f501ea82e5b93d8864b093ce9ce23 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
__all__ = ['MemorySize', 'SlotSharingGroup']
from typing import Optional
from pyflink.java_gateway import get_gateway
class MemorySize(object):
"""
MemorySize is a representation of a number of bytes, viewable in different units.
"""
def __init__(self, j_memory_size=None, bytes_size: int = None):
self._j_memory_size = get_gateway().jvm \
.org.apache.flink.configuration.MemorySize(bytes_size) \
if j_memory_size is None else j_memory_size
@staticmethod
def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
return MemorySize(
get_gateway().jvm.org.apache.flink.configuration.MemorySize.ofMebiBytes(mebi_bytes))
def get_bytes(self) -> int:
"""
Gets the memory size in bytes.
:return: The memory size in bytes.
"""
return self._j_memory_size.getBytes()
def get_kibi_bytes(self) -> int:
"""
Gets the memory size in Kibibytes (= 1024 bytes).
:return: The memory size in Kibibytes.
"""
return self._j_memory_size.getKibiBytes()
def get_mebi_bytes(self) -> int:
"""
Gets the memory size in Mebibytes (= 1024 Kibibytes).
:return: The memory size in Mebibytes.
"""
return self._j_memory_size.getMebiBytes()
def get_gibi_bytes(self) -> int:
"""
Gets the memory size in Gibibytes (= 1024 Mebibytes).
:return: The memory size in Gibibytes.
"""
return self._j_memory_size.getGibiBytes()
def get_tebi_bytes(self) -> int:
"""
Gets the memory size in Tebibytes (= 1024 Gibibytes).
:return: The memory size in Tebibytes.
"""
return self._j_memory_size.getTebiBytes()
def get_java_memory_size(self):
"""
Gets the Java MemorySize object.
:return: The Java MemorySize object.
"""
return self._j_memory_size
def __eq__(self, other):
return isinstance(other, self.__class__) and self._j_memory_size == other._j_memory_size
def __hash__(self):
return self._j_memory_size.hashCode()
def __lt__(self, other: 'MemorySize'):
if not isinstance(other, MemorySize):
raise Exception("Does not support comparison with non-MemorySize %s" % other)
return self._j_memory_size.compareTo(other._j_memory_size) == -1
def __le__(self, other: 'MemorySize'):
return self.__eq__(other) and self.__lt__(other)
def __str__(self):
return self._j_memory_size.toString()
class SlotSharingGroup(object):
"""
Describe the name and the the different resource components of a slot sharing group.
"""
def __init__(self, j_slot_sharing_group):
self._j_slot_sharing_group = j_slot_sharing_group
def get_name(self) -> str:
"""
Gets the name of this SlotSharingGroup.
:return: The name of the SlotSharingGroup.
"""
return self._j_slot_sharing_group.getName()
def get_managed_memory(self) -> Optional[MemorySize]:
"""
Gets the task managed memory for this SlotSharingGroup.
:return: The task managed memory of the SlotSharingGroup.
"""
managed_memory = self._j_slot_sharing_group.getManagedMemory()
return MemorySize(managed_memory.get()) if managed_memory.isPresent() else None
def get_task_heap_memory(self) -> Optional[MemorySize]:
"""
Gets the task heap memory for this SlotSharingGroup.
:return: The task heap memory of the SlotSharingGroup.
"""
task_heap_memory = self._j_slot_sharing_group.getTaskHeapMemory()
return MemorySize(task_heap_memory.get()) if task_heap_memory.isPresent() else None
def get_task_off_heap_memory(self) -> Optional[MemorySize]:
"""
Gets the task off-heap memory for this SlotSharingGroup.
:return: The task off-heap memory of the SlotSharingGroup.
"""
task_off_heap_memory = self._j_slot_sharing_group.getTaskOffHeapMemory()
return MemorySize(task_off_heap_memory.get()) if task_off_heap_memory.isPresent() else None
def get_cpu_cores(self) -> Optional[float]:
"""
Gets the CPU cores for this SlotSharingGroup.
:return: The CPU cores of the SlotSharingGroup.
"""
cpu_cores = self._j_slot_sharing_group.getCpuCores()
return cpu_cores.get() if cpu_cores.isPresent() else None
def get_external_resources(self) -> dict:
"""
Gets the external resource from this SlotSharingGroup.
:return: User specified resources of the SlotSharingGroup.
"""
return dict(self._j_slot_sharing_group.getExternalResources())
def get_java_slot_sharing_group(self):
"""
Gets the Java SlotSharingGroup object.
:return: The Java SlotSharingGroup object.
"""
return self._j_slot_sharing_group
@staticmethod
def builder(name: str) -> 'Builder':
"""
Gets the Builder with the given name for this SlotSharingGroup.
:param name: The name of the SlotSharingGroup.
:return: The builder for the SlotSharingGroup.
"""
return SlotSharingGroup.Builder(
get_gateway().jvm.org.apache.flink.api.common.operators.SlotSharingGroup.newBuilder(
name))
def __eq__(self, other):
return isinstance(other, self.__class__) and \
self._j_slot_sharing_group == other._j_slot_sharing_group
def __hash__(self):
return self._j_slot_sharing_group.hashCode()
class Builder(object):
"""
Builder for the SlotSharingGroup.
"""
def __init__(self, j_builder):
self._j_builder = j_builder
def set_cpu_cores(self, cpu_cores: float) -> 'SlotSharingGroup.Builder':
"""
Sets the CPU cores for this SlotSharingGroup.
:param cpu_cores: The CPU cores of the SlotSharingGroup.
:return: This object.
"""
self._j_builder.setCpuCores(cpu_cores)
return self
def set_task_heap_memory(self, task_heap_memory: MemorySize) -> 'SlotSharingGroup.Builder':
"""
Sets the task heap memory for this SlotSharingGroup.
:param task_heap_memory: The task heap memory of the SlotSharingGroup.
:return: This object.
"""
self._j_builder.setTaskHeapMemory(task_heap_memory.get_java_memory_size())
return self
def set_task_heap_memory_mb(self, task_heap_memory_mb: int) -> 'SlotSharingGroup.Builder':
"""
Sets the task heap memory for this SlotSharingGroup in MB.
:param task_heap_memory_mb: The task heap memory of the SlotSharingGroup in MB.
:return: This object.
"""
self._j_builder.setTaskHeapMemoryMB(task_heap_memory_mb)
return self
def set_task_off_heap_memory(self, task_off_heap_memory: MemorySize) \
-> 'SlotSharingGroup.Builder':
"""
Sets the task off-heap memory for this SlotSharingGroup.
:param task_off_heap_memory: The task off-heap memory of the SlotSharingGroup.
:return: This object.
"""
self._j_builder.setTaskOffHeapMemory(task_off_heap_memory.get_java_memory_size())
return self
def set_task_off_heap_memory_mb(self, task_off_heap_memory_mb: int) \
-> 'SlotSharingGroup.Builder':
"""
Sets the task off-heap memory for this SlotSharingGroup in MB.
:param task_off_heap_memory_mb: The task off-heap memory of the SlotSharingGroup in MB.
:return: This object.
"""
self._j_builder.setTaskOffHeapMemoryMB(task_off_heap_memory_mb)
return self
def set_managed_memory(self, managed_memory: MemorySize) -> 'SlotSharingGroup.Builder':
"""
Sets the task managed memory for this SlotSharingGroup.
:param managed_memory: The task managed memory of the SlotSharingGroup.
:return: This object.
"""
self._j_builder.setManagedMemory(managed_memory.get_java_memory_size())
return self
def set_managed_memory_mb(self, managed_memory_mb: int) -> 'SlotSharingGroup.Builder':
"""
Sets the task managed memory for this SlotSharingGroup in MB.
:param managed_memory_mb: The task managed memory of the SlotSharingGroup in MB.
:return: This object.
"""
self._j_builder.setManagedMemoryMB(managed_memory_mb)
return self
def set_external_resource(self, name: str, value: float) -> 'SlotSharingGroup.Builder':
"""
Adds the given external resource. The old value with the same resource name will be
replaced if present.
:param name: The resource name of the given external resource.
:param value: The value of the given external resource.
:return: This object.
"""
self._j_builder.setExternalResource(name, value)
return self
def build(self) -> 'SlotSharingGroup':
"""
Builds the SlotSharingGroup.
:return: The SlotSharingGroup object.
"""
return SlotSharingGroup(j_slot_sharing_group=self._j_builder.build())