blob: b68ccfc2103ca787d7c43520db24631c1289c236 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional
class MemorySize:
"""MemorySize is a representation of a number of bytes, viewable in different units."""
ZERO = None
MAX_VALUE = None
def __init__(self, bytes: int):
"""Constructs a new MemorySize."""
if bytes < 0:
raise ValueError("bytes must be >= 0")
self.bytes = bytes
@staticmethod
def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
return MemorySize(mebi_bytes << 20)
@staticmethod
def of_kibi_bytes(kibi_bytes: int) -> 'MemorySize':
return MemorySize(kibi_bytes << 10)
@staticmethod
def of_bytes(bytes: int) -> 'MemorySize':
return MemorySize(bytes)
def get_bytes(self) -> int:
return self.bytes
def get_kibi_bytes(self) -> int:
return self.bytes >> 10
def get_mebi_bytes(self) -> int:
return self.bytes >> 20
def get_gibi_bytes(self) -> int:
return self.bytes >> 30
def get_tebi_bytes(self) -> int:
return self.bytes >> 40
def __eq__(self, other) -> bool:
return isinstance(other, MemorySize) and self.bytes == other.bytes
def __hash__(self) -> int:
return hash(self.bytes)
def __str__(self) -> str:
return self.format_to_string()
def format_to_string(self) -> str:
ORDERED_UNITS = [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]
highest_integer_unit = MemoryUnit.BYTES
for idx, unit in enumerate(ORDERED_UNITS):
if self.bytes % unit.multiplier != 0:
if idx == 0:
highest_integer_unit = ORDERED_UNITS[0]
else:
highest_integer_unit = ORDERED_UNITS[idx - 1]
break
else:
highest_integer_unit = MemoryUnit.BYTES
return f"{self.bytes // highest_integer_unit.multiplier} {highest_integer_unit.units[1]}"
def __repr__(self) -> str:
return f"MemorySize({self.bytes})"
def __lt__(self, other: 'MemorySize') -> bool:
return self.bytes < other.bytes
def __le__(self, other: 'MemorySize') -> bool:
return self.bytes <= other.bytes
def __gt__(self, other: 'MemorySize') -> bool:
return self.bytes > other.bytes
def __ge__(self, other: 'MemorySize') -> bool:
return self.bytes >= other.bytes
@staticmethod
def parse(text: str) -> 'MemorySize':
return MemorySize(MemorySize.parse_bytes(text))
@staticmethod
def parse_bytes(text: str) -> int:
if text is None:
raise ValueError("text cannot be None")
trimmed = text.strip()
if not trimmed:
raise ValueError("argument is an empty- or whitespace-only string")
pos = 0
while pos < len(trimmed) and trimmed[pos].isdigit():
pos += 1
number_str = trimmed[:pos]
unit_str = trimmed[pos:].strip().lower()
if not number_str:
raise ValueError("text does not start with a number")
try:
value = int(number_str)
except ValueError:
raise ValueError(
f"The value '{number_str}' cannot be represented as 64bit number (numeric overflow).")
unit = MemorySize._parse_unit(unit_str)
multiplier = unit.multiplier if unit else 1
result = value * multiplier
if result // multiplier != value:
raise ValueError(
f"The value '{text}' cannot be represented as 64bit number of bytes (numeric overflow).")
return result
@staticmethod
def _parse_unit(unit_str: str) -> Optional['MemoryUnit']:
if not unit_str:
return None
for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
if unit_str in unit.units:
return unit
raise ValueError(
f"Memory size unit '{unit_str}' does not match any of the recognized units: "
f"{MemoryUnit.get_all_units()}")
class MemoryUnit:
"""Enum which defines memory unit, mostly used to parse value from configuration file."""
def __init__(self, units: list, multiplier: int):
self.units = units
self.multiplier = multiplier
BYTES = None
KILO_BYTES = None
MEGA_BYTES = None
GIGA_BYTES = None
TERA_BYTES = None
@staticmethod
def get_all_units() -> str:
all_units = []
for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
all_units.append("(" + " | ".join(unit.units) + ")")
return " / ".join(all_units)
@staticmethod
def has_unit(text: str) -> bool:
if text is None:
raise ValueError("text cannot be None")
trimmed = text.strip()
if not trimmed:
raise ValueError("argument is an empty- or whitespace-only string")
pos = 0
while pos < len(trimmed) and trimmed[pos].isdigit():
pos += 1
unit = trimmed[pos:].strip().lower()
return len(unit) > 0
MemoryUnit.BYTES = MemoryUnit(["b", "bytes"], 1)
MemoryUnit.KILO_BYTES = MemoryUnit(["k", "kb", "kibibytes"], 1024)
MemoryUnit.MEGA_BYTES = MemoryUnit(["m", "mb", "mebibytes"], 1024 * 1024)
MemoryUnit.GIGA_BYTES = MemoryUnit(["g", "gb", "gibibytes"], 1024 * 1024 * 1024)
MemoryUnit.TERA_BYTES = MemoryUnit(["t", "tb", "tebibytes"], 1024 * 1024 * 1024 * 1024)
MemorySize.ZERO = MemorySize(0)
MemorySize.MAX_VALUE = MemorySize(2**63 - 1)