blob: e373bd0a69dc50f172e42ca1ee7c5593607a1af2 [file] [log] [blame]
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
.. currentmodule:: pyarrow
.. _plasma:
The Plasma In-Memory Object Store
=================================
.. note::
As present, Plasma is only supported for use on Linux and macOS.
The Plasma API
--------------
Starting the Plasma store
^^^^^^^^^^^^^^^^^^^^^^^^^
You can start the Plasma store by issuing a terminal command similar to the
following:
.. code-block:: bash
plasma_store -m 1000000000 -s /tmp/plasma
The ``-m`` flag specifies the size of the store in bytes, and the ``-s`` flag
specifies the socket that the store will listen at. Thus, the above command
allows the Plasma store to use up to 1GB of memory, and sets the socket to
``/tmp/plasma``.
Leaving the current terminal window open as long as Plasma store should keep
running. Messages, concerning such as disconnecting clients, may occasionally be
printed to the screen. To stop running the Plasma store, you can press
``Ctrl-C`` in the terminal.
Creating a Plasma client
^^^^^^^^^^^^^^^^^^^^^^^^
To start a Plasma client from Python, call ``plasma.connect`` using the same
socket name:
.. code-block:: python
import pyarrow.plasma as plasma
client = plasma.connect("/tmp/plasma")
If the following error occurs from running the above Python code, that
means that either the socket given is incorrect, or the ``./plasma_store`` is
not currently running. Check to see if the Plasma store is still running.
.. code-block:: shell
>>> client = plasma.connect("/tmp/plasma")
Connection to socket failed for pathname /tmp/plasma
Could not connect to socket /tmp/plasma
Object IDs
^^^^^^^^^^
Each object in the Plasma store should be associated with a unique ID. The
Object ID then serves as a key that any client can use to retrieve that object
from the Plasma store. You can form an ``ObjectID`` object from a byte string of
length 20.
.. code-block:: shell
# Create an ObjectID.
>>> id = plasma.ObjectID(20 * b"a")
# The character "a" is encoded as 61 in hex.
>>> id
ObjectID(6161616161616161616161616161616161616161)
The random generation of Object IDs is often good enough to ensure unique IDs.
You can easily create a helper function that randomly generates object IDs as
follows:
.. code-block:: python
import numpy as np
def random_object_id():
return plasma.ObjectID(np.random.bytes(20))
Putting and Getting Python Objects
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Plasma supports two APIs for creating and accessing objects: A high level
API that allows storing and retrieving Python objects and a low level
API that allows creating, writing and sealing buffers and operating on
the binary data directly. In this section we describe the high level API.
This is how you can put and get a Python object:
.. code-block:: python
# Create a python object.
object_id = client.put("hello, world")
# Get the object.
client.get(object_id)
This works with all Python objects supported by the Arrow Python object
serialization.
You can also get multiple objects at the same time (which can be more
efficient since it avoids IPC round trips):
.. code-block:: python
# Create multiple python objects.
object_id1 = client.put(1)
object_id2 = client.put(2)
object_id3 = client.put(3)
# Get the objects.
client.get([object_id1, object_id2, object_id3])
Furthermore, it is possible to provide a timeout for the get call. If the
object is not available within the timeout, the special object
`pyarrow.ObjectNotAvailable` will be returned.
Creating an Object Buffer
^^^^^^^^^^^^^^^^^^^^^^^^^
Objects are created in Plasma in two stages. First, they are **created**, which
allocates a buffer for the object. At this point, the client can write to the
buffer and construct the object within the allocated buffer.
To create an object for Plasma, you need to create an object ID, as well as
give the object's maximum size in bytes.
.. code-block:: python
# Create an object buffer.
object_id = plasma.ObjectID(20 * b"a")
object_size = 1000
buffer = memoryview(client.create(object_id, object_size))
# Write to the buffer.
for i in range(1000):
buffer[i] = i % 128
When the client is done, the client **seals** the buffer, making the object
immutable, and making it available to other Plasma clients.
.. code-block:: python
# Seal the object. This makes the object immutable and available to other clients.
client.seal(object_id)
Getting an Object Buffer
^^^^^^^^^^^^^^^^^^^^^^^^
After an object has been sealed, any client who knows the object ID can get
the object buffer.
.. code-block:: python
# Create a different client. Note that this second client could be
# created in the same or in a separate, concurrent Python session.
client2 = plasma.connect("/tmp/plasma")
# Get the object in the second client. This blocks until the object has been sealed.
object_id2 = plasma.ObjectID(20 * b"a")
[buffer2] = client2.get_buffers([object_id])
If the object has not been sealed yet, then the call to client.get_buffers will
block until the object has been sealed by the client constructing the object.
Using the ``timeout_ms`` argument to get, you can specify a timeout for this (in
milliseconds). After the timeout, the interpreter will yield control back.
.. code-block:: shell
>>> buffer
<memory at 0x7fdbdc96e708>
>>> buffer[1]
1
>>> buffer2
<plasma.plasma.PlasmaBuffer object at 0x7fdbf2770e88>
>>> view2 = memoryview(buffer2)
>>> view2[1]
1
>>> view2[129]
1
>>> bytes(buffer[1:4])
b'\x01\x02\x03'
>>> bytes(view2[1:4])
b'\x01\x02\x03'
Listing objects in the store
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The objects in the store can be listed in the following way (note that
this functionality is currently experimental and the concrete representation
of the object info might change in the future):
.. code-block:: python
import pyarrow.plasma as plasma
import time
client = plasma.connect("/tmp/plasma")
client.put("hello, world")
# Sleep a little so we get different creation times
time.sleep(2)
client.put("another object")
# Create an object that is not sealed yet
object_id = plasma.ObjectID.from_random()
client.create(object_id, 100)
print(client.list())
>>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
>>> 'create_time': 1535223642,
>>> 'data_size': 460,
>>> 'metadata_size': 0,
>>> 'ref_count': 0,
>>> 'state': 'sealed'},
>>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
>>> 'create_time': 1535223644,
>>> 'data_size': 460,
>>> 'metadata_size': 0,
>>> 'ref_count': 0,
>>> 'state': 'sealed'},
>>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
>>> 'create_time': 1535223644,
>>> 'data_size': 100,
>>> 'metadata_size': 0,
>>> 'ref_count': 1,
>>> 'state': 'created'}}
Using Arrow and Pandas with Plasma
----------------------------------
Storing Arrow Objects in Plasma
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To store an Arrow object in Plasma, we must first **create** the object and then
**seal** it. However, Arrow objects such as ``Tensors`` may be more complicated
to write than simple binary data.
To create the object in Plasma, you still need an ``ObjectID`` and a size to
pass in. To find out the size of your Arrow object, you can use pyarrow
API such as ``pyarrow.ipc.get_tensor_size``.
.. code-block:: python
import numpy as np
import pyarrow as pa
# Create a pyarrow.Tensor object from a numpy random 2-dimensional array
data = np.random.randn(10, 4)
tensor = pa.Tensor.from_numpy(data)
# Create the object in Plasma
object_id = plasma.ObjectID(np.random.bytes(20))
data_size = pa.ipc.get_tensor_size(tensor)
buf = client.create(object_id, data_size)
To write the Arrow ``Tensor`` object into the buffer, you can use Plasma to
convert the ``memoryview`` buffer into a ``pyarrow.FixedSizeBufferWriter``
object. A ``pyarrow.FixedSizeBufferWriter`` is a format suitable for Arrow's
``pyarrow.ipc.write_tensor``:
.. code-block:: python
# Write the tensor into the Plasma-allocated buffer
stream = pa.FixedSizeBufferWriter(buf)
pa.ipc.write_tensor(tensor, stream) # Writes tensor's 552 bytes to Plasma stream
To finish storing the Arrow object in Plasma, call ``seal``:
.. code-block:: python
# Seal the Plasma object
client.seal(object_id)
Getting Arrow Objects from Plasma
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To read the object, first retrieve it as a ``PlasmaBuffer`` using its object ID.
.. code-block:: python
# Get the arrow object by ObjectID.
[buf2] = client.get_buffers([object_id])
To convert the ``PlasmaBuffer`` back into an Arrow ``Tensor``, first create a
pyarrow ``BufferReader`` object from it. You can then pass the ``BufferReader``
into ``pyarrow.ipc.read_tensor`` to reconstruct the Arrow ``Tensor`` object:
.. code-block:: python
# Reconstruct the Arrow tensor object.
reader = pa.BufferReader(buf2)
tensor2 = pa.ipc.read_tensor(reader)
Finally, you can use ``pyarrow.ipc.read_tensor`` to convert the Arrow object
back into numpy data:
.. code-block:: python
# Convert back to numpy
array = tensor2.to_numpy()
Storing Pandas DataFrames in Plasma
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Storing a Pandas ``DataFrame`` still follows the **create** then **seal**
process of storing an object in the Plasma store, however one cannot directly
write the ``DataFrame`` to Plasma with Pandas alone. Plasma also needs to know
the size of the ``DataFrame`` to allocate a buffer for.
See :ref:`pandas_interop` for more information on using Arrow with Pandas.
You can create the pyarrow equivalent of a Pandas ``DataFrame`` by using
``pyarrow.from_pandas`` to convert it to a ``RecordBatch``.
.. code-block:: python
import pyarrow as pa
import pandas as pd
# Create a Pandas DataFrame
d = {'one' : pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
'two' : pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
df = pd.DataFrame(d)
# Convert the Pandas DataFrame into a PyArrow RecordBatch
record_batch = pa.RecordBatch.from_pandas(df)
Creating the Plasma object requires an ``ObjectID`` and the size of the
data. Now that we have converted the Pandas ``DataFrame`` into a PyArrow
``RecordBatch``, use the ``MockOutputStream`` to determine the
size of the Plasma object.
.. code-block:: python
# Create the Plasma object from the PyArrow RecordBatch. Most of the work here
# is done to determine the size of buffer to request from the object store.
object_id = plasma.ObjectID(np.random.bytes(20))
mock_sink = pa.MockOutputStream()
stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
stream_writer.write_batch(record_batch)
stream_writer.close()
data_size = mock_sink.size()
buf = client.create(object_id, data_size)
The DataFrame can now be written to the buffer as follows.
.. code-block:: python
# Write the PyArrow RecordBatch to Plasma
stream = pa.FixedSizeBufferWriter(buf)
stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
stream_writer.write_batch(record_batch)
stream_writer.close()
Finally, seal the finished object for use by all clients:
.. code-block:: python
# Seal the Plasma object
client.seal(object_id)
Getting Pandas DataFrames from Plasma
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Since we store the Pandas DataFrame as a PyArrow ``RecordBatch`` object,
to get the object back from the Plasma store, we follow similar steps
to those specified in `Getting Arrow Objects from Plasma`_.
We first have to convert the ``PlasmaBuffer`` returned from
``client.get_buffers`` into an Arrow ``BufferReader`` object.
.. code-block:: python
# Fetch the Plasma object
[data] = client.get_buffers([object_id]) # Get PlasmaBuffer from ObjectID
buffer = pa.BufferReader(data)
From the ``BufferReader``, we can create a specific ``RecordBatchStreamReader``
in Arrow to reconstruct the stored PyArrow ``RecordBatch`` object.
.. code-block:: python
# Convert object back into an Arrow RecordBatch
reader = pa.RecordBatchStreamReader(buffer)
record_batch = reader.read_next_batch()
The last step is to convert the PyArrow ``RecordBatch`` object back into
the original Pandas ``DataFrame`` structure.
.. code-block:: python
# Convert back into Pandas
result = record_batch.to_pandas()
Using Plasma with Huge Pages
----------------------------
On Linux it is possible to use the Plasma store with huge pages for increased
throughput. You first need to create a file system and activate huge pages with
.. code-block:: shell
sudo mkdir -p /mnt/hugepages
gid=`id -g`
uid=`id -u`
sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group"
sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"
Note that you only need root access to create the file system, not for
running the object store. You can then start the Plasma store with the ``-d``
flag for the mount point of the huge page file system and the ``-h`` flag
which indicates that huge pages are activated:
.. code-block:: shell
plasma_store -s /tmp/plasma -m 10000000000 -d /mnt/hugepages -h
You can test this with the following script:
.. code-block:: python
import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
import time
client = plasma.connect("/tmp/plasma")
data = np.random.randn(100000000)
tensor = pa.Tensor.from_numpy(data)
object_id = plasma.ObjectID(np.random.bytes(20))
buf = client.create(object_id, pa.ipc.get_tensor_size(tensor))
stream = pa.FixedSizeBufferWriter(buf)
stream.set_memcopy_threads(4)
a = time.time()
pa.ipc.write_tensor(tensor, stream)
print("Writing took ", time.time() - a)