blob: 8b73ebcd12e0b6e4dc718345534f71b8711f30f3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from iceberg.api import (FileFormat,
Metrics)
from iceberg.api.types import Conversions
from .generic_data_file import GenericDataFile
from .partition_data import PartitionData
class DataFiles(object):
DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024
@staticmethod
def new_partition_data(spec):
return PartitionData(spec.partition_type())
@staticmethod
def copy_partition_data(spec, partition_data, reuse):
data = reuse
if data is None:
data = DataFiles.new_partition_data(spec)
for i, _ in enumerate(spec.fields):
data.set(i, partition_data.get(i))
return data
@staticmethod
def fill_from_path(spec, partition_path, reuse):
data = reuse
if data is None:
data = DataFiles.new_partition_data(spec)
partitions = partition_path.split("/")
if len(partitions) > len(spec.fields):
raise RuntimeError("Invalid partition data, too many fields (expecting %s): %s" % (len(spec.fields),
partition_path))
if len(partitions) < len(spec.fields):
raise RuntimeError("Invalid partition data, not enough fields(expecting %s): %s" % (len(spec.fields),
partition_path))
for i, part in enumerate(partitions):
field = spec.fields[i]
parts = part.split("=")
if len(parts) != 2 or parts[0] is None or parts[0] != field.name:
raise RuntimeError("Invalid partition: %s" % part)
data.set(i, Conversions.from_partition_string(data.get_type(i), parts[1]))
return data
@staticmethod
def data(spec, partition_path):
return DataFiles.fill_from_path(spec, partition_path, None)
@staticmethod
def copy(spec, partition):
return DataFiles.copy_partition_data(spec, partition, None)
@staticmethod
def from_input_file(input_file, row_count, partition_data=None, metrics=None):
from .filesystem import FileSystemInputFile
if isinstance(input_file, FileSystemInputFile):
return DataFiles.from_stat(input_file.get_stat(), row_count,
partition_data=partition_data, metrics=metrics)
@staticmethod
def from_stat(stat, row_count, partition_data=None, metrics=None):
location = stat.path
format = FileFormat.from_file_name(location)
return GenericDataFile(location, format, stat.length, stat.block_size,
row_count=row_count, partition=partition_data, metrics=metrics)
@staticmethod
def builder(spec=None):
return DataFileBuilder(spec)
class DataFileBuilder(object):
def __init__(self, spec=None):
self.spec = spec
self.partition_data = DataFiles.new_partition_data(spec) if spec is not None else None
self.is_partitioned = spec is not None
self.file_path = None
self.format = None
self.record_count = -1
self.file_size_in_bytes = -1
self.block_size_in_bytes = -1
self.column_sizes = None
self.value_counts = None
self.null_value_counts = None
self.lower_bounds = None
self.upper_bounds = None
def clear(self):
if self.is_partitioned:
self.partition_data.clear()
self.file_path = None
self.format = None
self.record_count = -1
self.file_size_in_bytes = -1
self.block_size_in_bytes = -1
self.column_sizes = None
self.value_counts = None
self.null_value_counts = None
self.lower_bounds = None
self.upper_bounds = None
return self
def copy(self, to_copy):
if self.is_partitioned:
self.partition_data = DataFiles.copy_partition_data(self.spec, to_copy.partition, self.partition_data)
self.file_path = str(to_copy.path())
self.format = to_copy.format
self.record_count = to_copy.record_count
self.file_size_in_bytes = to_copy.file_size_in_bytes
self.block_size_in_bytes = to_copy.block_size_in_bytes
self.column_sizes = to_copy.column_sizes
self.value_counts = to_copy.value_counts
self.null_value_counts = to_copy.null_value_counts
self.lower_bounds = to_copy.lower_bounds
self.upper_bounds = to_copy.upper_bounds
return self
def with_status(self, stat):
self.file_path = stat.path
self.file_size_in_bytes = stat.length
self.block_size_in_bytes = stat.blocksize
return self
def with_input_file(self, input_file):
from .filesystem import FileSystemInputFile
if isinstance(input_file, FileSystemInputFile):
self.with_status(input_file.get_stat())
self.file_path = self.location()
self.file_size_in_bytes = self.get_length()
return self
def with_path(self, path):
self.file_path = path
return self
def with_format(self, fmt):
if isinstance(fmt, FileFormat):
self.format = fmt
else:
self.format = FileFormat[str(fmt).upper()]
return self
def with_partition(self, partition):
self.partition_data = DataFiles.copy_partition_data(self.spec, partition, self.partition_data)
return self
def with_record_count(self, record_count):
self.record_count = record_count
return self
def with_file_size_in_bytes(self, file_size_in_bytes):
self.file_size_in_bytes = file_size_in_bytes
return self
def with_block_size_in_bytes(self, block_size_in_bytes):
self.block_size_in_bytes = block_size_in_bytes
return self
def with_partition_path(self, partition_path):
if not self.is_partitioned:
raise RuntimeError("Cannot add partition data for an unpartitioned table")
self.partition_data = DataFiles.fill_from_path(self.spec, partition_path, self.partition_data)
return self
def with_metrics(self, metrics):
self.record_count = metrics.row_count if metrics.row_count is not None else -1
self.column_sizes = metrics.column_sizes
self.value_counts = metrics.value_counts
self.null_value_counts = metrics.null_value_counts
self.lower_bounds = metrics.lower_bounds
self.upper_bounds = metrics.upper_bounds
return self
def build(self):
if self.file_path is None:
raise RuntimeError("File path is required")
if self.format is None:
self.format = FileFormat.from_file_name(self.file_path)
if self.format is None:
raise RuntimeError("File format is required")
if self.file_size_in_bytes < 0:
raise RuntimeError("File size is required")
if self.record_count < 0:
raise RuntimeError("Record count is required")
if self.block_size_in_bytes is None:
self.block_size_in_bytes = DataFiles.DEFAULT_BLOCK_SIZE
return GenericDataFile(self.file_path, self.format, self.file_size_in_bytes, self.block_size_in_bytes,
partition=self.partition_data.copy() if self.is_partitioned else None,
metrics=Metrics(row_count=self.record_count, column_sizes=self.column_sizes,
value_counts=self.value_counts,
null_value_counts=self.null_value_counts,
lower_bounds=self.lower_bounds, upper_bounds=self.upper_bounds))