# blob: 824afc341915333c2019b8b5d3d54be0240eeb84 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import base64
import struct
from abc import ABC, abstractmethod
from functools import singledispatch
from typing import (
Any,
Callable,
Generic,
Literal,
Optional,
TypeVar,
)
import mmh3
from pydantic import Field, PositiveInt, PrivateAttr
from pyiceberg.types import (
BinaryType,
DateType,
DecimalType,
FixedType,
IcebergType,
IntegerType,
LongType,
StringType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
from pyiceberg.utils import datetime
from pyiceberg.utils.decimal import decimal_to_bytes, truncate_decimal
from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
from pyiceberg.utils.parsing import ParseNumberFromBrackets
from pyiceberg.utils.singleton import Singleton
# Generic type variables used by Transform: S is the source value type and
# T is the transformed (result) value type.
S = TypeVar("S")
T = TypeVar("T")
# Names of the transforms as they appear in a transform's string form.
IDENTITY = "identity"
VOID = "void"
BUCKET = "bucket"
TRUNCATE = "truncate"
# Parsers constructed from the "bucket" / "truncate" names.
# NOTE(review): presumably these pull N out of "bucket[N]" / "truncate[N]";
# confirm against pyiceberg.utils.parsing.ParseNumberFromBrackets.
BUCKET_PARSER = ParseNumberFromBrackets(BUCKET)
TRUNCATE_PARSER = ParseNumberFromBrackets(TRUNCATE)
class Transform(IcebergBaseModel, ABC, Generic[S, T]):
    """Transform base class for concrete transforms.

    A base class to transform values and project predicates on partition values.
    This class is not used directly. Instead, instantiate one of the concrete
    subclasses defined in this module.

    The string form of a transform (for example ``"identity"`` or
    ``"bucket[16]"``) is stored in the ``__root__`` field and is what
    ``str()`` and equality comparisons use.
    """

    __root__: str = Field()

    @classmethod
    def __get_validators__(cls):
        # One or more validators may be yielded; they are called in order, and
        # each one receives the value returned by the previous validator.
        yield cls.validate

    @classmethod
    def validate(cls, v: Any):
        """Turn the string form of a transform into the matching subclass.

        Args:
            v: The value to validate. Strings are parsed; any other value is
                returned unchanged.

        Returns:
            A concrete transform instance for string input, otherwise ``v``.
        """
        # Pydantic cannot determine the subtype from a bare string, so the
        # transform name is parsed here.
        if isinstance(v, str):
            if v == IDENTITY:
                return IdentityTransform()
            elif v == VOID:
                return VoidTransform()
            elif v.startswith(BUCKET):
                return BucketTransform(num_buckets=BUCKET_PARSER.match(v))
            elif v.startswith(TRUNCATE):
                # The width must be read with the parser built for the
                # "truncate" name, not the one built for "bucket".
                return TruncateTransform(width=TRUNCATE_PARSER.match(v))
            else:
                return UnknownTransform(transform=v)
        return v

    @abstractmethod
    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[T]]:
        """Return a function that applies this transform to values of type ``source``."""
        ...

    @abstractmethod
    def can_transform(self, source: IcebergType) -> bool:
        """Return whether this transform can be applied to the ``source`` type."""
        return False

    @abstractmethod
    def result_type(self, source: IcebergType) -> IcebergType:
        """Return the type produced when this transform is applied to ``source``."""
        ...

    @property
    def preserves_order(self) -> bool:
        """Whether the transform preserves the ordering of values; ``False`` by default."""
        return False

    def satisfies_order_of(self, other) -> bool:
        """Return whether ordering by this transform satisfies the order of ``other``.

        The base implementation only accepts an equal transform.
        """
        return self == other

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Return ``str(value)``, or ``"null"`` when ``value`` is ``None``."""
        return str(value) if value is not None else "null"

    @property
    def dedup_name(self) -> str:
        """The same text as ``str(self)``."""
        return self.__str__()

    def __str__(self) -> str:
        return self.__root__

    def __eq__(self, other: Any) -> bool:
        # Two transforms are equal when their string forms are equal; anything
        # that is not a Transform never compares equal.
        if isinstance(other, Transform):
            return self.__root__ == other.__root__
        return False
class BucketTransform(Transform[S, int]):
    """Base Transform class to transform a value into a bucket partition value.

    Transforms are parameterized by a number of buckets. Bucket partition
    transforms take a 32-bit hash of the source value, mask it with
    ``IntegerType.max`` and reduce it modulo the number of buckets.

    ``None`` is the only input that maps to ``None``. Falsy values such as
    ``0``, ``""`` or ``b""`` are hashed like any other value.

    Args:
        num_buckets (int): The number of buckets.
    """

    _source_type: IcebergType = PrivateAttr()
    _num_buckets: PositiveInt = PrivateAttr()

    def __init__(self, num_buckets: int, **data: Any):
        super().__init__(__root__=f"bucket[{num_buckets}]", **data)
        self._num_buckets = num_buckets

    @property
    def num_buckets(self) -> int:
        """The number of buckets this transform distributes values over."""
        return self._num_buckets

    def hash(self, value: S) -> int:
        """Hash hook used by ``apply``; not implemented on this class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()

    def apply(self, value: Optional[S]) -> Optional[int]:
        """Return the bucket for ``value``, or ``None`` when ``value`` is ``None``."""
        # Compare against None explicitly: 0, "" and b"" are valid inputs that
        # must still be assigned a bucket.
        if value is None:
            return None
        return (self.hash(value) & IntegerType.max) % self._num_buckets

    def result_type(self, source: IcebergType) -> IcebergType:
        """Bucket numbers are always of ``IntegerType``, whatever the source type."""
        return IntegerType()

    def can_transform(self, source: IcebergType) -> bool:
        """Return whether ``source`` is one of the types that can be bucketed."""
        return type(source) in {
            IntegerType,
            DateType,
            LongType,
            TimeType,
            TimestampType,
            TimestamptzType,
            DecimalType,
            StringType,
            FixedType,
            BinaryType,
            UUIDType,
        }

    def transform(self, source: IcebergType, bucket: bool = True) -> Callable[[Optional[Any]], Optional[int]]:
        """Build the bucketing (or raw hashing) function for ``source``.

        Args:
            source: The type of the values that will be passed to the function.
            bucket: When true (the default) the returned function yields the
                bucket number and maps ``None`` to ``None``. When false the
                raw hash function is returned instead.

        Returns:
            A callable that maps a source value to its bucket number or hash.

        Raises:
            ValueError: If ``source`` is not a type this transform can hash.
        """
        source_type = type(source)
        if source_type in {IntegerType, LongType, DateType, TimeType, TimestampType, TimestamptzType}:

            def hash_func(v):
                # Integer-backed values are hashed as 8-byte little-endian signed integers.
                return mmh3.hash(struct.pack("<q", v))

        elif source_type == DecimalType:

            def hash_func(v):
                return mmh3.hash(decimal_to_bytes(v))

        elif source_type in {StringType, FixedType, BinaryType}:

            def hash_func(v):
                return mmh3.hash(v)

        elif source_type == UUIDType:

            def hash_func(v):
                # 16 bytes, big-endian: the high 64 bits followed by the low 64 bits.
                return mmh3.hash(
                    struct.pack(
                        ">QQ",
                        (v.int >> 64) & 0xFFFFFFFFFFFFFFFF,
                        v.int & 0xFFFFFFFFFFFFFFFF,
                    )
                )

        else:
            raise ValueError(f"Unknown type {source}")

        if not bucket:
            return hash_func

        def bucket_func(v):
            # Only None short-circuits; falsy values such as 0 or "" are bucketed.
            if v is None:
                return None
            return (hash_func(v) & IntegerType.max) % self._num_buckets

        return bucket_func

    def __repr__(self) -> str:
        return f"BucketTransform(num_buckets={self._num_buckets})"
def _base64encode(buffer: bytes) -> str:
"""Converts bytes to base64 string"""
return base64.b64encode(buffer).decode("ISO-8859-1")
class IdentityTransform(Transform[S, S]):
    """Transform that hands every value back unchanged.

    Example:
        >>> transform = IdentityTransform()
        >>> transform.transform(StringType())('hello-world')
        'hello-world'
    """

    __root__: Literal["identity"] = Field(default="identity")
    _source_type: IcebergType = PrivateAttr()

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[S]]:
        """Return a function mapping each value to itself; ``source`` is not consulted."""

        def identity(v):
            return v

        return identity

    def can_transform(self, source: IcebergType) -> bool:
        """Return the ``is_primitive`` flag of ``source``."""
        return source.is_primitive

    def result_type(self, source: IcebergType) -> IcebergType:
        """The result type is the source type itself."""
        return source

    @property
    def preserves_order(self) -> bool:
        """Always ``True`` for the identity transform."""
        return True

    def satisfies_order_of(self, other: Transform) -> bool:
        """Ordering by value is the same as long as the other preserves order."""
        return other.preserves_order

    def to_human_string(self, source_type: IcebergType, value: Optional[S]) -> str:
        """Return ``"null"`` for ``None``, otherwise the ``_human_string`` rendering."""
        if value is None:
            return "null"
        return _human_string(value, source_type)

    def __str__(self) -> str:
        return "identity"

    def __repr__(self) -> str:
        return "IdentityTransform()"
class TruncateTransform(Transform[S, S]):
    """A transform for truncating a value to a specified width.

    Integers and longs become ``v - v % width``, strings and binary values are
    sliced to at most ``width`` elements, and decimals are passed to
    ``truncate_decimal``. ``None`` is the only input that maps to ``None``;
    falsy values such as ``0`` or ``""`` are truncated like any other value.

    Args:
        width (int): The truncate width, should be positive

    Raises:
        ValueError: If a type is provided that is incompatible with a Truncate transform
    """

    __root__: str = Field()
    # NOTE(review): nothing in this class assigns ``_source_type``, yet
    # ``source_type`` and ``satisfies_order_of`` read it. Confirm it is set
    # elsewhere before relying on them.
    _source_type: IcebergType = PrivateAttr()
    _width: PositiveInt = PrivateAttr()

    def __init__(self, width: int, **data: Any):
        super().__init__(__root__=f"truncate[{width}]", **data)
        self._width = width

    def can_transform(self, source: IcebergType) -> bool:
        """Return whether ``source`` is one of the types that can be truncated."""
        return type(source) in {IntegerType, LongType, StringType, BinaryType, DecimalType}

    def result_type(self, source: IcebergType) -> IcebergType:
        """Truncation keeps the source type."""
        return source

    @property
    def preserves_order(self) -> bool:
        """Always ``True`` for truncate transforms."""
        return True

    @property
    def source_type(self) -> IcebergType:
        """The value of the private ``_source_type`` attribute."""
        return self._source_type

    @property
    def width(self) -> int:
        """The truncate width given at construction."""
        return self._width

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[S]]:
        """Build the truncation function for values of type ``source``.

        Args:
            source: The type of the values that will be passed to the function.

        Returns:
            A callable that truncates a value and maps ``None`` to ``None``.

        Raises:
            ValueError: If ``source`` is not a type that can be truncated.
        """
        source_type = type(source)
        if source_type in {IntegerType, LongType}:

            def truncate_func(v):
                return v - v % self._width

        elif source_type in {StringType, BinaryType}:

            def truncate_func(v):
                return v[0 : min(self._width, len(v))]

        elif source_type == DecimalType:

            def truncate_func(v):
                return truncate_decimal(v, self._width)

        else:
            raise ValueError(f"Cannot truncate for type: {source}")

        def truncate_or_none(v):
            # Only None short-circuits; 0, "" and b"" are valid values whose
            # truncation is themselves, not None.
            if v is None:
                return None
            return truncate_func(v)

        return truncate_or_none

    def satisfies_order_of(self, other: Transform) -> bool:
        """Return whether ordering by this transform satisfies the order of ``other``.

        True for an equal transform, or when both are string truncations and
        this width is at least the other's width.
        """
        if self == other:
            return True
        elif (
            isinstance(self.source_type, StringType)
            and isinstance(other, TruncateTransform)
            and isinstance(other.source_type, StringType)
        ):
            return self.width >= other.width
        return False

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Return ``"null"`` for ``None``, base64 text for bytes, else ``str(value)``."""
        if value is None:
            return "null"
        elif isinstance(value, bytes):
            return _base64encode(value)
        else:
            return str(value)

    def __repr__(self) -> str:
        return f"TruncateTransform(width={self._width})"
@singledispatch
def _human_string(value: Any, _type: IcebergType) -> str:
    """Render ``value`` as text, dispatching on the runtime type of ``value``.

    This fallback covers every type without a dedicated registration.
    """
    text = str(value)
    return text


@_human_string.register
def _(value: bytes, _type: IcebergType) -> str:
    """Binary values are shown as base64 text."""
    encoded = _base64encode(value)
    return encoded


@_human_string.register
def _(value: int, _type: IcebergType) -> str:
    """Integers are rendered according to the Iceberg type they stand for."""
    rendered = _int_to_human_string(_type, value)
    return rendered
@singledispatch
def _int_to_human_string(_type: IcebergType, value: int) -> str:
    """Render an integer as text, dispatching on the Iceberg type ``_type``.

    This fallback, used for types without a dedicated registration, is ``str(value)``.
    """
    text = str(value)
    return text


@_int_to_human_string.register
def _(_type: DateType, value: int) -> str:
    """Date values are rendered by ``datetime.to_human_day``."""
    rendered = datetime.to_human_day(value)
    return rendered


@_int_to_human_string.register
def _(_type: TimeType, value: int) -> str:
    """Time values are rendered by ``datetime.to_human_time``."""
    rendered = datetime.to_human_time(value)
    return rendered


@_int_to_human_string.register
def _(_type: TimestampType, value: int) -> str:
    """Timestamp values are rendered by ``datetime.to_human_timestamp``."""
    rendered = datetime.to_human_timestamp(value)
    return rendered


@_int_to_human_string.register
def _(_type: TimestamptzType, value: int) -> str:
    """Timestamptz values are rendered by ``datetime.to_human_timestamptz``."""
    rendered = datetime.to_human_timestamptz(value)
    return rendered
class UnknownTransform(Transform):
    """Placeholder for a transform name that is not recognised.

    The supplied name is kept on the instance and shown by ``repr``. The
    transform cannot be applied to any type.

    Args:
        transform (str): The name of the transform as it was supplied.

    Raises:
        AttributeError: If ``transform()`` is called on the instance.
    """

    __root__: Literal["unknown"] = Field(default="unknown")
    _source_type: IcebergType = PrivateAttr()
    _transform: str = PrivateAttr()

    def __init__(self, transform: str, **data: Any):
        super().__init__(**data)
        # Remember the name that was supplied so that repr() can show it.
        self._transform = transform

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[T]]:
        """Always raises: an unknown transform has no function to apply."""
        raise AttributeError(f"Cannot apply unsupported transform: {self}")

    def can_transform(self, source: IcebergType) -> bool:
        """No source type can be transformed."""
        return False

    def result_type(self, source: IcebergType) -> IcebergType:
        """The result type is reported as ``StringType``, whatever the source."""
        return StringType()

    def __repr__(self) -> str:
        return f"UnknownTransform(transform={self._transform!r})"
class VoidTransform(Transform, Singleton):
    """Transform whose output is ``None`` for every input."""

    __root__ = "void"

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[T]]:
        """Return a function that discards its argument and yields ``None``."""

        def to_none(v):
            return None

        return to_none

    def can_transform(self, _: IcebergType) -> bool:
        """Every source type is accepted."""
        return True

    def result_type(self, source: IcebergType) -> IcebergType:
        """The result type is the source type itself."""
        return source

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """The rendering is ``"null"`` regardless of ``value``."""
        return "null"

    def __repr__(self) -> str:
        return "VoidTransform()"