blob: f2faee6ef203ad5cac6ca97a70bff20723d5d21a [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.datastream.slot_sharing_group import MemorySize, SlotSharingGroup
from pyflink.testing.test_case_utils import PyFlinkTestCase
class SlotSharingGroupTests(PyFlinkTestCase):
def test_build_slot_sharing_group_with_specific_resource(self):
name = 'slot_sharing_group'
heap_memory = MemorySize.of_mebi_bytes(100)
off_heap_memory = MemorySize.of_mebi_bytes(200)
managed_memory = MemorySize.of_mebi_bytes(300)
slot_sharing_group = SlotSharingGroup.builder(name) \
.set_cpu_cores(1.0) \
.set_task_heap_memory(heap_memory) \
.set_task_off_heap_memory(off_heap_memory) \
.set_managed_memory(managed_memory) \
.set_external_resource('gpu', 1.0) \
.build()
self.assertEqual(slot_sharing_group.get_name(), name)
self.assertEqual(slot_sharing_group.get_cpu_cores(), 1.0)
self.assertEqual(slot_sharing_group.get_task_heap_memory(), heap_memory)
self.assertEqual(slot_sharing_group.get_task_off_heap_memory(), off_heap_memory)
self.assertEqual(slot_sharing_group.get_managed_memory(), managed_memory)
self.assertEqual(slot_sharing_group.get_external_resources(), {'gpu': 1.0})
def test_build_slot_sharing_group_with_unknown_resource(self):
name = 'slot_sharing_group'
slot_sharing_group = SlotSharingGroup.builder(name).build()
self.assertEqual(slot_sharing_group.get_name(), name)
self.assertIsNone(slot_sharing_group.get_cpu_cores())
self.assertIsNone(slot_sharing_group.get_task_heap_memory())
self.assertIsNone(slot_sharing_group.get_task_off_heap_memory())
self.assertIsNone(slot_sharing_group.get_managed_memory())
self.assertEqual(slot_sharing_group.get_external_resources(), {})
def test_build_slot_sharing_group_with_illegal_config(self):
with self.assertRaises(Exception):
SlotSharingGroup.builder("slot_sharing_group") \
.set_cpu_cores(1.0) \
.set_task_heap_memory(MemorySize(bytes_size=0)) \
.set_task_off_heap_memory_mb(10) \
.build()
def test_build_slot_sharing_group_without_all_required_config(self):
with self.assertRaises(Exception):
SlotSharingGroup.builder("slot_sharing_group") \
.set_cpu_cores(1.0) \
.set_task_off_heap_memory_mb(10) \
.build()