# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
import io
from textwrap import indent
import warnings
import numpy as np
from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Array, Schema,
check_status,
MemoryPool, maybe_unbox_memory_pool,
                          Table,
pyarrow_wrap_chunked_array,
pyarrow_wrap_schema,
pyarrow_wrap_table,
pyarrow_wrap_buffer,
pyarrow_wrap_batch,
NativeFile, get_reader, get_writer)
from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
_stringify_path, _datetime_from_int,
tobytes, frombytes)
cimport cpython as cp
cdef class Statistics(_Weakrefable):
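    """Statistics for a single column in a single row group."""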
def __cinit__(self):
pass
def __repr__(self):
return """{}
has_min_max: {}
min: {}
max: {}
null_count: {}
distinct_count: {}
num_values: {}
physical_type: {}
logical_type: {}
converted_type (legacy): {}""".format(object.__repr__(self),
self.has_min_max,
self.min,
self.max,
self.null_count,
self.distinct_count,
self.num_values,
self.physical_type,
str(self.logical_type),
self.converted_type)
def to_dict(self):
d = dict(
has_min_max=self.has_min_max,
min=self.min,
max=self.max,
null_count=self.null_count,
distinct_count=self.distinct_count,
num_values=self.num_values,
physical_type=self.physical_type
)
return d
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, Statistics other):
return self.statistics.get().Equals(deref(other.statistics.get()))
@property
def has_min_max(self):
return self.statistics.get().HasMinMax()
@property
def min_raw(self):
if self.has_min_max:
return _cast_statistic_raw_min(self.statistics.get())
else:
return None
@property
def max_raw(self):
if self.has_min_max:
return _cast_statistic_raw_max(self.statistics.get())
else:
return None
@property
def min(self):
if self.has_min_max:
return _cast_statistic_min(self.statistics.get())
else:
return None
@property
def max(self):
if self.has_min_max:
return _cast_statistic_max(self.statistics.get())
else:
return None
@property
def null_count(self):
return self.statistics.get().null_count()
@property
def distinct_count(self):
return self.statistics.get().distinct_count()
@property
def num_values(self):
return self.statistics.get().num_values()
@property
def physical_type(self):
raw_physical_type = self.statistics.get().physical_type()
return physical_type_name_from_enum(raw_physical_type)
@property
def logical_type(self):
return wrap_logical_type(self.statistics.get().descr().logical_type())
@property
def converted_type(self):
raw_converted_type = self.statistics.get().descr().converted_type()
return converted_type_name_from_enum(raw_converted_type)
cdef class ParquetLogicalType(_Weakrefable):
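    """Logical type of a Parquet column."""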
cdef:
shared_ptr[const CParquetLogicalType] type
def __cinit__(self):
pass
cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
self.type = type
def __str__(self):
return frombytes(self.type.get().ToString(), safe=True)
def to_json(self):
return frombytes(self.type.get().ToJSON())
@property
def type(self):
return logical_type_name_from_enum(self.type.get().type())
cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
cdef ParquetLogicalType out = ParquetLogicalType()
out.init(type)
return out
cdef _cast_statistic_raw_min(CStatistics* statistics):
cdef ParquetType physical_type = statistics.physical_type()
cdef uint32_t type_length = statistics.descr().type_length()
if physical_type == ParquetType_BOOLEAN:
return (<CBoolStatistics*> statistics).min()
elif physical_type == ParquetType_INT32:
return (<CInt32Statistics*> statistics).min()
elif physical_type == ParquetType_INT64:
return (<CInt64Statistics*> statistics).min()
elif physical_type == ParquetType_FLOAT:
return (<CFloatStatistics*> statistics).min()
elif physical_type == ParquetType_DOUBLE:
return (<CDoubleStatistics*> statistics).min()
elif physical_type == ParquetType_BYTE_ARRAY:
return _box_byte_array((<CByteArrayStatistics*> statistics).min())
elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)
cdef _cast_statistic_raw_max(CStatistics* statistics):
cdef ParquetType physical_type = statistics.physical_type()
cdef uint32_t type_length = statistics.descr().type_length()
if physical_type == ParquetType_BOOLEAN:
return (<CBoolStatistics*> statistics).max()
elif physical_type == ParquetType_INT32:
return (<CInt32Statistics*> statistics).max()
elif physical_type == ParquetType_INT64:
return (<CInt64Statistics*> statistics).max()
elif physical_type == ParquetType_FLOAT:
return (<CFloatStatistics*> statistics).max()
elif physical_type == ParquetType_DOUBLE:
return (<CDoubleStatistics*> statistics).max()
elif physical_type == ParquetType_BYTE_ARRAY:
return _box_byte_array((<CByteArrayStatistics*> statistics).max())
elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)
cdef _cast_statistic_min(CStatistics* statistics):
min_raw = _cast_statistic_raw_min(statistics)
return _box_logical_type_value(min_raw, statistics.descr())
cdef _cast_statistic_max(CStatistics* statistics):
max_raw = _cast_statistic_raw_max(statistics)
return _box_logical_type_value(max_raw, statistics.descr())
cdef _box_logical_type_value(object value, const ColumnDescriptor* descr):
cdef:
const CParquetLogicalType* ltype = descr.logical_type().get()
ParquetTimeUnit time_unit
const CParquetIntType* itype
const CParquetTimestampType* ts_type
if ltype.type() == ParquetLogicalType_STRING:
return value.decode('utf8')
elif ltype.type() == ParquetLogicalType_TIME:
time_unit = (<const CParquetTimeType*> ltype).time_unit()
if time_unit == ParquetTimeUnit_MILLIS:
return _datetime_from_int(value, unit=TimeUnit_MILLI).time()
else:
return _datetime_from_int(value, unit=TimeUnit_MICRO).time()
elif ltype.type() == ParquetLogicalType_TIMESTAMP:
ts_type = <const CParquetTimestampType*> ltype
time_unit = ts_type.time_unit()
if ts_type.is_adjusted_to_utc():
import pytz
tzinfo = pytz.utc
else:
tzinfo = None
if time_unit == ParquetTimeUnit_MILLIS:
return _datetime_from_int(value, unit=TimeUnit_MILLI,
tzinfo=tzinfo)
elif time_unit == ParquetTimeUnit_MICROS:
return _datetime_from_int(value, unit=TimeUnit_MICRO,
tzinfo=tzinfo)
elif time_unit == ParquetTimeUnit_NANOS:
return _datetime_from_int(value, unit=TimeUnit_NANO,
tzinfo=tzinfo)
else:
raise ValueError("Unsupported time unit")
elif ltype.type() == ParquetLogicalType_INT:
itype = <const CParquetIntType*> ltype
if not itype.is_signed() and itype.bit_width() == 32:
return int(np.int32(value).view(np.uint32))
elif not itype.is_signed() and itype.bit_width() == 64:
return int(np.int64(value).view(np.uint64))
else:
return value
else:
# No logical boxing defined
return value
cdef _box_byte_array(ParquetByteArray val):
return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> val.len)
cdef _box_flba(ParquetFLBA val, uint32_t len):
return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
cdef class ColumnChunkMetaData(_Weakrefable):
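    """Metadata for a single column chunk (one column within one row group)."""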
def __cinit__(self):
pass
def __repr__(self):
statistics = indent(repr(self.statistics), 4 * ' ')
return """{0}
file_offset: {1}
file_path: {2}
physical_type: {3}
num_values: {4}
path_in_schema: {5}
is_stats_set: {6}
statistics:
{7}
compression: {8}
encodings: {9}
has_dictionary_page: {10}
dictionary_page_offset: {11}
data_page_offset: {12}
total_compressed_size: {13}
total_uncompressed_size: {14}""".format(object.__repr__(self),
self.file_offset,
self.file_path,
self.physical_type,
self.num_values,
self.path_in_schema,
self.is_stats_set,
statistics,
self.compression,
self.encodings,
self.has_dictionary_page,
self.dictionary_page_offset,
self.data_page_offset,
self.total_compressed_size,
self.total_uncompressed_size)
def to_dict(self):
statistics = self.statistics.to_dict() if self.is_stats_set else None
d = dict(
file_offset=self.file_offset,
file_path=self.file_path,
physical_type=self.physical_type,
num_values=self.num_values,
path_in_schema=self.path_in_schema,
is_stats_set=self.is_stats_set,
statistics=statistics,
compression=self.compression,
encodings=self.encodings,
has_dictionary_page=self.has_dictionary_page,
dictionary_page_offset=self.dictionary_page_offset,
data_page_offset=self.data_page_offset,
total_compressed_size=self.total_compressed_size,
total_uncompressed_size=self.total_uncompressed_size
)
return d
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, ColumnChunkMetaData other):
return self.metadata.Equals(deref(other.metadata))
@property
def file_offset(self):
return self.metadata.file_offset()
@property
def file_path(self):
return frombytes(self.metadata.file_path())
@property
def physical_type(self):
return physical_type_name_from_enum(self.metadata.type())
@property
def num_values(self):
return self.metadata.num_values()
@property
def path_in_schema(self):
path = self.metadata.path_in_schema().get().ToDotString()
return frombytes(path)
@property
def is_stats_set(self):
return self.metadata.is_stats_set()
@property
def statistics(self):
if not self.metadata.is_stats_set():
return None
statistics = Statistics()
statistics.init(self.metadata.statistics(), self)
return statistics
@property
def compression(self):
return compression_name_from_enum(self.metadata.compression())
@property
def encodings(self):
return tuple(map(encoding_name_from_enum, self.metadata.encodings()))
@property
def has_dictionary_page(self):
return bool(self.metadata.has_dictionary_page())
@property
def dictionary_page_offset(self):
if self.has_dictionary_page:
return self.metadata.dictionary_page_offset()
else:
return None
@property
def data_page_offset(self):
return self.metadata.data_page_offset()
@property
def has_index_page(self):
raise NotImplementedError('not supported in parquet-cpp')
@property
def index_page_offset(self):
raise NotImplementedError("parquet-cpp doesn't return valid values")
@property
def total_compressed_size(self):
return self.metadata.total_compressed_size()
@property
def total_uncompressed_size(self):
return self.metadata.total_uncompressed_size()
cdef class RowGroupMetaData(_Weakrefable):
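    """Metadata for a single row group."""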
def __cinit__(self, FileMetaData parent, int index):
if index < 0 or index >= parent.num_row_groups:
raise IndexError('{0} out of bounds'.format(index))
self.up_metadata = parent._metadata.RowGroup(index)
self.metadata = self.up_metadata.get()
self.parent = parent
self.index = index
def __reduce__(self):
return RowGroupMetaData, (self.parent, self.index)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, RowGroupMetaData other):
return self.metadata.Equals(deref(other.metadata))
def column(self, int i):
if i < 0 or i >= self.num_columns:
raise IndexError('{0} out of bounds'.format(i))
chunk = ColumnChunkMetaData()
chunk.init(self, i)
return chunk
def __repr__(self):
return """{0}
num_columns: {1}
num_rows: {2}
total_byte_size: {3}""".format(object.__repr__(self),
self.num_columns,
self.num_rows,
self.total_byte_size)
def to_dict(self):
columns = []
d = dict(
num_columns=self.num_columns,
num_rows=self.num_rows,
total_byte_size=self.total_byte_size,
columns=columns,
)
for i in range(self.num_columns):
columns.append(self.column(i).to_dict())
return d
@property
def num_columns(self):
return self.metadata.num_columns()
@property
def num_rows(self):
return self.metadata.num_rows()
@property
def total_byte_size(self):
return self.metadata.total_byte_size()
def _reconstruct_filemetadata(Buffer serialized):
cdef:
FileMetaData metadata = FileMetaData.__new__(FileMetaData)
CBuffer *buffer = serialized.buffer.get()
uint32_t metadata_len = <uint32_t>buffer.size()
metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))
return metadata
cdef class FileMetaData(_Weakrefable):
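    """Parquet metadata for a single file."""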
def __cinit__(self):
pass
def __reduce__(self):
cdef:
NativeFile sink = BufferOutputStream()
COutputStream* c_sink = sink.get_output_stream().get()
with nogil:
self._metadata.WriteTo(c_sink)
cdef Buffer buffer = sink.getvalue()
return _reconstruct_filemetadata, (buffer,)
def __repr__(self):
return """{0}
created_by: {1}
num_columns: {2}
num_rows: {3}
num_row_groups: {4}
format_version: {5}
serialized_size: {6}""".format(object.__repr__(self),
self.created_by, self.num_columns,
self.num_rows, self.num_row_groups,
self.format_version,
self.serialized_size)
def to_dict(self):
row_groups = []
d = dict(
created_by=self.created_by,
num_columns=self.num_columns,
num_rows=self.num_rows,
num_row_groups=self.num_row_groups,
row_groups=row_groups,
format_version=self.format_version,
serialized_size=self.serialized_size
)
for i in range(self.num_row_groups):
row_groups.append(self.row_group(i).to_dict())
return d
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, FileMetaData other):
return self._metadata.Equals(deref(other._metadata))
@property
def schema(self):
if self._schema is None:
self._schema = ParquetSchema(self)
return self._schema
@property
def serialized_size(self):
return self._metadata.size()
@property
def num_columns(self):
return self._metadata.num_columns()
@property
def num_rows(self):
return self._metadata.num_rows()
@property
def num_row_groups(self):
return self._metadata.num_row_groups()
@property
def format_version(self):
cdef ParquetVersion version = self._metadata.version()
        if version == ParquetVersion_V1:
            return '1.0'
        elif version == ParquetVersion_V2:
            return '2.0'
        else:
            warnings.warn('Unrecognized file version, assuming 1.0: {}'
                          .format(version))
            return '1.0'
@property
def created_by(self):
return frombytes(self._metadata.created_by())
@property
def metadata(self):
cdef:
unordered_map[c_string, c_string] metadata
const CKeyValueMetadata* underlying_metadata
underlying_metadata = self._metadata.key_value_metadata().get()
if underlying_metadata != NULL:
underlying_metadata.ToUnorderedMap(&metadata)
return metadata
else:
return None
def row_group(self, int i):
return RowGroupMetaData(self, i)
def set_file_path(self, path):
"""
        Modify the file_path field of each ColumnChunk in the
        FileMetaData to the given value.
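
        Examples
        --------
        A minimal sketch; the file names are hypothetical::

            import pyarrow.parquet as pq

            metadata = pq.read_metadata('part-0.parquet')
            metadata.set_file_path('data/part-0.parquet')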
"""
cdef:
c_string c_path = tobytes(path)
self._metadata.set_file_path(c_path)
def append_row_groups(self, FileMetaData other):
"""
        Append the row groups of another FileMetaData object.
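
        Examples
        --------
        A minimal sketch for combining the metadata of files that share a
        schema (file names are hypothetical)::

            import pyarrow.parquet as pq

            combined = pq.read_metadata('part-0.parquet')
            combined.append_row_groups(pq.read_metadata('part-1.parquet'))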
"""
cdef shared_ptr[CFileMetaData] c_metadata
c_metadata = other.sp_metadata
self._metadata.AppendRowGroups(deref(c_metadata))
def write_metadata_file(self, where):
"""
        Write the metadata object to a metadata-only Parquet file.
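
        Examples
        --------
        A minimal sketch, writing a ``_metadata`` sidecar for a dataset
        (the path is hypothetical)::

            metadata.write_metadata_file('dataset_root/_metadata')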
"""
cdef:
shared_ptr[COutputStream] sink
c_string c_where
try:
where = _stringify_path(where)
except TypeError:
get_writer(where, &sink)
else:
c_where = tobytes(where)
with nogil:
sink = GetResultValue(FileOutputStream.Open(c_where))
with nogil:
check_status(
WriteMetaDataFile(deref(self._metadata), sink.get()))
cdef class ParquetSchema(_Weakrefable):
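    """A Parquet schema."""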
def __cinit__(self, FileMetaData container):
self.parent = container
self.schema = container._metadata.schema()
def __repr__(self):
return "{0}\n{1}".format(
object.__repr__(self),
frombytes(self.schema.ToString(), safe=True))
def __reduce__(self):
return ParquetSchema, (self.parent,)
def __len__(self):
return self.schema.num_columns()
def __getitem__(self, i):
return self.column(i)
@property
def names(self):
return [self[i].name for i in range(len(self))]
def to_arrow_schema(self):
"""
        Convert Parquet schema to effective Arrow schema.

        Returns
        -------
        schema : pyarrow.Schema
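
        Examples
        --------
        A minimal sketch (assumes ``metadata`` was read with
        ``pyarrow.parquet.read_metadata``)::

            arrow_schema = metadata.schema.to_arrow_schema()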
"""
cdef shared_ptr[CSchema] sp_arrow_schema
with nogil:
check_status(FromParquetSchema(
self.schema, default_arrow_reader_properties(),
self.parent._metadata.key_value_metadata(),
&sp_arrow_schema))
return pyarrow_wrap_schema(sp_arrow_schema)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, ParquetSchema other):
"""
Returns True if the Parquet schemas are equal
"""
return self.schema.Equals(deref(other.schema))
def column(self, i):
if i < 0 or i >= len(self):
raise IndexError('{0} out of bounds'.format(i))
return ColumnSchema(self, i)
cdef class ColumnSchema(_Weakrefable):
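    """Schema for a single column."""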
cdef:
int index
ParquetSchema parent
const ColumnDescriptor* descr
def __cinit__(self, ParquetSchema schema, int index):
self.parent = schema
self.index = index # for pickling support
self.descr = schema.schema.Column(index)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def __reduce__(self):
return ColumnSchema, (self.parent, self.index)
def equals(self, ColumnSchema other):
"""
Returns True if the column schemas are equal
"""
return self.descr.Equals(deref(other.descr))
def __repr__(self):
physical_type = self.physical_type
converted_type = self.converted_type
if converted_type == 'DECIMAL':
converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
self.scale)
elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
.format(self.length))
return """<ParquetColumnSchema>
name: {0}
path: {1}
max_definition_level: {2}
max_repetition_level: {3}
physical_type: {4}
logical_type: {5}
converted_type (legacy): {6}""".format(self.name, self.path,
self.max_definition_level,
self.max_repetition_level,
physical_type,
str(self.logical_type),
converted_type)
@property
def name(self):
return frombytes(self.descr.name())
@property
def path(self):
return frombytes(self.descr.path().get().ToDotString())
@property
def max_definition_level(self):
return self.descr.max_definition_level()
@property
def max_repetition_level(self):
return self.descr.max_repetition_level()
@property
def physical_type(self):
return physical_type_name_from_enum(self.descr.physical_type())
@property
def logical_type(self):
return wrap_logical_type(self.descr.logical_type())
@property
def converted_type(self):
return converted_type_name_from_enum(self.descr.converted_type())
# FIXED_LEN_BYTE_ARRAY attribute
@property
def length(self):
return self.descr.type_length()
# Decimal attributes
@property
def precision(self):
return self.descr.type_precision()
@property
def scale(self):
return self.descr.type_scale()
cdef physical_type_name_from_enum(ParquetType type_):
return {
ParquetType_BOOLEAN: 'BOOLEAN',
ParquetType_INT32: 'INT32',
ParquetType_INT64: 'INT64',
ParquetType_INT96: 'INT96',
ParquetType_FLOAT: 'FLOAT',
ParquetType_DOUBLE: 'DOUBLE',
ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
}.get(type_, 'UNKNOWN')
cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
return {
ParquetLogicalType_UNDEFINED: 'UNDEFINED',
ParquetLogicalType_STRING: 'STRING',
ParquetLogicalType_MAP: 'MAP',
ParquetLogicalType_LIST: 'LIST',
ParquetLogicalType_ENUM: 'ENUM',
ParquetLogicalType_DECIMAL: 'DECIMAL',
ParquetLogicalType_DATE: 'DATE',
ParquetLogicalType_TIME: 'TIME',
ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
ParquetLogicalType_INT: 'INT',
ParquetLogicalType_JSON: 'JSON',
ParquetLogicalType_BSON: 'BSON',
ParquetLogicalType_UUID: 'UUID',
ParquetLogicalType_NONE: 'NONE',
}.get(type_, 'UNKNOWN')
cdef converted_type_name_from_enum(ParquetConvertedType type_):
return {
ParquetConvertedType_NONE: 'NONE',
ParquetConvertedType_UTF8: 'UTF8',
ParquetConvertedType_MAP: 'MAP',
ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
ParquetConvertedType_LIST: 'LIST',
ParquetConvertedType_ENUM: 'ENUM',
ParquetConvertedType_DECIMAL: 'DECIMAL',
ParquetConvertedType_DATE: 'DATE',
ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
ParquetConvertedType_UINT_8: 'UINT_8',
ParquetConvertedType_UINT_16: 'UINT_16',
ParquetConvertedType_UINT_32: 'UINT_32',
ParquetConvertedType_UINT_64: 'UINT_64',
ParquetConvertedType_INT_8: 'INT_8',
ParquetConvertedType_INT_16: 'INT_16',
ParquetConvertedType_INT_32: 'INT_32',
ParquetConvertedType_INT_64: 'INT_64',
ParquetConvertedType_JSON: 'JSON',
ParquetConvertedType_BSON: 'BSON',
ParquetConvertedType_INTERVAL: 'INTERVAL',
}.get(type_, 'UNKNOWN')
cdef encoding_name_from_enum(ParquetEncoding encoding_):
return {
ParquetEncoding_PLAIN: 'PLAIN',
ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
ParquetEncoding_RLE: 'RLE',
ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
}.get(encoding_, 'UNKNOWN')
cdef compression_name_from_enum(ParquetCompression compression_):
return {
ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
ParquetCompression_SNAPPY: 'SNAPPY',
ParquetCompression_GZIP: 'GZIP',
ParquetCompression_LZO: 'LZO',
ParquetCompression_BROTLI: 'BROTLI',
ParquetCompression_LZ4: 'LZ4',
ParquetCompression_ZSTD: 'ZSTD',
}.get(compression_, 'UNKNOWN')
cdef int check_compression_name(name) except -1:
if name.upper() not in {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4',
'ZSTD'}:
raise ArrowException("Unsupported compression: " + name)
return 0
cdef ParquetCompression compression_from_name(name):
name = name.upper()
if name == 'SNAPPY':
return ParquetCompression_SNAPPY
elif name == 'GZIP':
return ParquetCompression_GZIP
elif name == 'LZO':
return ParquetCompression_LZO
elif name == 'BROTLI':
return ParquetCompression_BROTLI
elif name == 'LZ4':
return ParquetCompression_LZ4
elif name == 'ZSTD':
return ParquetCompression_ZSTD
else:
return ParquetCompression_UNCOMPRESSED
cdef class ParquetReader(_Weakrefable):
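    """
    Low-level reader for a single Parquet file.

    A minimal usage sketch (the file name is hypothetical)::

        reader = ParquetReader()
        reader.open('data.parquet')
        table = reader.read_all()
    """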
cdef:
object source
CMemoryPool* pool
unique_ptr[FileReader] reader
FileMetaData _metadata
cdef public:
_column_idx_map
def __cinit__(self, MemoryPool memory_pool=None):
self.pool = maybe_unbox_memory_pool(memory_pool)
self._metadata = None
def open(self, object source, bint use_memory_map=True,
read_dictionary=None, FileMetaData metadata=None,
int buffer_size=0):
cdef:
shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
CReaderProperties properties = default_reader_properties()
ArrowReaderProperties arrow_props = (
default_arrow_reader_properties())
c_string path
FileReaderBuilder builder
if metadata is not None:
c_metadata = metadata.sp_metadata
if buffer_size > 0:
properties.enable_buffered_stream()
properties.set_buffer_size(buffer_size)
elif buffer_size == 0:
properties.disable_buffered_stream()
else:
            raise ValueError('Buffer size must not be negative')
self.source = source
get_reader(source, use_memory_map, &rd_handle)
with nogil:
check_status(builder.Open(rd_handle, properties, c_metadata))
# Set up metadata
with nogil:
c_metadata = builder.raw_reader().metadata()
self._metadata = result = FileMetaData()
result.init(c_metadata)
if read_dictionary is not None:
self._set_read_dictionary(read_dictionary, &arrow_props)
with nogil:
check_status(builder.memory_pool(self.pool)
.properties(arrow_props)
.Build(&self.reader))
cdef _set_read_dictionary(self, read_dictionary,
ArrowReaderProperties* props):
for column in read_dictionary:
if not isinstance(column, int):
column = self.column_name_idx(column)
props.set_read_dictionary(column, True)
@property
def column_paths(self):
cdef:
FileMetaData container = self.metadata
const CFileMetaData* metadata = container._metadata
vector[c_string] path
int i = 0
paths = []
for i in range(0, metadata.num_columns()):
path = (metadata.schema().Column(i)
.path().get().ToDotVector())
paths.append([frombytes(x) for x in path])
return paths
@property
def metadata(self):
return self._metadata
@property
def schema_arrow(self):
cdef shared_ptr[CSchema] out
with nogil:
check_status(self.reader.get().GetSchema(&out))
return pyarrow_wrap_schema(out)
@property
def num_row_groups(self):
return self.reader.get().num_row_groups()
def set_use_threads(self, bint use_threads):
self.reader.get().set_use_threads(use_threads)
def set_batch_size(self, int64_t batch_size):
self.reader.get().set_batch_size(batch_size)
def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
bint use_threads=True):
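        """
        Iterate over the record batches of the given row groups.

        A minimal usage sketch (assumes an opened reader; reads every row
        group in batches of 64Ki rows)::

            for batch in reader.iter_batches(64 * 1024,
                                             range(reader.num_row_groups)):
                ...
        """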
cdef:
vector[int] c_row_groups
vector[int] c_column_indices
shared_ptr[CRecordBatch] record_batch
shared_ptr[TableBatchReader] batch_reader
unique_ptr[CRecordBatchReader] recordbatchreader
self.set_batch_size(batch_size)
if use_threads:
self.set_use_threads(use_threads)
for row_group in row_groups:
c_row_groups.push_back(row_group)
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
with nogil:
check_status(
self.reader.get().GetRecordBatchReader(
c_row_groups, c_column_indices, &recordbatchreader
)
)
else:
with nogil:
check_status(
self.reader.get().GetRecordBatchReader(
c_row_groups, &recordbatchreader
)
)
while True:
with nogil:
check_status(
recordbatchreader.get().ReadNext(&record_batch)
)
if record_batch.get() == NULL:
break
yield pyarrow_wrap_batch(record_batch)
def read_row_group(self, int i, column_indices=None,
bint use_threads=True):
return self.read_row_groups([i], column_indices, use_threads)
def read_row_groups(self, row_groups not None, column_indices=None,
bint use_threads=True):
cdef:
shared_ptr[CTable] ctable
vector[int] c_row_groups
vector[int] c_column_indices
self.set_use_threads(use_threads)
for row_group in row_groups:
c_row_groups.push_back(row_group)
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
with nogil:
check_status(self.reader.get()
.ReadRowGroups(c_row_groups, c_column_indices,
&ctable))
else:
# Read all columns
with nogil:
check_status(self.reader.get()
.ReadRowGroups(c_row_groups, &ctable))
return pyarrow_wrap_table(ctable)
def read_all(self, column_indices=None, bint use_threads=True):
cdef:
shared_ptr[CTable] ctable
vector[int] c_column_indices
self.set_use_threads(use_threads)
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
with nogil:
check_status(self.reader.get()
.ReadTable(c_column_indices, &ctable))
else:
# Read all columns
with nogil:
check_status(self.reader.get()
.ReadTable(&ctable))
return pyarrow_wrap_table(ctable)
def scan_contents(self, column_indices=None, batch_size=65536):
cdef:
vector[int] c_column_indices
int32_t c_batch_size
int64_t c_num_rows
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
c_batch_size = batch_size
with nogil:
check_status(self.reader.get()
.ScanContents(c_column_indices, c_batch_size,
&c_num_rows))
return c_num_rows
def column_name_idx(self, column_name):
"""
        Find the matching index of a column in the schema.

        Parameters
        ----------
        column_name : str
            Name of the column; levels of nesting are separated by ".".

        Returns
        -------
        column_idx : int
            Integer index of the position of the column.
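
        Examples
        --------
        A minimal sketch (the file and column names are hypothetical)::

            reader = ParquetReader()
            reader.open('data.parquet')
            idx = reader.column_name_idx('col.nested_field')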
"""
cdef:
FileMetaData container = self.metadata
const CFileMetaData* metadata = container._metadata
int i = 0
if self._column_idx_map is None:
self._column_idx_map = {}
for i in range(0, metadata.num_columns()):
col_bytes = tobytes(metadata.schema().Column(i)
.path().get().ToDotString())
self._column_idx_map[col_bytes] = i
return self._column_idx_map[tobytes(column_name)]
def read_column(self, int column_index):
cdef shared_ptr[CChunkedArray] out
with nogil:
check_status(self.reader.get()
.ReadColumn(column_index, &out))
return pyarrow_wrap_chunked_array(out)
def read_schema_field(self, int field_index):
cdef shared_ptr[CChunkedArray] out
with nogil:
check_status(self.reader.get()
.ReadSchemaField(field_index, &out))
return pyarrow_wrap_chunked_array(out)
cdef shared_ptr[WriterProperties] _create_writer_properties(
use_dictionary=None,
compression=None,
version=None,
write_statistics=None,
data_page_size=None,
compression_level=None,
use_byte_stream_split=False,
data_page_version=None) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
WriterProperties.Builder props
# data_page_version
if data_page_version is not None:
if data_page_version == "1.0":
props.data_page_version(ParquetDataPageVersion_V1)
elif data_page_version == "2.0":
props.data_page_version(ParquetDataPageVersion_V2)
else:
raise ValueError("Unsupported Parquet data page version: {0}"
.format(data_page_version))
# version
if version is not None:
if version == "1.0":
props.version(ParquetVersion_V1)
elif version == "2.0":
props.version(ParquetVersion_V2)
else:
raise ValueError("Unsupported Parquet format version: {0}"
.format(version))
# compression
    if isinstance(compression, str):
        check_compression_name(compression)
        props.compression(compression_from_name(compression))
    elif compression is not None:
        for column, codec in compression.items():
            check_compression_name(codec)
            props.compression(tobytes(column), compression_from_name(codec))
    if isinstance(compression_level, int):
        props.compression_level(compression_level)
    elif compression_level is not None:
        for column, level in compression_level.items():
            props.compression_level(tobytes(column), level)
# use_dictionary
if isinstance(use_dictionary, bool):
if use_dictionary:
props.enable_dictionary()
else:
props.disable_dictionary()
elif use_dictionary is not None:
        # Deactivate dictionary encoding by default; enable per column
props.disable_dictionary()
for column in use_dictionary:
props.enable_dictionary(tobytes(column))
# write_statistics
if isinstance(write_statistics, bool):
if write_statistics:
props.enable_statistics()
else:
props.disable_statistics()
elif write_statistics is not None:
# Deactivate statistics by default and enable for specified columns
props.disable_statistics()
for column in write_statistics:
props.enable_statistics(tobytes(column))
# use_byte_stream_split
if isinstance(use_byte_stream_split, bool):
if use_byte_stream_split:
props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
elif use_byte_stream_split is not None:
for column in use_byte_stream_split:
props.encoding(tobytes(column),
ParquetEncoding_BYTE_STREAM_SPLIT)
if data_page_size is not None:
props.data_pagesize(data_page_size)
properties = props.build()
return properties
cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
allow_truncated_timestamps=False,
writer_engine_version=None,
use_compliant_nested_type=False) except *:
"""Arrow writer properties"""
cdef:
shared_ptr[ArrowWriterProperties] arrow_properties
ArrowWriterProperties.Builder arrow_props
# Store the original Arrow schema so things like dictionary types can
# be automatically reconstructed
arrow_props.store_schema()
# int96 support
if use_deprecated_int96_timestamps:
arrow_props.enable_deprecated_int96_timestamps()
else:
arrow_props.disable_deprecated_int96_timestamps()
# coerce_timestamps
if coerce_timestamps == 'ms':
arrow_props.coerce_timestamps(TimeUnit_MILLI)
elif coerce_timestamps == 'us':
arrow_props.coerce_timestamps(TimeUnit_MICRO)
elif coerce_timestamps is not None:
raise ValueError('Invalid value for coerce_timestamps: {0}'
.format(coerce_timestamps))
# allow_truncated_timestamps
if allow_truncated_timestamps:
arrow_props.allow_truncated_timestamps()
else:
arrow_props.disallow_truncated_timestamps()
# use_compliant_nested_type
if use_compliant_nested_type:
arrow_props.enable_compliant_nested_types()
else:
arrow_props.disable_compliant_nested_types()
# writer_engine_version
if writer_engine_version == "V1":
warnings.warn("V1 parquet writer engine is a no-op. Use V2.")
arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
elif writer_engine_version != "V2":
raise ValueError("Unsupported Writer Engine Version: {0}"
.format(writer_engine_version))
arrow_properties = arrow_props.build()
return arrow_properties
cdef class ParquetWriter(_Weakrefable):
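    """
    Low-level writer for a single Parquet file.

    A minimal usage sketch (assumes ``table`` is a pyarrow.Table; the file
    name is hypothetical)::

        writer = ParquetWriter('out.parquet', table.schema)
        writer.write_table(table)
        writer.close()
    """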
cdef:
unique_ptr[FileWriter] writer
shared_ptr[COutputStream] sink
bint own_sink
cdef readonly:
object use_dictionary
object use_deprecated_int96_timestamps
object use_byte_stream_split
object coerce_timestamps
object allow_truncated_timestamps
object compression
object compression_level
object data_page_version
object use_compliant_nested_type
object version
object write_statistics
object writer_engine_version
int row_group_size
int64_t data_page_size
def __cinit__(self, where, Schema schema, use_dictionary=None,
compression=None, version=None,
write_statistics=None,
MemoryPool memory_pool=None,
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
data_page_size=None,
allow_truncated_timestamps=False,
compression_level=None,
use_byte_stream_split=False,
writer_engine_version=None,
data_page_version=None,
use_compliant_nested_type=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
c_string c_where
CMemoryPool* pool
try:
where = _stringify_path(where)
except TypeError:
get_writer(where, &self.sink)
self.own_sink = False
else:
c_where = tobytes(where)
with nogil:
self.sink = GetResultValue(FileOutputStream.Open(c_where))
self.own_sink = True
properties = _create_writer_properties(
use_dictionary=use_dictionary,
compression=compression,
version=version,
write_statistics=write_statistics,
data_page_size=data_page_size,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
data_page_version=data_page_version
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
coerce_timestamps=coerce_timestamps,
allow_truncated_timestamps=allow_truncated_timestamps,
writer_engine_version=writer_engine_version,
use_compliant_nested_type=use_compliant_nested_type
)
pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
check_status(
FileWriter.Open(deref(schema.schema), pool,
self.sink, properties, arrow_properties,
&self.writer))
def close(self):
with nogil:
check_status(self.writer.get().Close())
if self.own_sink:
check_status(self.sink.get().Close())
def write_table(self, Table table, row_group_size=None):
cdef:
CTable* ctable = table.table
int64_t c_row_group_size
if row_group_size is None or row_group_size == -1:
c_row_group_size = ctable.num_rows()
elif row_group_size == 0:
raise ValueError('Row group size cannot be 0')
else:
c_row_group_size = row_group_size
with nogil:
check_status(self.writer.get()
.WriteTable(deref(ctable), c_row_group_size))
@property
def metadata(self):
cdef:
shared_ptr[CFileMetaData] metadata
FileMetaData result
with nogil:
metadata = self.writer.get().metadata()
if metadata:
result = FileMetaData()
result.init(metadata)
return result
        raise RuntimeError(
            'file metadata is only available after the writer is closed')