blob: 27cd14a68853d2e0894180b50487e25afa28fc5a [file] [log] [blame]
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
.. ipython:: python
:suppress:
# set custom tmp working directory for files that create data
import os
import tempfile
orig_working_dir = os.getcwd()
temp_working_dir = tempfile.mkdtemp(prefix="pyarrow-")
os.chdir(temp_working_dir)
.. currentmodule:: pyarrow
.. _ipc:
Streaming, Serialization, and IPC
=================================
Writing and Reading Streams
---------------------------
Arrow defines two types of binary formats for serializing record batches:
* **Streaming format**: for sending an arbitrary length sequence of record
batches. The format must be processed from start to end, and does not support
random access
* **File or Random Access format**: for serializing a fixed number of record
batches. Supports random access, and thus is very useful when used with
memory maps
To follow this section, make sure to first read the section on :ref:`Memory and
IO <io>`.
Using streams
~~~~~~~~~~~~~
First, let's create a small record batch:
.. ipython:: python
import pyarrow as pa
data = [
pa.array([1, 2, 3, 4]),
pa.array(['foo', 'bar', 'baz', None]),
pa.array([True, None, False, True])
]
batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
batch.num_rows
batch.num_columns
Now, we can begin writing a stream containing some number of these batches. For
this we use :class:`~pyarrow.RecordBatchStreamWriter`, which can write to a
writeable ``NativeFile`` object or a writeable Python object. For convenience,
this one can be created with :func:`~pyarrow.ipc.new_stream`:
.. ipython:: python
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
for i in range(5):
writer.write_batch(batch)
Here we used an in-memory Arrow buffer stream (``sink``),
but this could have been a socket or some other IO sink.
When creating the ``StreamWriter``, we pass the schema, since the schema
(column names and types) must be the same for all of the batches sent in this
particular stream. Now we can do:
.. ipython:: python
buf = sink.getvalue()
buf.size
Now ``buf`` contains the complete stream as an in-memory byte buffer. We can
read such a stream with :class:`~pyarrow.RecordBatchStreamReader` or the
convenience function ``pyarrow.ipc.open_stream``:
.. ipython:: python
with pa.ipc.open_stream(buf) as reader:
schema = reader.schema
batches = [b for b in reader]
schema
len(batches)
We can check the returned batches are the same as the original input:
.. ipython:: python
batches[0].equals(batch)
An important point is that if the input source supports zero-copy reads
(e.g. like a memory map, or ``pyarrow.BufferReader``), then the returned
batches are also zero-copy and do not allocate any new memory on read.
Writing and Reading Random Access Files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
:class:`~pyarrow.RecordBatchStreamWriter`. You can create one with
:func:`~pyarrow.ipc.new_file`:
.. ipython:: python
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
for i in range(10):
writer.write_batch(batch)
buf = sink.getvalue()
buf.size
The difference between :class:`~pyarrow.RecordBatchFileReader` and
:class:`~pyarrow.RecordBatchStreamReader` is that the input source must have a
``seek`` method for random access. The stream reader only requires read
operations. We can also use the :func:`~pyarrow.ipc.open_file` method to open a file:
.. ipython:: python
with pa.ipc.open_file(buf) as reader:
num_record_batches = reader.num_record_batches
b = reader.get_batch(3)
Because we have access to the entire payload, we know the number of record
batches in the file, and can read any at random.
.. ipython:: python
num_record_batches
b.equals(batch)
Reading from Stream and File Format for pandas
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The stream and file reader classes have a special ``read_pandas`` method to
simplify reading multiple record batches and converting them to a single
DataFrame output:
.. ipython:: python
with pa.ipc.open_file(buf) as reader:
df = reader.read_pandas()
df[:5]
Efficiently Writing and Reading Arrow Data
------------------------------------------
Being optimized for zero copy and memory mapped data, Arrow allows to easily
read and write arrays consuming the minimum amount of resident memory.
When writing and reading raw Arrow data, we can use the Arrow File Format
or the Arrow Streaming Format.
To dump an array to file, you can use the :meth:`~pyarrow.ipc.new_file`
which will provide a new :class:`~pyarrow.ipc.RecordBatchFileWriter` instance
that can be used to write batches of data to that file.
For example to write an array of 10M integers, we could write it in 1000 chunks
of 10000 entries:
.. ipython:: python
BATCH_SIZE = 10000
NUM_BATCHES = 1000
schema = pa.schema([pa.field('nums', pa.int32())])
with pa.OSFile('bigfile.arrow', 'wb') as sink:
with pa.ipc.new_file(sink, schema) as writer:
for row in range(NUM_BATCHES):
batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
writer.write(batch)
record batches support multiple columns, so in practice we always write the
equivalent of a :class:`~pyarrow.Table`.
Writing in batches is effective because we in theory need to keep in memory only
the current batch we are writing. But when reading back, we can be even more effective
by directly mapping the data from disk and avoid allocating any new memory on read.
Under normal conditions, reading back our file will consume a few hundred megabytes
of memory:
.. ipython:: python
with pa.OSFile('bigfile.arrow', 'rb') as source:
loaded_array = pa.ipc.open_file(source).read_all()
print("LEN:", len(loaded_array))
print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
To more efficiently read big data from disk, we can memory map the file, so that
Arrow can directly reference the data mapped from disk and avoid having to
allocate its own memory.
In such case the operating system will be able to page in the mapped memory
lazily and page it out without any write back cost when under pressure,
allowing to more easily read arrays bigger than the total memory.
.. ipython:: python
with pa.memory_map('bigfile.arrow', 'rb') as source:
loaded_array = pa.ipc.open_file(source).read_all()
print("LEN:", len(loaded_array))
print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
.. note::
Other high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a
``memory_map`` option. But in those cases, the memory mapping can't help with
reducing resident memory consumption. See :ref:`parquet_mmap` for details.