blob: 65d4c759bfbda75fe5583312ca7bbfe3ae58e09d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import codecs
import gzip
from abc import ABC, abstractmethod
from typing import Callable
from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
GZIP = "gzip"
class Compressor(ABC):
@staticmethod
def get_compressor(location: str) -> Compressor:
return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR
@abstractmethod
def stream_decompressor(self, inp: InputStream) -> InputStream:
"""Return a stream decompressor.
Args:
inp: The input stream that needs decompressing.
Returns:
The wrapped stream
"""
@abstractmethod
def bytes_compressor(self) -> Callable[[bytes], bytes]:
"""Return a function to compress bytes.
Returns:
A function that can be used to compress bytes.
"""
class NoopCompressor(Compressor):
def stream_decompressor(self, inp: InputStream) -> InputStream:
return inp
def bytes_compressor(self) -> Callable[[bytes], bytes]:
return lambda b: b
NOOP_COMPRESSOR = NoopCompressor()
class GzipCompressor(Compressor):
def stream_decompressor(self, inp: InputStream) -> InputStream:
return gzip.open(inp)
def bytes_compressor(self) -> Callable[[bytes], bytes]:
return gzip.compress
class FromByteStream:
"""A collection of methods that deserialize dictionaries into Iceberg objects."""
@staticmethod
def table_metadata(
byte_stream: InputStream, encoding: str = "utf-8", compression: Compressor = NOOP_COMPRESSOR
) -> TableMetadata:
"""Instantiate a TableMetadata object from a byte stream.
Args:
byte_stream: A file-like byte stream object.
encoding (default "utf-8"): The byte encoder to use for the reader.
compression: Optional compression method
"""
with compression.stream_decompressor(byte_stream) as byte_stream:
reader = codecs.getreader(encoding)
json_bytes = reader(byte_stream)
metadata = json_bytes.read()
return TableMetadataUtil.parse_raw(metadata)
class FromInputFile:
"""A collection of methods that deserialize InputFiles into Iceberg objects."""
@staticmethod
def table_metadata(input_file: InputFile, encoding: str = "utf-8") -> TableMetadata:
"""Create a TableMetadata instance from an input file.
Args:
input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract base class.
encoding (str): Encoding to use when loading bytestream.
Returns:
TableMetadata: A table metadata instance.
"""
with input_file.open() as input_stream:
return FromByteStream.table_metadata(
byte_stream=input_stream, encoding=encoding, compression=Compressor.get_compressor(location=input_file.location)
)
class ToOutputFile:
"""A collection of methods that serialize Iceberg objects into files given an OutputFile instance."""
@staticmethod
def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
"""Write a TableMetadata instance to an output file.
Args:
output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract base class.
overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
"""
with output_file.create(overwrite=overwrite) as output_stream:
json_bytes = metadata.model_dump_json().encode("utf-8")
json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
output_stream.write(json_bytes)