.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
.. currentmodule:: pyarrow
.. _ipc:
Streaming, Serialization, and IPC
=================================
Writing and Reading Streams
---------------------------
Arrow defines two types of binary formats for serializing record batches:

* **Streaming format**: for sending an arbitrary length sequence of record
  batches. The format must be processed from start to end, and does not support
  random access

* **File or Random Access format**: for serializing a fixed number of record
  batches. Supports random access, and thus is very useful when used with
  memory maps

To follow this section, make sure to first read the section on :ref:`Memory and
IO <io>`.
Using streams
~~~~~~~~~~~~~
First, let's create a small record batch:

.. ipython:: python

    import pyarrow as pa

    data = [
        pa.array([1, 2, 3, 4]),
        pa.array(['foo', 'bar', 'baz', None]),
        pa.array([True, None, False, True])
    ]

    batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
    batch.num_rows
    batch.num_columns

Now, we can begin writing a stream containing some number of these batches. For
this we use :class:`~pyarrow.RecordBatchStreamWriter`, which can write to a
writeable ``NativeFile`` object or a writeable Python object. For convenience,
this one can be created with :func:`~pyarrow.ipc.new_stream`:

.. ipython:: python

    sink = pa.BufferOutputStream()

    writer = pa.ipc.new_stream(sink, batch.schema)

Here we used an in-memory Arrow buffer stream, but this could have been a
socket or some other IO sink.
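
For illustration, the same writer could also target a file on disk. The
following is a small sketch that is not executed here; the path is just a
placeholder:

.. code-block:: python

    # Stream the batches to a file on disk instead of an in-memory buffer
    with pa.OSFile('/tmp/example_stream.arrow', 'wb') as disk_sink:
        writer = pa.ipc.new_stream(disk_sink, batch.schema)
        writer.write_batch(batch)
        writer.close()
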
When creating the ``StreamWriter``, we pass the schema, since the schema
(column names and types) must be the same for all of the batches sent in this
particular stream. Now we can do:

.. ipython:: python

    for i in range(5):
        writer.write_batch(batch)
    writer.close()

    buf = sink.getvalue()
    buf.size

Now ``buf`` contains the complete stream as an in-memory byte buffer. We can
read such a stream with :class:`~pyarrow.RecordBatchStreamReader` or the
convenience function ``pyarrow.ipc.open_stream``:

.. ipython:: python

    reader = pa.ipc.open_stream(buf)

    reader.schema

    batches = [b for b in reader]
    len(batches)

We can check the returned batches are the same as the original input:

.. ipython:: python

    batches[0].equals(batch)

An important point is that if the input source supports zero-copy reads
(e.g. a memory map, or ``pyarrow.BufferReader``), then the returned batches are
also zero-copy and do not allocate any new memory on read.
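
As a sketch (not executed here, with a placeholder path), if the stream had
been written to a file on disk as shown earlier, opening it through a memory
map keeps reads zero-copy; the batch buffers point into the mapped region
rather than newly allocated memory:

.. code-block:: python

    # Map the file and read the stream back; the record batch buffers
    # reference the memory map instead of freshly allocated memory
    with pa.memory_map('/tmp/example_stream.arrow') as source:
        table = pa.ipc.open_stream(source).read_all()
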
Writing and Reading Random Access Files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
:class:`~pyarrow.RecordBatchStreamWriter`. You can create one with
:func:`~pyarrow.ipc.new_file`:

.. ipython:: python

    sink = pa.BufferOutputStream()

    writer = pa.ipc.new_file(sink, batch.schema)

    for i in range(10):
        writer.write_batch(batch)
    writer.close()

    buf = sink.getvalue()
    buf.size

The difference between :class:`~pyarrow.RecordBatchFileReader` and
:class:`~pyarrow.RecordBatchStreamReader` is that the input source must have a
``seek`` method for random access; the stream reader only requires read
operations. We can also use the :func:`~pyarrow.ipc.open_file` function to open a file:

.. ipython:: python

    reader = pa.ipc.open_file(buf)

Because we have access to the entire payload, we know the number of record
batches in the file, and can read any at random:

.. ipython:: python

    reader.num_record_batches
    b = reader.get_batch(3)
    b.equals(batch)

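
Because ``buf`` supports random access, :func:`~pyarrow.ipc.open_file` can wrap
it directly. The equivalent explicit form, shown here as a small sketch, passes
a seekable ``pyarrow.BufferReader``:

.. code-block:: python

    # Explicitly wrap the buffer in a seekable NativeFile before opening
    source = pa.BufferReader(buf)
    reader = pa.ipc.open_file(source)
    b = reader.get_batch(3)
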
Reading from Stream and File Format for pandas
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The stream and file reader classes have a special ``read_pandas`` method to
simplify reading multiple record batches and converting them to a single
DataFrame output:

.. ipython:: python

    df = pa.ipc.open_file(buf).read_pandas()
    df[:5]

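
If a :class:`~pyarrow.Table` is preferred over a pandas DataFrame, the readers
also provide a ``read_all`` method; a short sketch:

.. code-block:: python

    # Read all record batches into a single pyarrow.Table
    table = pa.ipc.open_file(buf).read_all()
    df = table.to_pandas()  # same result as read_pandas() above
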
Arbitrary Object Serialization
------------------------------

.. warning::

    The custom serialization functionality is deprecated in pyarrow 2.0, and
    will be removed in a future version.

    While the serialization functions in this section utilize the Arrow stream
    protocol internally, they do not produce data that is compatible with the
    above ``ipc.open_file`` and ``ipc.open_stream`` functions.

    For arbitrary objects, you can use the standard library ``pickle``
    functionality instead. For pyarrow objects, you can use the IPC
    serialization format through the ``pyarrow.ipc`` module, as explained
    above.

    PyArrow serialization was originally meant to provide a higher-performance
    alternative to ``pickle`` thanks to zero-copy semantics. However,
    ``pickle`` protocol 5 gained support for zero-copy using out-of-band
    buffers, and can be used instead for similar benefits.

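
As a sketch of the ``pickle`` protocol 5 alternative mentioned in the warning
(not executed here; ``obj`` stands for any picklable object):

.. code-block:: python

    import pickle

    # Requires Python 3.8+ for pickle protocol 5
    # Serialize with out-of-band buffers to avoid copying large payloads
    buffers = []
    payload = pickle.dumps(obj, protocol=5, buffer_callback=buffers.append)

    # Deserialize by passing the out-of-band buffers back in
    restored = pickle.loads(payload, buffers=buffers)
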
In ``pyarrow`` we are able to serialize and deserialize many kinds of Python
objects. As an example, consider a dictionary containing NumPy arrays:

.. ipython:: python

    import numpy as np

    data = {
        i: np.random.randn(500, 500)
        for i in range(100)
    }

We use the ``pyarrow.serialize`` function to convert this data to a byte
buffer:

.. ipython:: python
    :okwarning:

    buf = pa.serialize(data).to_buffer()
    type(buf)
    buf.size

``pyarrow.serialize`` creates an intermediate object which can be converted to
a buffer (the ``to_buffer`` method) or written directly to an output stream.
``pyarrow.deserialize`` converts a buffer-like object back to the original
Python object:

.. ipython:: python
    :okwarning:

    restored_data = pa.deserialize(buf)
    restored_data[0]

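
Writing directly to an output stream, as mentioned above, could look like the
following sketch (assuming the ``write_to`` method of the serialized object;
not executed here):

.. code-block:: python

    # Write the serialized payload straight into a sink, then read it back
    sink = pa.BufferOutputStream()
    pa.serialize(data).write_to(sink)

    restored = pa.deserialize(sink.getvalue())
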
Serializing Custom Data Types
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If an unrecognized data type is encountered when serializing an object,
``pyarrow`` will fall back on using ``pickle`` for converting that type to a
byte string. There may be a more efficient way, though.
Consider a class with two members, one of which is a NumPy array:

.. code-block:: python

    class MyData:
        def __init__(self, name, data):
            self.name = name
            self.data = data

We write functions to convert this to and from a dictionary with simpler types:

.. code-block:: python

    def _serialize_MyData(val):
        return {'name': val.name, 'data': val.data}

    def _deserialize_MyData(data):
        return MyData(data['name'], data['data'])

Then, we must register these functions in a ``SerializationContext`` so that
``MyData`` can be recognized:

.. code-block:: python

    context = pa.SerializationContext()
    context.register_type(MyData, 'MyData',
                          custom_serializer=_serialize_MyData,
                          custom_deserializer=_deserialize_MyData)

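
For the snippets below, assume an instance of the class (an illustrative value,
not taken from the original listing):

.. code-block:: python

    import numpy as np

    # Example instance used in the following snippets
    val = MyData('example', np.random.randn(100, 100))
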
Lastly, we use this context as an additional argument to ``pyarrow.serialize``:

.. code-block:: python

    buf = pa.serialize(val, context=context).to_buffer()
    restored_val = pa.deserialize(buf, context=context)

The ``SerializationContext`` also has convenience methods ``serialize`` and
``deserialize``, so these are equivalent statements:

.. code-block:: python

    buf = context.serialize(val).to_buffer()
    restored_val = context.deserialize(buf)

Component-based Serialization
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For serializing Python objects containing some number of NumPy arrays, Arrow
buffers, or other data types, it may be desirable to transport their serialized
representation without having to produce an intermediate copy using the
``to_buffer`` method. To motivate this, suppose we have a list of NumPy arrays:

.. ipython:: python

    import numpy as np

    data = [np.random.randn(10, 10) for i in range(5)]

The call ``pa.serialize(data)`` does not copy the memory inside each of these
NumPy arrays. The serialized representation can then be decomposed into a
dictionary containing a sequence of ``pyarrow.Buffer`` objects that hold the
metadata for each array and references to the memory inside the arrays. To do
this, use the ``to_components`` method:

.. ipython:: python
    :okwarning:

    serialized = pa.serialize(data)
    components = serialized.to_components()

The particular details of the output of ``to_components`` are not too
important. The objects in the ``'data'`` field are ``pyarrow.Buffer`` objects,
which are zero-copy convertible to Python ``memoryview`` objects:

.. ipython:: python

    memoryview(components['data'][0])

A memoryview can be converted back to an Arrow ``Buffer`` with
``pyarrow.py_buffer``:

.. ipython:: python

    mv = memoryview(components['data'][0])
    buf = pa.py_buffer(mv)

An object can be reconstructed from its component-based representation using
``deserialize_components``:

.. ipython:: python
    :okwarning:

    restored_data = pa.deserialize_components(components)
    restored_data[0]

``deserialize_components`` is also available as a method on
``SerializationContext`` objects.
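
For instance, for data serialized through a given ``context`` (a small sketch
that is not executed here; ``some_object`` is a placeholder):

.. code-block:: python

    # Component-based round trip through a SerializationContext
    components = context.serialize(some_object).to_components()
    restored = context.deserialize_components(components)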