# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (Array, Schema,
                          check_status,
                          MemoryPool, maybe_unbox_memory_pool,
                          Table,
                          pyarrow_wrap_schema,
                          pyarrow_wrap_table,
                          NativeFile, get_reader, get_writer)
from pyarrow.compat import tobytes, frombytes
from pyarrow.lib import ArrowException, NativeFile, _stringify_path

import six

try:
    from textwrap import indent
except ImportError:
    def indent(text, prefix):
        lines = [prefix + line for line in text.splitlines(True)]
        return ''.join(lines)


cdef class RowGroupStatistics:
    cdef:
        shared_ptr[CRowGroupStatistics] statistics

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[CRowGroupStatistics]& statistics):
        self.statistics = statistics

    def __repr__(self):
        return """{0}
  has_min_max: {1}
  min: {2}
  max: {3}
  null_count: {4}
  distinct_count: {5}
  num_values: {6}
  physical_type: {7}""".format(object.__repr__(self),
                               self.has_min_max,
                               self.min,
                               self.max,
                               self.null_count,
                               self.distinct_count,
                               self.num_values,
                               self.physical_type)

    cdef inline _cast_statistic(self, object value):
        # Input value is bytes
        cdef ParquetType physical_type = self.statistics.get().physical_type()
        if physical_type == ParquetType_BOOLEAN:
            return bool(int(value))
        elif physical_type == ParquetType_INT32:
            return int(value)
        elif physical_type == ParquetType_INT64:
            return int(value)
        elif physical_type == ParquetType_INT96:
            # Leave as PyBytes
            return value
        elif physical_type == ParquetType_FLOAT:
            return float(value)
        elif physical_type == ParquetType_DOUBLE:
            return float(value)
        elif physical_type == ParquetType_BYTE_ARRAY:
            # Leave as PyBytes
            return value
        elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
            # Leave as PyBytes
            return value
        else:
            raise ValueError('Unknown physical ParquetType')

    property has_min_max:

        def __get__(self):
            return self.statistics.get().HasMinMax()

    property min:

        def __get__(self):
            raw_physical_type = self.statistics.get().physical_type()
            encode_min = self.statistics.get().EncodeMin()
            min_value = FormatStatValue(raw_physical_type, encode_min.c_str())
            return self._cast_statistic(min_value)

    property max:

        def __get__(self):
            raw_physical_type = self.statistics.get().physical_type()
            encode_max = self.statistics.get().EncodeMax()
            max_value = FormatStatValue(raw_physical_type, encode_max.c_str())
            return self._cast_statistic(max_value)

    property null_count:

        def __get__(self):
            return self.statistics.get().null_count()

    property distinct_count:

        def __get__(self):
            return self.statistics.get().distinct_count()

    property num_values:

        def __get__(self):
            return self.statistics.get().num_values()

    property physical_type:

        def __get__(self):
            physical_type = self.statistics.get().physical_type()
            return physical_type_name_from_enum(physical_type)
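

# A minimal usage sketch (the file name 'example.parquet' is illustrative):
#
#     reader = ParquetReader()
#     reader.open('example.parquet')
#     stats = reader.metadata.row_group(0).column(0).statistics
#     if stats.has_min_max:
#         print(stats.min, stats.max)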


cdef class ColumnChunkMetaData:
    cdef:
        unique_ptr[CColumnChunkMetaData] up_metadata
        CColumnChunkMetaData* metadata

    def __cinit__(self):
        pass

    cdef init(self, const CRowGroupMetaData& row_group_metadata, int i):
        self.up_metadata = row_group_metadata.ColumnChunk(i)
        self.metadata = self.up_metadata.get()

    def __repr__(self):
        statistics = indent(repr(self.statistics), 4 * ' ')
        return """{0}
  file_offset: {1}
  file_path: {2}
  type: {3}
  num_values: {4}
  path_in_schema: {5}
  is_stats_set: {6}
  statistics:
{7}
  compression: {8}
  encodings: {9}
  has_dictionary_page: {10}
  dictionary_page_offset: {11}
  data_page_offset: {12}
  index_page_offset: {13}
  total_compressed_size: {14}
  total_uncompressed_size: {15}""".format(object.__repr__(self),
                                          self.file_offset,
                                          self.file_path,
                                          self.type,
                                          self.num_values,
                                          self.path_in_schema,
                                          self.is_stats_set,
                                          statistics,
                                          self.compression,
                                          self.encodings,
                                          self.has_dictionary_page,
                                          self.dictionary_page_offset,
                                          self.data_page_offset,
                                          self.index_page_offset,
                                          self.total_compressed_size,
                                          self.total_uncompressed_size)

    property file_offset:

        def __get__(self):
            return self.metadata.file_offset()

    property file_path:

        def __get__(self):
            return frombytes(self.metadata.file_path())

    property type:

        def __get__(self):
            return physical_type_name_from_enum(self.metadata.type())

    property num_values:

        def __get__(self):
            return self.metadata.num_values()

    property path_in_schema:

        def __get__(self):
            path = self.metadata.path_in_schema().get().ToDotString()
            return frombytes(path)

    property is_stats_set:

        def __get__(self):
            return self.metadata.is_stats_set()

    property statistics:

        def __get__(self):
            statistics = RowGroupStatistics()
            statistics.init(self.metadata.statistics())
            return statistics

    property compression:

        def __get__(self):
            return self.metadata.compression()

    property encodings:

        def __get__(self):
            # Materialize with list() so the names render in __repr__ under
            # Python 3, where map() returns a lazy iterator
            return list(map(encoding_name_from_enum,
                            self.metadata.encodings()))

    property has_dictionary_page:

        def __get__(self):
            return self.metadata.has_dictionary_page()

    property dictionary_page_offset:

        def __get__(self):
            return self.metadata.dictionary_page_offset()

    property data_page_offset:

        def __get__(self):
            return self.metadata.data_page_offset()

    property index_page_offset:

        def __get__(self):
            return self.metadata.index_page_offset()

    property total_compressed_size:

        def __get__(self):
            return self.metadata.total_compressed_size()

    property total_uncompressed_size:

        def __get__(self):
            return self.metadata.total_uncompressed_size()


cdef class RowGroupMetaData:
    cdef:
        unique_ptr[CRowGroupMetaData] up_metadata
        CRowGroupMetaData* metadata
        FileMetaData parent

    def __cinit__(self):
        pass

    # Returns object (not void) so the IndexError below can propagate
    cdef init_from_file(self, FileMetaData parent, int i):
        if i < 0 or i >= parent.num_row_groups:
            raise IndexError('{0} out of bounds'.format(i))
        self.up_metadata = parent._metadata.RowGroup(i)
        self.metadata = self.up_metadata.get()
        self.parent = parent

    def column(self, int i):
        chunk = ColumnChunkMetaData()
        chunk.init(deref(self.metadata), i)
        return chunk

    def __repr__(self):
        return """{0}
  num_columns: {1}
  num_rows: {2}
  total_byte_size: {3}""".format(object.__repr__(self),
                                 self.num_columns,
                                 self.num_rows,
                                 self.total_byte_size)

    property num_columns:

        def __get__(self):
            return self.metadata.num_columns()

    property num_rows:

        def __get__(self):
            return self.metadata.num_rows()

    property total_byte_size:

        def __get__(self):
            return self.metadata.total_byte_size()


cdef class FileMetaData:
    cdef:
        shared_ptr[CFileMetaData] sp_metadata
        CFileMetaData* _metadata
        ParquetSchema _schema

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[CFileMetaData]& metadata):
        self.sp_metadata = metadata
        self._metadata = metadata.get()

    def __repr__(self):
        return """{0}
  created_by: {1}
  num_columns: {2}
  num_rows: {3}
  num_row_groups: {4}
  format_version: {5}
  serialized_size: {6}""".format(object.__repr__(self),
                                 self.created_by, self.num_columns,
                                 self.num_rows, self.num_row_groups,
                                 self.format_version,
                                 self.serialized_size)

    @property
    def schema(self):
        if self._schema is not None:
            return self._schema

        cdef ParquetSchema schema = ParquetSchema()
        schema.init_from_filemeta(self)
        self._schema = schema
        return schema

    property serialized_size:

        def __get__(self):
            return self._metadata.size()

    property num_columns:

        def __get__(self):
            return self._metadata.num_columns()

    property num_rows:

        def __get__(self):
            return self._metadata.num_rows()

    property num_row_groups:

        def __get__(self):
            return self._metadata.num_row_groups()

    property format_version:

        def __get__(self):
            cdef ParquetVersion version = self._metadata.version()
            if version == ParquetVersion_V1:
                return '1.0'
            elif version == ParquetVersion_V2:
                return '2.0'
            else:
                print('Unrecognized file version, assuming 1.0: {0}'
                      .format(version))
                return '1.0'

    property created_by:

        def __get__(self):
            return frombytes(self._metadata.created_by())

    def row_group(self, int i):
        """
        Return the metadata for row group ``i`` of this file.
        """
        cdef RowGroupMetaData result = RowGroupMetaData()
        result.init_from_file(self, i)
        return result

    property metadata:

        def __get__(self):
            cdef:
                unordered_map[c_string, c_string] metadata
                const CKeyValueMetadata* underlying_metadata
            underlying_metadata = self._metadata.key_value_metadata().get()
            if underlying_metadata != NULL:
                underlying_metadata.ToUnorderedMap(&metadata)
                return metadata
            else:
                return None
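

# Metadata inspection sketch (assumes ``reader`` is an opened ParquetReader,
# as in the example near the end of this file):
#
#     meta = reader.metadata
#     print(meta.num_rows, meta.num_row_groups, meta.format_version)
#     print(meta.row_group(0))   # formatted row group summary
#     print(meta.metadata)       # raw key/value metadata dict, or None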


cdef class ParquetSchema:
    cdef:
        FileMetaData parent  # the FileMetaData owning the SchemaDescriptor
        const SchemaDescriptor* schema

    def __cinit__(self):
        self.schema = NULL

    def __repr__(self):
        elements = []
        for i in range(self.schema.num_columns()):
            col = self.column(i)
            logical_type = col.logical_type
            formatted = '{0}: {1}'.format(col.path, col.physical_type)
            if logical_type != 'NONE':
                formatted += ' {0}'.format(logical_type)
            elements.append(formatted)

        return """{0}
{1}
""".format(object.__repr__(self), '\n'.join(elements))

    cdef init_from_filemeta(self, FileMetaData container):
        self.parent = container
        self.schema = container._metadata.schema()

    def __len__(self):
        return self.schema.num_columns()

    def __getitem__(self, i):
        return self.column(i)

    property names:

        def __get__(self):
            return [self[i].name for i in range(len(self))]

    def to_arrow_schema(self):
        """
        Convert Parquet schema to effective Arrow schema

        Returns
        -------
        schema : pyarrow.Schema
        """
        cdef shared_ptr[CSchema] sp_arrow_schema

        with nogil:
            check_status(FromParquetSchema(
                self.schema, self.parent._metadata.key_value_metadata(),
                &sp_arrow_schema))

        return pyarrow_wrap_schema(sp_arrow_schema)

    def equals(self, ParquetSchema other):
        """
        Returns True if the Parquet schemas are equal
        """
        return self.schema.Equals(deref(other.schema))

    def column(self, i):
        if i < 0 or i >= len(self):
            raise IndexError('{0} out of bounds'.format(i))

        cdef ColumnSchema col = ColumnSchema()
        col.init_from_schema(self, i)
        return col
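

# Schema conversion sketch (assumes ``reader`` is an opened ParquetReader):
#
#     parquet_schema = reader.metadata.schema
#     print(parquet_schema.names)
#     arrow_schema = parquet_schema.to_arrow_schema()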


cdef class ColumnSchema:
    cdef:
        ParquetSchema parent
        const ColumnDescriptor* descr

    def __cinit__(self):
        self.descr = NULL

    cdef init_from_schema(self, ParquetSchema schema, int i):
        self.parent = schema
        self.descr = schema.schema.Column(i)

    def equals(self, ColumnSchema other):
        """
        Returns True if the column schemas are equal
        """
        return self.descr.Equals(deref(other.descr))

    def __repr__(self):
        physical_type = self.physical_type
        logical_type = self.logical_type
        if logical_type == 'DECIMAL':
            logical_type = 'DECIMAL({0}, {1})'.format(self.precision,
                                                      self.scale)
        elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
            logical_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
                            .format(self.length))

        return """<ParquetColumnSchema>
  name: {0}
  path: {1}
  max_definition_level: {2}
  max_repetition_level: {3}
  physical_type: {4}
  logical_type: {5}""".format(self.name, self.path,
                              self.max_definition_level,
                              self.max_repetition_level, physical_type,
                              logical_type)

    property name:

        def __get__(self):
            return frombytes(self.descr.name())

    property path:

        def __get__(self):
            return frombytes(self.descr.path().get().ToDotString())

    property max_definition_level:

        def __get__(self):
            return self.descr.max_definition_level()

    property max_repetition_level:

        def __get__(self):
            return self.descr.max_repetition_level()

    property physical_type:

        def __get__(self):
            return physical_type_name_from_enum(self.descr.physical_type())

    property logical_type:

        def __get__(self):
            return logical_type_name_from_enum(self.descr.logical_type())

    # FIXED_LEN_BYTE_ARRAY attribute
    property length:

        def __get__(self):
            return self.descr.type_length()

    # Decimal attributes
    property precision:

        def __get__(self):
            return self.descr.type_precision()

    property scale:

        def __get__(self):
            return self.descr.type_scale()


cdef physical_type_name_from_enum(ParquetType type_):
    return {
        ParquetType_BOOLEAN: 'BOOLEAN',
        ParquetType_INT32: 'INT32',
        ParquetType_INT64: 'INT64',
        ParquetType_INT96: 'INT96',
        ParquetType_FLOAT: 'FLOAT',
        ParquetType_DOUBLE: 'DOUBLE',
        ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
        ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
    }.get(type_, 'UNKNOWN')


cdef logical_type_name_from_enum(ParquetLogicalType type_):
    return {
        ParquetLogicalType_NONE: 'NONE',
        ParquetLogicalType_UTF8: 'UTF8',
        ParquetLogicalType_MAP: 'MAP',
        ParquetLogicalType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
        ParquetLogicalType_LIST: 'LIST',
        ParquetLogicalType_ENUM: 'ENUM',
        ParquetLogicalType_DECIMAL: 'DECIMAL',
        ParquetLogicalType_DATE: 'DATE',
        ParquetLogicalType_TIME_MILLIS: 'TIME_MILLIS',
        ParquetLogicalType_TIME_MICROS: 'TIME_MICROS',
        ParquetLogicalType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
        ParquetLogicalType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
        ParquetLogicalType_UINT_8: 'UINT_8',
        ParquetLogicalType_UINT_16: 'UINT_16',
        ParquetLogicalType_UINT_32: 'UINT_32',
        ParquetLogicalType_UINT_64: 'UINT_64',
        ParquetLogicalType_INT_8: 'INT_8',
        ParquetLogicalType_INT_16: 'INT_16',
        ParquetLogicalType_INT_32: 'INT_32',
        ParquetLogicalType_INT_64: 'INT_64',
        ParquetLogicalType_JSON: 'JSON',
        ParquetLogicalType_BSON: 'BSON',
        ParquetLogicalType_INTERVAL: 'INTERVAL',
    }.get(type_, 'UNKNOWN')


cdef encoding_name_from_enum(ParquetEncoding encoding_):
    return {
        ParquetEncoding_PLAIN: "PLAIN",
        ParquetEncoding_PLAIN_DICTIONARY: "PLAIN_DICTIONARY",
        ParquetEncoding_RLE: "RLE",
        ParquetEncoding_BIT_PACKED: "BIT_PACKED",
        ParquetEncoding_DELTA_BINARY_PACKED: "DELTA_BINARY_PACKED",
        ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: "DELTA_LENGTH_BYTE_ARRAY",
        ParquetEncoding_DELTA_BYTE_ARRAY: "DELTA_BYTE_ARRAY",
        ParquetEncoding_RLE_DICTIONARY: "RLE_DICTIONARY",
    }.get(encoding_, 'UNKNOWN')


cdef class ParquetReader:
    cdef:
        object source
        CMemoryPool* allocator
        unique_ptr[FileReader] reader
        FileMetaData _metadata

    cdef public:
        _column_idx_map

    def __cinit__(self, MemoryPool memory_pool=None):
        self.allocator = maybe_unbox_memory_pool(memory_pool)
        self._metadata = None

    def open(self, object source, FileMetaData metadata=None):
        cdef:
            shared_ptr[RandomAccessFile] rd_handle
            shared_ptr[CFileMetaData] c_metadata
            ReaderProperties properties = default_reader_properties()

        if metadata is not None:
            c_metadata = metadata.sp_metadata

        self.source = source

        get_reader(source, &rd_handle)
        with nogil:
            check_status(OpenFile(rd_handle, self.allocator, properties,
                                  c_metadata, &self.reader))

    property column_paths:

        def __get__(self):
            cdef:
                FileMetaData container = self.metadata
                const CFileMetaData* metadata = container._metadata
                vector[c_string] path
                int i = 0

            paths = []
            for i in range(0, metadata.num_columns()):
                path = (metadata.schema().Column(i)
                        .path().get().ToDotVector())
                paths.append([frombytes(x) for x in path])

            return paths

    @property
    def metadata(self):
        cdef:
            shared_ptr[CFileMetaData] metadata
            FileMetaData result
        if self._metadata is not None:
            return self._metadata

        with nogil:
            metadata = self.reader.get().parquet_reader().metadata()

        self._metadata = result = FileMetaData()
        result.init(metadata)
        return result

    property num_row_groups:

        def __get__(self):
            return self.reader.get().num_row_groups()

    def set_num_threads(self, int nthreads):
        self.reader.get().set_num_threads(nthreads)

    def read_row_group(self, int i, column_indices=None, nthreads=None):
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_column_indices

        if nthreads:
            self.set_num_threads(nthreads)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroup(i, c_column_indices, &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroup(i, &ctable))
        return pyarrow_wrap_table(ctable)

    def read_all(self, column_indices=None, nthreads=None):
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_column_indices

        if nthreads:
            self.set_num_threads(nthreads)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadTable(c_column_indices, &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadTable(&ctable))
        return pyarrow_wrap_table(ctable)

    def scan_contents(self, column_indices=None, batch_size=65536):
        cdef:
            vector[int] c_column_indices
            int32_t c_batch_size
            int64_t c_num_rows

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

        c_batch_size = batch_size

        with nogil:
            check_status(self.reader.get()
                         .ScanContents(c_column_indices, c_batch_size,
                                       &c_num_rows))

        return c_num_rows

    def column_name_idx(self, column_name):
        """
        Find the matching index of a column in the schema.

        Parameters
        ----------
        column_name : str
            Name of the column; nesting levels are separated by ".".

        Returns
        -------
        column_idx : int
            Integer index of the position of the column
        """
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            int i = 0

        if self._column_idx_map is None:
            self._column_idx_map = {}
            for i in range(0, metadata.num_columns()):
                col_bytes = tobytes(metadata.schema().Column(i)
                                    .path().get().ToDotString())
                self._column_idx_map[col_bytes] = i

        return self._column_idx_map[tobytes(column_name)]

    def read_column(self, int column_index):
        cdef:
            Array array = Array()
            shared_ptr[CArray] carray

        with nogil:
            check_status(self.reader.get()
                         .ReadColumn(column_index, &carray))

        array.init(carray)
        return array

    def read_schema_field(self, int field_index):
        cdef:
            Array array = Array()
            shared_ptr[CArray] carray

        with nogil:
            check_status(self.reader.get()
                         .ReadSchemaField(field_index, &carray))

        array.init(carray)
        return array
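

# Reader usage sketch (the file name 'example.parquet' and the column name
# 'my_column' are illustrative):
#
#     reader = ParquetReader()
#     reader.open('example.parquet')
#     table = reader.read_all()              # whole file as a Table
#     first = reader.read_row_group(0)       # a single row group
#     idx = reader.column_name_idx('my_column')
#     column = reader.read_column(idx)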


cdef int check_compression_name(name) except -1:
    if name.upper() not in ['NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI']:
        raise ArrowException("Unsupported compression: " + name)
    return 0


cdef ParquetCompression compression_from_name(str name):
    name = name.upper()
    if name == "SNAPPY":
        return ParquetCompression_SNAPPY
    elif name == "GZIP":
        return ParquetCompression_GZIP
    elif name == "LZO":
        return ParquetCompression_LZO
    elif name == "BROTLI":
        return ParquetCompression_BROTLI
    else:
        return ParquetCompression_UNCOMPRESSED
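

# Note that check_compression_name() accepts 'NONE', which has no branch in
# compression_from_name() and therefore falls through to the
# ParquetCompression_UNCOMPRESSED default.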


cdef class ParquetWriter:
    cdef:
        unique_ptr[FileWriter] writer
        shared_ptr[OutputStream] sink
        bint own_sink

    cdef readonly:
        object use_dictionary
        object use_deprecated_int96_timestamps
        object coerce_timestamps
        object compression
        object version
        int row_group_size

    def __cinit__(self, where, Schema schema, use_dictionary=None,
                  compression=None, version=None,
                  MemoryPool memory_pool=None,
                  use_deprecated_int96_timestamps=False,
                  coerce_timestamps=None):
        cdef:
            shared_ptr[WriterProperties] properties
            shared_ptr[ArrowWriterProperties] arrow_properties
            c_string c_where
            CMemoryPool* pool

        try:
            where = _stringify_path(where)
        except TypeError:
            get_writer(where, &self.sink)
            self.own_sink = False
        else:
            c_where = tobytes(where)
            with nogil:
                check_status(FileOutputStream.Open(c_where,
                                                   &self.sink))
            self.own_sink = True

        self.use_dictionary = use_dictionary
        self.compression = compression
        self.version = version
        self.use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
        self.coerce_timestamps = coerce_timestamps

        cdef WriterProperties.Builder properties_builder
        self._set_version(&properties_builder)
        self._set_compression_props(&properties_builder)
        self._set_dictionary_props(&properties_builder)
        properties = properties_builder.build()

        cdef ArrowWriterProperties.Builder arrow_properties_builder
        self._set_int96_support(&arrow_properties_builder)
        self._set_coerce_timestamps(&arrow_properties_builder)
        arrow_properties = arrow_properties_builder.build()

        pool = maybe_unbox_memory_pool(memory_pool)
        with nogil:
            check_status(
                FileWriter.Open(deref(schema.schema), pool,
                                self.sink, properties, arrow_properties,
                                &self.writer))

    cdef void _set_int96_support(self, ArrowWriterProperties.Builder* props):
        if self.use_deprecated_int96_timestamps:
            props.enable_deprecated_int96_timestamps()
        else:
            props.disable_deprecated_int96_timestamps()

    cdef int _set_coerce_timestamps(
            self, ArrowWriterProperties.Builder* props) except -1:
        if self.coerce_timestamps == 'ms':
            props.coerce_timestamps(TimeUnit_MILLI)
        elif self.coerce_timestamps == 'us':
            props.coerce_timestamps(TimeUnit_MICRO)
        elif self.coerce_timestamps is not None:
            raise ValueError('Invalid value for coerce_timestamps: {0}'
                             .format(self.coerce_timestamps))
        return 0

    # Declared ``except -1`` (not void) so the raise below can propagate
    cdef int _set_version(self, WriterProperties.Builder* props) except -1:
        if self.version is not None:
            if self.version == "1.0":
                props.version(ParquetVersion_V1)
            elif self.version == "2.0":
                props.version(ParquetVersion_V2)
            else:
                raise ArrowException("Unsupported Parquet format version")
        return 0

    cdef int _set_compression_props(
            self, WriterProperties.Builder* props) except -1:
        if isinstance(self.compression, six.string_types):
            check_compression_name(self.compression)
            props.compression(compression_from_name(self.compression))
        elif self.compression is not None:
            # Per-column codecs given as a {column_name: codec} mapping
            for column, codec in six.iteritems(self.compression):
                check_compression_name(codec)
                props.compression(column, compression_from_name(codec))
        return 0

    cdef void _set_dictionary_props(self, WriterProperties.Builder* props):
        if isinstance(self.use_dictionary, bool):
            if self.use_dictionary:
                props.enable_dictionary()
            else:
                props.disable_dictionary()
        elif self.use_dictionary is not None:
            # Deactivate dictionary encoding by default
            props.disable_dictionary()
            for column in self.use_dictionary:
                props.enable_dictionary(column)

    def close(self):
        with nogil:
            check_status(self.writer.get().Close())

        if self.own_sink:
            check_status(self.sink.get().Close())

    def write_table(self, Table table, row_group_size=None):
        cdef CTable* ctable = table.table

        if row_group_size is None or row_group_size == -1:
            if ctable.num_rows() > 0:
                row_group_size = ctable.num_rows()
            else:
                row_group_size = 1
        elif row_group_size == 0:
            raise ValueError('Row group size cannot be 0')

        cdef int64_t c_row_group_size = row_group_size

        with nogil:
            check_status(self.writer.get()
                         .WriteTable(deref(ctable), c_row_group_size))
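

# Writer usage sketch (assumes ``table`` is a pyarrow.Table; the output path
# 'example.parquet' is illustrative):
#
#     writer = ParquetWriter('example.parquet', table.schema,
#                            compression='snappy')
#     writer.write_table(table)
#     writer.close()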