# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import warnings
cpdef enum MetadataVersion:
V1 = <char> CMetadataVersion_V1
V2 = <char> CMetadataVersion_V2
V3 = <char> CMetadataVersion_V3
V4 = <char> CMetadataVersion_V4
V5 = <char> CMetadataVersion_V5
cdef object _wrap_metadata_version(CMetadataVersion version):
return MetadataVersion(<char> version)
cdef CMetadataVersion _unwrap_metadata_version(
MetadataVersion version) except *:
if version == MetadataVersion.V1:
return CMetadataVersion_V1
elif version == MetadataVersion.V2:
return CMetadataVersion_V2
elif version == MetadataVersion.V3:
return CMetadataVersion_V3
elif version == MetadataVersion.V4:
return CMetadataVersion_V4
elif version == MetadataVersion.V5:
return CMetadataVersion_V5
raise ValueError("Not a metadata version: " + repr(version))
_WriteStats = namedtuple(
'WriteStats',
('num_messages', 'num_record_batches', 'num_dictionary_batches',
'num_dictionary_deltas', 'num_replaced_dictionaries'))
class WriteStats(_WriteStats):
"""IPC write statistics
"""
__slots__ = ()
@staticmethod
cdef _wrap_write_stats(CIpcWriteStats c):
return WriteStats(c.num_messages, c.num_record_batches,
c.num_dictionary_batches, c.num_dictionary_deltas,
c.num_replaced_dictionaries)
_ReadStats = namedtuple(
'ReadStats',
('num_messages', 'num_record_batches', 'num_dictionary_batches',
'num_dictionary_deltas', 'num_replaced_dictionaries'))
class ReadStats(_ReadStats):
"""IPC write statistics
"""
__slots__ = ()
@staticmethod
cdef _wrap_read_stats(CIpcReadStats c):
return ReadStats(c.num_messages, c.num_record_batches,
c.num_dictionary_batches, c.num_dictionary_deltas,
c.num_replaced_dictionaries)
cdef class IpcWriteOptions(_Weakrefable):
"""Serialization options for the IPC format.
Parameters
----------
metadata_version : MetadataVersion, default MetadataVersion.V5
The metadata version to write. V5 is the current and latest,
V4 is the pre-1.0 metadata version (with incompatible Union layout).
allow_64bit : bool, default False
If true, allow field lengths that don't fit in a signed 32-bit int.
use_legacy_format : bool, default False
Whether to use the pre-Arrow 0.15 IPC format.
compression : str or None, default None
If not None, compression codec to use for record batch buffers.
May only be "lz4", "zstd" or None.
use_threads : bool, default True
Whether to use the global CPU thread pool to parallelize any
computational tasks like compression.
emit_dictionary_deltas : bool, default False
Whether to emit dictionary deltas. Default is False for maximum
stream compatibility.
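
Examples
--------
A minimal sketch of constructing options with buffer compression
(assumes your Arrow build includes the "zstd" codec):

>>> import pyarrow as pa
>>> options = pa.ipc.IpcWriteOptions(compression="zstd")
>>> options.compression
'zstd'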
"""
__slots__ = ()
# cdef block is in lib.pxd
def __init__(self, *, metadata_version=MetadataVersion.V5,
bint allow_64bit=False, use_legacy_format=False,
compression=None, bint use_threads=True,
bint emit_dictionary_deltas=False):
self.c_options = CIpcWriteOptions.Defaults()
self.allow_64bit = allow_64bit
self.use_legacy_format = use_legacy_format
self.metadata_version = metadata_version
if compression is not None:
self.compression = compression
self.use_threads = use_threads
self.emit_dictionary_deltas = emit_dictionary_deltas
@property
def allow_64bit(self):
return self.c_options.allow_64bit
@allow_64bit.setter
def allow_64bit(self, bint value):
self.c_options.allow_64bit = value
@property
def use_legacy_format(self):
return self.c_options.write_legacy_ipc_format
@use_legacy_format.setter
def use_legacy_format(self, bint value):
self.c_options.write_legacy_ipc_format = value
@property
def metadata_version(self):
return _wrap_metadata_version(self.c_options.metadata_version)
@metadata_version.setter
def metadata_version(self, value):
self.c_options.metadata_version = _unwrap_metadata_version(value)
@property
def compression(self):
if self.c_options.codec == nullptr:
return None
else:
return frombytes(self.c_options.codec.get().name())
@compression.setter
def compression(self, value):
if value is None:
self.c_options.codec.reset()
else:
self.c_options.codec = shared_ptr[CCodec](GetResultValue(
CCodec.Create(_ensure_compression(value))).release())
@property
def use_threads(self):
return self.c_options.use_threads
@use_threads.setter
def use_threads(self, bint value):
self.c_options.use_threads = value
@property
def emit_dictionary_deltas(self):
return self.c_options.emit_dictionary_deltas
@emit_dictionary_deltas.setter
def emit_dictionary_deltas(self, bint value):
self.c_options.emit_dictionary_deltas = value
cdef class Message(_Weakrefable):
"""
Container for an Arrow IPC message with metadata and optional body
"""
def __cinit__(self):
pass
def __init__(self):
raise TypeError("Do not call {}'s constructor directly, use "
"`pyarrow.ipc.read_message` function instead."
.format(self.__class__.__name__))
@property
def type(self):
return frombytes(FormatMessageType(self.message.get().type()))
@property
def metadata(self):
return pyarrow_wrap_buffer(self.message.get().metadata())
@property
def metadata_version(self):
return _wrap_metadata_version(self.message.get().metadata_version())
@property
def body(self):
cdef shared_ptr[CBuffer] body = self.message.get().body()
if body.get() == NULL:
return None
else:
return pyarrow_wrap_buffer(body)
def equals(self, Message other):
"""
Returns True if the message contents (metadata and body) are identical
Parameters
----------
other : Message
Returns
-------
are_equal : bool
"""
cdef c_bool result
with nogil:
result = self.message.get().Equals(deref(other.message.get()))
return result
def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
"""
Write message to generic OutputStream
Parameters
----------
sink : NativeFile
alignment : int, default 8
Byte alignment for metadata and body
memory_pool : MemoryPool, default None
Uses default memory pool if not specified
"""
cdef:
int64_t output_length = 0
COutputStream* out
CIpcWriteOptions options
options.alignment = alignment
out = sink.get_output_stream().get()
with nogil:
check_status(self.message.get()
.SerializeTo(out, options, &output_length))
def serialize(self, alignment=8, memory_pool=None):
"""
Write message as encapsulated IPC message
Parameters
----------
alignment : int, default 8
Byte alignment for metadata and body
memory_pool : MemoryPool, default None
Uses default memory pool if not specified
Returns
-------
serialized : Buffer
"""
stream = BufferOutputStream(memory_pool)
self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool)
return stream.getvalue()
def __repr__(self):
if self.message == nullptr:
return """pyarrow.Message(uninitialized)"""
metadata_len = self.metadata.size
body = self.body
body_len = 0 if body is None else body.size
return """pyarrow.Message
type: {0}
metadata length: {1}
body length: {2}""".format(self.type, metadata_len, body_len)
cdef class MessageReader(_Weakrefable):
"""
Interface for reading Message objects from some source (like an
InputStream)
"""
cdef:
unique_ptr[CMessageReader] reader
def __cinit__(self):
pass
def __init__(self):
raise TypeError("Do not call {}'s constructor directly, use "
"`pyarrow.ipc.MessageReader.open_stream` function "
"instead.".format(self.__class__.__name__))
@staticmethod
def open_stream(source):
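"""
Open a MessageReader over a readable source.

Parameters
----------
source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
The stream to read messages from.

Returns
-------
reader : MessageReader
"""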
cdef:
MessageReader result = MessageReader.__new__(MessageReader)
shared_ptr[CInputStream] in_stream
unique_ptr[CMessageReader] reader
_get_input_stream(source, &in_stream)
with nogil:
reader = CMessageReader.Open(in_stream)
result.reader.reset(reader.release())
return result
def __iter__(self):
while True:
yield self.read_next_message()
def read_next_message(self):
"""
Read next Message from the stream.
Raises
------
StopIteration
At end of stream.
"""
cdef Message result = Message.__new__(Message)
with nogil:
result.message = move(GetResultValue(self.reader.get()
.ReadNextMessage()))
if result.message.get() == NULL:
raise StopIteration
return result
# ----------------------------------------------------------------------
# File and stream readers and writers
cdef class _CRecordBatchWriter(_Weakrefable):
"""The base RecordBatchWriter wrapper.
Provides common implementations of convenience methods. Should not
be instantiated directly by user code.
"""
# cdef block is in lib.pxd
def write(self, table_or_batch):
"""
Write RecordBatch or Table to stream.
Parameters
----------
table_or_batch : {RecordBatch, Table}
"""
if isinstance(table_or_batch, RecordBatch):
self.write_batch(table_or_batch)
elif isinstance(table_or_batch, Table):
self.write_table(table_or_batch)
else:
raise ValueError(type(table_or_batch))
def write_batch(self, RecordBatch batch):
"""
Write RecordBatch to stream.
Parameters
----------
batch : RecordBatch
"""
with nogil:
check_status(self.writer.get()
.WriteRecordBatch(deref(batch.batch)))
def write_table(self, Table table, max_chunksize=None, **kwargs):
"""
Write Table to stream in (contiguous) RecordBatch objects.
Parameters
----------
table : Table
max_chunksize : int, default None
Maximum size for RecordBatch chunks. Individual chunks may be
smaller depending on the chunk layout of individual columns.
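
Examples
--------
A minimal sketch writing a table in two-row chunks (the column name
'f0' is arbitrary):

>>> import pyarrow as pa
>>> table = pa.table({'f0': [1, 2, 3, 4]})
>>> sink = pa.BufferOutputStream()
>>> with pa.ipc.new_stream(sink, table.schema) as writer:
...     writer.write_table(table, max_chunksize=2)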
"""
cdef:
# max_chunksize must be > 0 to have any impact
int64_t c_max_chunksize = -1
if 'chunksize' in kwargs:
max_chunksize = kwargs['chunksize']
msg = ('The parameter chunksize is deprecated for the write_table '
'methods as of 0.15, please use parameter '
'max_chunksize instead')
warnings.warn(msg, FutureWarning)
if max_chunksize is not None:
c_max_chunksize = max_chunksize
with nogil:
check_status(self.writer.get().WriteTable(table.table[0],
c_max_chunksize))
def close(self):
"""
Close stream and write end-of-stream 0 marker.
"""
with nogil:
check_status(self.writer.get().Close())
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@property
def stats(self):
"""
Current IPC write statistics.
"""
if not self.writer:
raise ValueError("Operation on closed writer")
return _wrap_write_stats(self.writer.get().stats())
cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
cdef:
CIpcWriteOptions options
bint closed
def __cinit__(self):
pass
def __dealloc__(self):
pass
@property
def _use_legacy_format(self):
# For testing (see test_ipc.py)
return self.options.write_legacy_ipc_format
@property
def _metadata_version(self):
# For testing (see test_ipc.py)
return _wrap_metadata_version(self.options.metadata_version)
def _open(self, sink, Schema schema not None,
IpcWriteOptions options=IpcWriteOptions()):
cdef:
shared_ptr[COutputStream] c_sink
self.options = options.c_options
get_writer(sink, &c_sink)
with nogil:
self.writer = GetResultValue(
MakeStreamWriter(c_sink, schema.sp_schema,
self.options))
cdef _get_input_stream(object source, shared_ptr[CInputStream]* out):
try:
source = as_buffer(source)
except TypeError:
# Non-buffer-like
pass
get_input_stream(source, True, out)
class _ReadPandasMixin:
def read_pandas(self, **options):
"""
Read contents of stream to a pandas.DataFrame.
Read all record batches as a pyarrow.Table then convert it to a
pandas.DataFrame using Table.to_pandas.
Parameters
----------
**options : arguments to forward to Table.to_pandas
Returns
-------
df : pandas.DataFrame
"""
table = self.read_all()
return table.to_pandas(**options)
cdef class RecordBatchReader(_Weakrefable):
"""Base class for reading stream of record batches.
Provides common implementations of convenience methods. Should not
be instantiated directly by user code.
"""
# cdef block is in lib.pxd
def __iter__(self):
while True:
try:
yield self.read_next_batch()
except StopIteration:
return
@property
def schema(self):
"""
Shared schema of the record batches in the stream.
"""
cdef shared_ptr[CSchema] c_schema
with nogil:
c_schema = self.reader.get().schema()
return pyarrow_wrap_schema(c_schema)
def get_next_batch(self):
"""
DEPRECATED: use read_next_batch instead.
"""
warnings.warn('Please use read_next_batch instead of '
'get_next_batch', FutureWarning)
return self.read_next_batch()
def read_next_batch(self):
"""
Read next RecordBatch from the stream.
Raises
------
StopIteration:
At end of stream.
"""
cdef shared_ptr[CRecordBatch] batch
with nogil:
check_status(self.reader.get().ReadNext(&batch))
if batch.get() == NULL:
raise StopIteration
return pyarrow_wrap_batch(batch)
def read_all(self):
"""
Read all record batches as a pyarrow.Table.
"""
cdef shared_ptr[CTable] table
with nogil:
check_status(self.reader.get().ReadAll(&table))
return pyarrow_wrap_table(table)
read_pandas = _ReadPandasMixin.read_pandas
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def _export_to_c(self, uintptr_t out_ptr):
"""
Export to a C ArrowArrayStream struct, given its pointer.
Parameters
----------
out_ptr : int
The raw pointer to a C ArrowArrayStream struct.
Be careful: if you don't pass the ArrowArrayStream struct to a
consumer, array memory will leak. This is a low-level function
intended for expert users.
"""
with nogil:
check_status(ExportRecordBatchReader(
self.reader, <ArrowArrayStream*> out_ptr))
@staticmethod
def _import_from_c(uintptr_t in_ptr):
"""
Import RecordBatchReader from a C ArrowArrayStream struct,
given its pointer.
Parameters
----------
in_ptr : int
The raw pointer to a C ArrowArrayStream struct.
This is a low-level function intended for expert users.
"""
cdef:
shared_ptr[CRecordBatchReader] c_reader
RecordBatchReader self
with nogil:
c_reader = GetResultValue(ImportRecordBatchReader(
<ArrowArrayStream*> in_ptr))
self = RecordBatchReader.__new__(RecordBatchReader)
self.reader = c_reader
return self
@staticmethod
def from_batches(schema, batches):
"""
Create RecordBatchReader from an iterable of batches.
Parameters
----------
schema : Schema
The shared schema of the record batches
batches : Iterable[RecordBatch]
The batches that this reader will return.
Returns
-------
reader : RecordBatchReader
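
Examples
--------
A minimal sketch (the column name 'f0' is arbitrary):

>>> import pyarrow as pa
>>> batch = pa.record_batch([pa.array([1, 2, 3])], names=['f0'])
>>> reader = pa.ipc.RecordBatchReader.from_batches(batch.schema, [batch])
>>> reader.read_all().num_rows
3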
"""
cdef:
shared_ptr[CSchema] c_schema
shared_ptr[CRecordBatchReader] c_reader
RecordBatchReader self
c_schema = pyarrow_unwrap_schema(schema)
c_reader = GetResultValue(CPyRecordBatchReader.Make(
c_schema, batches))
self = RecordBatchReader.__new__(RecordBatchReader)
self.reader = c_reader
return self
cdef class _RecordBatchStreamReader(RecordBatchReader):
cdef:
shared_ptr[CInputStream] in_stream
CIpcReadOptions options
CRecordBatchStreamReader* stream_reader
def __cinit__(self):
pass
def _open(self, source):
_get_input_stream(source, &self.in_stream)
with nogil:
self.reader = GetResultValue(CRecordBatchStreamReader.Open(
self.in_stream, self.options))
self.stream_reader = <CRecordBatchStreamReader*> self.reader.get()
@property
def stats(self):
"""
Current IPC read statistics.
"""
if not self.reader:
raise ValueError("Operation on closed reader")
return _wrap_read_stats(self.stream_reader.stats())
cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
def _open(self, sink, Schema schema not None,
IpcWriteOptions options=IpcWriteOptions()):
cdef:
shared_ptr[COutputStream] c_sink
self.options = options.c_options
get_writer(sink, &c_sink)
with nogil:
self.writer = GetResultValue(
MakeFileWriter(c_sink, schema.sp_schema, self.options))
cdef class _RecordBatchFileReader(_Weakrefable):
cdef:
shared_ptr[CRecordBatchFileReader] reader
shared_ptr[CRandomAccessFile] file
CIpcReadOptions options
cdef readonly:
Schema schema
def __cinit__(self):
pass
def _open(self, source, footer_offset=None):
try:
source = as_buffer(source)
except TypeError:
pass
get_reader(source, True, &self.file)
cdef int64_t offset = 0
if footer_offset is not None:
offset = footer_offset
with nogil:
if offset != 0:
self.reader = GetResultValue(
CRecordBatchFileReader.Open2(self.file.get(), offset,
self.options))
else:
self.reader = GetResultValue(
CRecordBatchFileReader.Open(self.file.get(),
self.options))
self.schema = pyarrow_wrap_schema(self.reader.get().schema())
@property
def num_record_batches(self):
return self.reader.get().num_record_batches()
def get_batch(self, int i):
cdef shared_ptr[CRecordBatch] batch
if i < 0 or i >= self.num_record_batches:
raise ValueError('Batch number {0} out of range'.format(i))
with nogil:
batch = GetResultValue(self.reader.get().ReadRecordBatch(i))
return pyarrow_wrap_batch(batch)
# TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
# time has passed
get_record_batch = get_batch
def read_all(self):
"""
Read all record batches as a pyarrow.Table
"""
cdef:
vector[shared_ptr[CRecordBatch]] batches
shared_ptr[CTable] table
int i, nbatches
nbatches = self.num_record_batches
batches.resize(nbatches)
with nogil:
for i in range(nbatches):
batches[i] = GetResultValue(self.reader.get()
.ReadRecordBatch(i))
table = GetResultValue(
CTable.FromRecordBatches(self.schema.sp_schema, move(batches)))
return pyarrow_wrap_table(table)
read_pandas = _ReadPandasMixin.read_pandas
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
@property
def stats(self):
"""
Current IPC read statistics.
"""
if not self.reader:
raise ValueError("Operation on closed reader")
return _wrap_read_stats(self.reader.get().stats())
def get_tensor_size(Tensor tensor):
"""
Return total size of serialized Tensor including metadata and padding.
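
Examples
--------
A minimal sketch (the exact size depends on the tensor's shape and type):

>>> import numpy as np
>>> import pyarrow as pa
>>> tensor = pa.Tensor.from_numpy(np.zeros((2, 3)))
>>> pa.ipc.get_tensor_size(tensor) > 0
True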
"""
cdef int64_t size
with nogil:
check_status(GetTensorSize(deref(tensor.tp), &size))
return size
def get_record_batch_size(RecordBatch batch):
"""
Return total size of serialized RecordBatch including metadata and padding.
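
Examples
--------
A minimal sketch (the exact size depends on schema and padding; the
column name 'f0' is arbitrary):

>>> import pyarrow as pa
>>> batch = pa.record_batch([pa.array([1, 2, 3])], names=['f0'])
>>> pa.ipc.get_record_batch_size(batch) > 0
True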
"""
cdef int64_t size
with nogil:
check_status(GetRecordBatchSize(deref(batch.batch), &size))
return size
def write_tensor(Tensor tensor, NativeFile dest):
"""
Write pyarrow.Tensor to a pyarrow.NativeFile object at its current position.
Parameters
----------
tensor : pyarrow.Tensor
dest : pyarrow.NativeFile
Returns
-------
bytes_written : int
Total number of bytes written to the file
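
Examples
--------
A minimal round-trip sketch through an in-memory sink:

>>> import numpy as np
>>> import pyarrow as pa
>>> tensor = pa.Tensor.from_numpy(np.arange(6).reshape(2, 3))
>>> sink = pa.BufferOutputStream()
>>> nbytes = pa.ipc.write_tensor(tensor, sink)
>>> pa.ipc.read_tensor(pa.BufferReader(sink.getvalue())).equals(tensor)
True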
"""
cdef:
int32_t metadata_length
int64_t body_length
handle = dest.get_output_stream()
with nogil:
check_status(
WriteTensor(deref(tensor.tp), handle.get(),
&metadata_length, &body_length))
return metadata_length + body_length
cdef NativeFile as_native_file(source):
if not isinstance(source, NativeFile):
if hasattr(source, 'read'):
source = PythonFile(source)
else:
source = BufferReader(source)
if not isinstance(source, NativeFile):
raise ValueError('Unable to read message from object with type: {0}'
.format(type(source)))
return source
def read_tensor(source):
"""Read pyarrow.Tensor from pyarrow.NativeFile object from current
position. If the file source supports zero copy (e.g. a memory map), then
this operation does not allocate any memory. This function not assume that
the stream is aligned
Parameters
----------
source : pyarrow.NativeFile
Returns
-------
tensor : Tensor
"""
cdef:
shared_ptr[CTensor] sp_tensor
CInputStream* c_stream
NativeFile nf = as_native_file(source)
c_stream = nf.get_input_stream().get()
with nogil:
sp_tensor = GetResultValue(ReadTensor(c_stream))
return pyarrow_wrap_tensor(sp_tensor)
def read_message(source):
"""
Read length-prefixed message from file or buffer-like object
Parameters
----------
source : pyarrow.NativeFile, file-like object, or buffer-like object
Returns
-------
message : Message
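
Examples
--------
A minimal sketch; the first message of a new IPC stream is the schema
(the column name 'f0' is arbitrary):

>>> import pyarrow as pa
>>> batch = pa.record_batch([pa.array([1, 2, 3])], names=['f0'])
>>> sink = pa.BufferOutputStream()
>>> with pa.ipc.new_stream(sink, batch.schema) as writer:
...     writer.write_batch(batch)
>>> pa.ipc.read_message(sink.getvalue()).type
'schema'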
"""
cdef:
Message result = Message.__new__(Message)
CInputStream* c_stream
cdef NativeFile nf = as_native_file(source)
c_stream = nf.get_input_stream().get()
with nogil:
result.message = move(
GetResultValue(ReadMessage(c_stream, c_default_memory_pool())))
if result.message == nullptr:
raise EOFError("End of Arrow stream")
return result
def read_schema(obj, DictionaryMemo dictionary_memo=None):
"""
Read Schema from message or buffer
Parameters
----------
obj : buffer or Message
dictionary_memo : DictionaryMemo, optional
Needed to be able to reconstruct dictionary-encoded fields
with read_record_batch
Returns
-------
schema : Schema
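
Examples
--------
A minimal round-trip sketch using Schema.serialize:

>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('f0', pa.int64())])
>>> pa.ipc.read_schema(schema.serialize()).equals(schema)
True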
"""
cdef:
shared_ptr[CSchema] result
shared_ptr[CRandomAccessFile] cpp_file
CDictionaryMemo temp_memo
CDictionaryMemo* arg_dict_memo
if isinstance(obj, Message):
raise NotImplementedError(type(obj))
get_reader(obj, True, &cpp_file)
if dictionary_memo is not None:
arg_dict_memo = dictionary_memo.memo
else:
arg_dict_memo = &temp_memo
with nogil:
result = GetResultValue(ReadSchema(cpp_file.get(), arg_dict_memo))
return pyarrow_wrap_schema(result)
def read_record_batch(obj, Schema schema,
DictionaryMemo dictionary_memo=None):
"""
Read RecordBatch from a message, given a known schema. If reading data
from a complete IPC stream, use ipc.open_stream instead.
Parameters
----------
obj : Message or Buffer-like
schema : Schema
dictionary_memo : DictionaryMemo, optional
If message contains dictionaries, must pass a populated
DictionaryMemo
Returns
-------
batch : RecordBatch
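
Examples
--------
A minimal round-trip sketch using RecordBatch.serialize (suitable for
batches without dictionary-encoded columns):

>>> import pyarrow as pa
>>> batch = pa.record_batch([pa.array([1, 2, 3])], names=['f0'])
>>> pa.ipc.read_record_batch(batch.serialize(), batch.schema).equals(batch)
True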
"""
cdef:
shared_ptr[CRecordBatch] result
Message message
CDictionaryMemo temp_memo
CDictionaryMemo* arg_dict_memo
if isinstance(obj, Message):
message = obj
else:
message = read_message(obj)
if dictionary_memo is not None:
arg_dict_memo = dictionary_memo.memo
else:
arg_dict_memo = &temp_memo
with nogil:
result = GetResultValue(
ReadRecordBatch(deref(message.message.get()),
schema.sp_schema,
arg_dict_memo,
CIpcReadOptions.Defaults()))
return pyarrow_wrap_batch(result)