blob: e352377f14e76fdf175e7c413635fdc1ca63e7e2 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
# cython: language_level = 3
from __future__ import absolute_import
from libcpp cimport bool as c_bool, nullptr
from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_map cimport unordered_map
from libc.stdint cimport int64_t, uint8_t, uintptr_t
from cython.operator cimport dereference as deref, preincrement as inc
from cpython.pycapsule cimport *
import random
import socket
import warnings
import pyarrow
from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
CFixedSizeBufferWriter, CStatus)
from pyarrow import compat
# Module-level constant equal to 2**30. It is not referenced by the code in
# this module itself.
# NOTE(review): presumably a "wait effectively forever" timeout (in ms) for
# callers of this module -- confirm against its users.
PLASMA_WAIT_TIMEOUT = 2 ** 30
cdef extern from "plasma/common.h" nogil:
    # Opaque C++ type; it is only referenced through the shared_ptr stored in
    # CObjectTableEntry.ipc_handle below.
    cdef cppclass CCudaIpcPlaceholder" plasma::internal::CudaIpcPlaceholder":
        pass

    # Binary object identifier (plasma::UniqueID). Its byte length is given by
    # size(); the Python-level ObjectID class wraps one of these.
    cdef cppclass CUniqueID" plasma::UniqueID":

        @staticmethod
        CUniqueID from_binary(const c_string& binary)

        @staticmethod
        CUniqueID from_random()

        c_bool operator==(const CUniqueID& rhs) const

        c_string hex() const

        c_string binary() const

        @staticmethod
        int64_t size()

    # State of an entry in the store's object table; PlasmaClient.list()
    # maps these to the strings "created" and "sealed".
    cdef enum CObjectState" plasma::ObjectState":
        PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED"
        PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED"

    # Per-object bookkeeping record. PlasmaClient.list() reads data_size,
    # metadata_size, ref_count, create_time, construct_duration and state
    # from it; the remaining fields are declared for layout/completeness.
    cdef struct CObjectTableEntry" plasma::ObjectTableEntry":
        int fd
        int device_num
        int64_t map_size
        ptrdiff_t offset
        uint8_t* pointer
        int64_t data_size
        int64_t metadata_size
        int ref_count
        int64_t create_time
        int64_t construct_duration
        CObjectState state
        shared_ptr[CCudaIpcPlaceholder] ipc_handle

    # Map from object ID to its (uniquely owned) table entry; filled in by
    # CPlasmaClient.List().
    ctypedef unordered_map[CUniqueID, unique_ptr[CObjectTableEntry]] \
        CObjectTable" plasma::ObjectTable"
cdef extern from "plasma/common.h":
    # Length in bytes of the digest written by CPlasmaClient.Hash(); used to
    # size the output buffer in PlasmaClient.hash().
    cdef int64_t kDigestSize" plasma::kDigestSize"
cdef extern from "plasma/client.h" nogil:

    # C++ client for the plasma store. Each method returns a CStatus that the
    # Python wrappers pass to check_status(); results come back through the
    # pointer/reference out-parameters.
    cdef cppclass CPlasmaClient" plasma::PlasmaClient":

        CPlasmaClient()

        CStatus Connect(const c_string& store_socket_name,
                        const c_string& manager_socket_name,
                        int release_delay, int num_retries)

        # On success *data receives the newly allocated, still-mutable buffer.
        CStatus Create(const CUniqueID& object_id, int64_t data_size,
                       const uint8_t* metadata, int64_t metadata_size,
                       const shared_ptr[CBuffer]* data)

        CStatus CreateAndSeal(const CUniqueID& object_id, const c_string& data,
                              const c_string& metadata)

        # Fills *object_buffers with one entry per requested ID.
        CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
                    c_vector[CObjectBuffer]* object_buffers)

        CStatus Seal(const CUniqueID& object_id)

        # num_bytes_evicted is an out-parameter passed by reference.
        CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted)

        # Writes kDigestSize bytes into digest.
        CStatus Hash(const CUniqueID& object_id, uint8_t* digest)

        CStatus Release(const CUniqueID& object_id)

        CStatus Contains(const CUniqueID& object_id, c_bool* has_object)

        CStatus List(CObjectTable* objects)

        # Stores the notification file descriptor in *fd.
        CStatus Subscribe(int* fd)

        CStatus DecodeNotification(const uint8_t* buffer,
                                   CUniqueID* object_id, int64_t* data_size,
                                   int64_t* metadata_size)

        CStatus GetNotification(int fd, CUniqueID* object_id,
                                int64_t* data_size, int64_t* metadata_size)

        CStatus Disconnect()

        CStatus Delete(const c_vector[CUniqueID] object_ids)

        int64_t store_capacity()
cdef extern from "plasma/client.h" nogil:
    # Result element of CPlasmaClient.Get(). Either pointer may be null;
    # PlasmaClient.get_buffers()/get_metadata() map a null pointer to None.
    cdef struct CObjectBuffer" plasma::ObjectBuffer":
        shared_ptr[CBuffer] data
        shared_ptr[CBuffer] metadata
def make_object_id(object_id):
    """
    Rebuild an ObjectID from its binary representation.

    This is a module-level function so that ``ObjectID.__reduce__`` can name
    it as the reconstruction callable when an ObjectID is pickled.

    Parameters
    ----------
    object_id : bytes
        The raw ID bytes, as returned by ``ObjectID.binary()``.

    Returns
    -------
    ObjectID
    """
    rebuilt = ObjectID(object_id)
    return rebuilt
cdef class ObjectID:
    """
    An ObjectID represents a string of bytes used to identify Plasma objects.

    The wrapped byte string must be exactly ``CUniqueID.size()`` bytes long.
    """

    cdef:
        CUniqueID data

    def __cinit__(self, object_id):
        # object_id must be a bytes object of exactly CUniqueID.size() bytes;
        # anything else raises ValueError. The message derives the expected
        # length from CUniqueID.size() instead of hard-coding it.
        cdef int64_t expected_size = CUniqueID.size()
        if (not isinstance(object_id, bytes) or
                len(object_id) != expected_size):
            raise ValueError("Object ID must be {} bytes,"
                             " is {}".format(expected_size, object_id))
        self.data = CUniqueID.from_binary(object_id)

    def __eq__(self, other):
        # The checked cast <ObjectID?> raises TypeError for non-ObjectID
        # operands, which is reported as "not equal".
        try:
            return self.data == (<ObjectID?>other).data
        except TypeError:
            return False

    def __hash__(self):
        return hash(self.data.binary())

    def __repr__(self):
        return "ObjectID(" + self.data.hex().decode() + ")"

    def __reduce__(self):
        # Pickle as a call to the module-level make_object_id(binary).
        return (make_object_id, (self.data.binary(),))

    def binary(self):
        """
        Return the binary representation of this ObjectID.

        Returns
        -------
        bytes
            Binary representation of the ObjectID.
        """
        return self.data.binary()

    @staticmethod
    def from_random():
        """
        Returns a randomly generated ObjectID.

        The bytes come from the ``random`` module, so the ID is not
        cryptographically secure.

        Returns
        -------
        ObjectID
            A randomly generated ObjectID.
        """
        random_id = bytes(bytearray(
            random.getrandbits(8) for _ in range(CUniqueID.size())))
        return ObjectID(random_id)
cdef class ObjectNotAvailable:
    """
    Marker for an object that could not be retrieved before the timeout.

    ``PlasmaClient.get`` puts this class itself (not an instance of it) into
    its result list in place of any value it could not fetch.
    """
cdef class PlasmaBuffer(Buffer):
    """
    This is the type returned by calls to get with a PlasmaClient.

    We define our own class instead of directly returning a buffer object so
    that we can add a custom destructor which notifies Plasma that the object
    is no longer being used, so the memory in the Plasma store backing the
    object can potentially be freed.

    Attributes
    ----------
    object_id : ObjectID
        The ID of the object in the buffer.
    client : PlasmaClient
        The PlasmaClient that we use to communicate with the store and manager.
    """

    cdef:
        ObjectID object_id
        PlasmaClient client

    @staticmethod
    cdef PlasmaBuffer create(ObjectID object_id, PlasmaClient client,
                             const shared_ptr[CBuffer]& buffer):
        # __new__ bypasses __init__, which deliberately refuses direct
        # construction from Python.
        cdef PlasmaBuffer self = PlasmaBuffer.__new__(PlasmaBuffer)
        self.object_id = object_id
        self.client = client
        self.init(buffer)
        return self

    def __init__(self):
        raise TypeError("Do not call PlasmaBuffer's constructor directly, use "
                        "`PlasmaClient.create` instead.")

    def __dealloc__(self):
        """
        Notify Plasma that the object is no longer needed.

        Does nothing if the buffer was never bound to a client and object ID.
        That happens when it was allocated with ``PlasmaBuffer.__new__``
        without going through ``create``, which leaves both fields None.
        """
        if self.client is None or self.object_id is None:
            return
        self.client._release(self.object_id)
cdef class PlasmaClient:
    """
    The PlasmaClient is used to interface with a plasma store and manager.

    The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
    buffer, and get a buffer. Buffers are referred to by object IDs, which are
    ObjectID instances wrapping fixed-size byte strings.
    """

    cdef:
        # The underlying C++ client; every method below forwards to it.
        shared_ptr[CPlasmaClient] client
        # File descriptor filled in by subscribe(); -1 until then.
        int notification_fd
        # Socket name recorded by connect(); exposed through the
        # store_socket_name property.
        c_string store_socket_name

    def __cinit__(self):
        self.client.reset(new CPlasmaClient())
        self.notification_fd = -1
        self.store_socket_name = b""

    cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
                             c_vector[CObjectBuffer]* result):
        # Copy the IDs into a C++ vector first so that the (possibly blocking)
        # Get call can run with the GIL released. Elements of object_ids must
        # be ObjectID instances (enforced by the typed loop variable).
        cdef:
            c_vector[CUniqueID] ids
            ObjectID object_id
        for object_id in object_ids:
            ids.push_back(object_id.data)
        with nogil:
            check_status(self.client.get().Get(ids, timeout_ms, result))

    # XXX C++ API should instead expose some kind of CreateAuto()
    cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data,
                                     int64_t size):
        # Wrap raw store memory in a PlasmaBuffer so that releasing the Python
        # object also releases the object in the store.
        cdef shared_ptr[CBuffer] buffer
        buffer.reset(new CMutableBuffer(data, size))
        return PlasmaBuffer.create(object_id, self, buffer)

    @property
    def store_socket_name(self):
        """The store socket name given to connect(), as a str."""
        return self.store_socket_name.decode()

    def create(self, ObjectID object_id, int64_t data_size,
               c_string metadata=b""):
        """
        Create a new buffer in the PlasmaStore for a particular object ID.

        The returned buffer is mutable until seal is called.

        Parameters
        ----------
        object_id : ObjectID
            The object ID used to identify an object.
        data_size : int
            The size in bytes of the created buffer.
        metadata : bytes
            An optional string of bytes encoding whatever metadata the user
            wishes to encode.

        Returns
        -------
        PlasmaBuffer
            A mutable buffer of ``data_size`` bytes wrapping the memory
            returned by the store's Create call.

        Raises
        ------
        PlasmaObjectExists
            This exception is raised if the object could not be created because
            there already is an object with the same ID in the plasma store.
        PlasmaStoreFull
            This exception is raised if the object could not be created
            because the plasma store is unable to evict enough objects to
            create room for it.
        """
        cdef shared_ptr[CBuffer] data
        with nogil:
            check_status(self.client.get().Create(object_id.data, data_size,
                                                  <uint8_t*>(metadata.data()),
                                                  metadata.size(), &data))
        return self._make_mutable_plasma_buffer(object_id,
                                                data.get().mutable_data(),
                                                data_size)

    def create_and_seal(self, ObjectID object_id, c_string data,
                        c_string metadata=b""):
        """
        Store a new object in the PlasmaStore for a particular object ID.

        Parameters
        ----------
        object_id : ObjectID
            The object ID used to identify an object.
        data : bytes
            The object to store.
        metadata : bytes
            An optional string of bytes encoding whatever metadata the user
            wishes to encode.

        Raises
        ------
        PlasmaObjectExists
            This exception is raised if the object could not be created because
            there already is an object with the same ID in the plasma store.
        PlasmaStoreFull
            This exception is raised if the object could not be created
            because the plasma store is unable to evict enough objects to
            create room for it.
        """
        with nogil:
            check_status(self.client.get().CreateAndSeal(object_id.data, data,
                                                         metadata))

    def get_buffers(self, object_ids, timeout_ms=-1, with_meta=False):
        """
        Returns data buffer from the PlasmaStore based on object ID.

        If the object has not been sealed yet, this call will block. The
        retrieved buffer is immutable.

        Parameters
        ----------
        object_ids : list
            A list of ObjectIDs used to identify some objects.
        timeout_ms : int
            The number of milliseconds that the get call should block before
            timing out and returning. Pass -1 if the call should block and 0
            if the call should return immediately.
        with_meta : bool
            If True, also return each object's metadata.

        Returns
        -------
        list
            If with_meta=False, this is a list of pyarrow Buffers for the data
            associated with the object_ids, with None for any object that was
            not available. If with_meta=True, this is a list of
            ``(metadata, data)`` tuples, where metadata is a bytes object (or
            None) and data is a pyarrow Buffer (or None).
        """
        cdef c_vector[CObjectBuffer] object_buffers
        self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
        result = []
        for i in range(object_buffers.size()):
            # A null data pointer means the object was not retrieved.
            if object_buffers[i].data.get() != nullptr:
                data = pyarrow_wrap_buffer(object_buffers[i].data)
            else:
                data = None
            if not with_meta:
                result.append(data)
            else:
                if object_buffers[i].metadata.get() != nullptr:
                    # Slicing the uint8_t* copies the metadata into bytes.
                    size = object_buffers[i].metadata.get().size()
                    metadata = object_buffers[i].metadata.get().data()[:size]
                else:
                    metadata = None
                result.append((metadata, data))
        return result

    def get_metadata(self, object_ids, timeout_ms=-1):
        """
        Returns metadata buffer from the PlasmaStore based on object ID.

        If the object has not been sealed yet, this call will block. The
        retrieved buffer is immutable.

        Parameters
        ----------
        object_ids : list
            A list of ObjectIDs used to identify some objects.
        timeout_ms : int
            The number of milliseconds that the get call should block before
            timing out and returning. Pass -1 if the call should block and 0
            if the call should return immediately.

        Returns
        -------
        list
            List of pyarrow Buffers for the metadata associated with the
            object_ids, with None for any object whose metadata was not
            available.
        """
        cdef c_vector[CObjectBuffer] object_buffers
        self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
        result = []
        for i in range(object_buffers.size()):
            if object_buffers[i].metadata.get() != nullptr:
                result.append(pyarrow_wrap_buffer(object_buffers[i].metadata))
            else:
                result.append(None)
        return result

    def put_raw_buffer(self, object value, ObjectID object_id=None,
                       c_string metadata=b"", int memcopy_threads=6):
        """
        Store Python buffer into the object store.

        Parameters
        ----------
        value : Python object that implements the buffer protocol
            A Python buffer object to store.
        object_id : ObjectID, default None
            If this is provided, the specified object ID will be used to refer
            to the object. Otherwise a random ID is generated.
        metadata : bytes
            An optional string of bytes encoding whatever metadata the user
            wishes to encode.
        memcopy_threads : int, default 6
            The number of threads to use to write the serialized object into
            the object store for large objects.

        Returns
        -------
        ObjectID
            The object ID associated to the Python buffer object.
        """
        cdef ObjectID target_id = (object_id if object_id is not None
                                   else ObjectID.from_random())
        cdef Buffer arrow_buffer = pyarrow.py_buffer(value)
        # Size the store allocation by the buffer's byte length. len(value)
        # is not the byte count for multi-byte-item or multi-dimensional
        # buffers (e.g. array.array('d') or 2-D arrays).
        write_buffer = self.create(target_id, arrow_buffer.size, metadata)
        stream = pyarrow.FixedSizeBufferWriter(write_buffer)
        stream.set_memcopy_threads(memcopy_threads)
        stream.write(arrow_buffer)
        self.seal(target_id)
        return target_id

    def put(self, object value, ObjectID object_id=None, int memcopy_threads=6,
            serialization_context=None):
        """
        Store a Python value into the object store.

        Parameters
        ----------
        value : object
            A Python object to store.
        object_id : ObjectID, default None
            If this is provided, the specified object ID will be used to refer
            to the object. Otherwise a random ID is generated.
        memcopy_threads : int, default 6
            The number of threads to use to write the serialized object into
            the object store for large objects.
        serialization_context : pyarrow.SerializationContext, default None
            Custom serialization and deserialization context.

        Returns
        -------
        ObjectID
            The object ID associated to the Python object.
        """
        cdef ObjectID target_id = (object_id if object_id is not None
                                   else ObjectID.from_random())
        serialized = pyarrow.serialize(value, serialization_context)
        write_buffer = self.create(target_id, serialized.total_bytes)
        stream = pyarrow.FixedSizeBufferWriter(write_buffer)
        stream.set_memcopy_threads(memcopy_threads)
        serialized.write_to(stream)
        self.seal(target_id)
        return target_id

    def get(self, object_ids, int timeout_ms=-1, serialization_context=None):
        """
        Get one or more Python values from the object store.

        Parameters
        ----------
        object_ids : list or ObjectID
            Object ID or list of object IDs associated to the values we get
            from the store.
        timeout_ms : int, default -1
            The number of milliseconds that the get call should block before
            timing out and returning. Pass -1 if the call should block and 0
            if the call should return immediately.
        serialization_context : pyarrow.SerializationContext, default None
            Custom serialization and deserialization context.

        Returns
        -------
        list or object
            Python value or list of Python values for the data associated with
            the object_ids, and ObjectNotAvailable if the object was not
            available.
        """
        if not isinstance(object_ids, compat.Sequence):
            # A single ID: fetch it as a one-element batch and unwrap.
            return self.get([object_ids], timeout_ms,
                            serialization_context)[0]
        results = []
        for data in self.get_buffers(object_ids, timeout_ms):
            # get_buffers returns None for objects that were not available
            # within the timeout. Compare against None explicitly: a
            # zero-length Buffer is falsy but was still retrieved.
            if data is None:
                results.append(ObjectNotAvailable)
            else:
                results.append(pyarrow.deserialize(data,
                                                   serialization_context))
        return results

    def seal(self, ObjectID object_id):
        """
        Seal the buffer in the PlasmaStore for a particular object ID.

        Once a buffer has been sealed, the buffer is immutable and can only be
        accessed through get.

        Parameters
        ----------
        object_id : ObjectID
            The ID of the object to seal.
        """
        with nogil:
            check_status(self.client.get().Seal(object_id.data))

    def _release(self, ObjectID object_id):
        """
        Notify Plasma that the object is no longer needed.

        Parameters
        ----------
        object_id : ObjectID
            The ID of the object to release.
        """
        with nogil:
            check_status(self.client.get().Release(object_id.data))

    def contains(self, ObjectID object_id):
        """
        Check if the object is present and sealed in the PlasmaStore.

        Parameters
        ----------
        object_id : ObjectID
            The ID of the object to look up.

        Returns
        -------
        bool
            The flag reported by the store's Contains call.
        """
        cdef c_bool is_contained
        with nogil:
            check_status(self.client.get().Contains(object_id.data,
                                                    &is_contained))
        return is_contained

    def hash(self, ObjectID object_id):
        """
        Compute the checksum of an object in the object store.

        Parameters
        ----------
        object_id : ObjectID
            The ID of the object to hash.

        Returns
        -------
        bytes
            The object's digest, always ``kDigestSize`` bytes long. Any error
            status reported by the store is raised as an exception rather
            than returned.
        """
        cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize)
        with nogil:
            check_status(self.client.get().Hash(object_id.data,
                                                digest.data()))
        return bytes(digest[:])

    def evict(self, int64_t num_bytes):
        """
        Evict objects from the store to free up memory.

        Tries to recover at least ``num_bytes`` bytes, if possible.

        Parameters
        ----------
        num_bytes : int
            The number of bytes to attempt to recover.

        Returns
        -------
        int
            The number of bytes the store reports as evicted.
        """
        cdef int64_t num_bytes_evicted = -1
        with nogil:
            check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
        return num_bytes_evicted

    def subscribe(self):
        """Subscribe to notifications about sealed objects."""
        with nogil:
            check_status(self.client.get().Subscribe(&self.notification_fd))

    def get_notification_socket(self):
        """
        Get the notification socket.

        Wraps the descriptor obtained by subscribe() as a Unix stream socket;
        call subscribe() first.
        """
        return compat.get_socket_from_fd(self.notification_fd,
                                         family=socket.AF_UNIX,
                                         type=socket.SOCK_STREAM)

    def decode_notification(self, const uint8_t* buf):
        """
        Get the notification from the buffer.

        ``buf`` is passed to the C++ decoder as a raw pointer; no length is
        checked here.

        Returns
        -------
        ObjectID
            The object ID of the object that was stored.
        int
            The data size of the object that was stored.
        int
            The metadata size of the object that was stored.
        """
        cdef CUniqueID object_id
        cdef int64_t data_size
        cdef int64_t metadata_size
        with nogil:
            check_status(self.client.get()
                         .DecodeNotification(buf,
                                             &object_id,
                                             &data_size,
                                             &metadata_size))
        return ObjectID(object_id.binary()), data_size, metadata_size

    def get_next_notification(self):
        """
        Get the next notification from the notification socket.

        Returns
        -------
        ObjectID
            The object ID of the object that was stored.
        int
            The data size of the object that was stored.
        int
            The metadata size of the object that was stored.
        """
        # Start from an all-zero ID; GetNotification overwrites it in place.
        cdef ObjectID object_id = ObjectID(CUniqueID.size() * b"\0")
        cdef int64_t data_size
        cdef int64_t metadata_size
        with nogil:
            check_status(self.client.get()
                         .GetNotification(self.notification_fd,
                                          &object_id.data,
                                          &data_size,
                                          &metadata_size))
        return object_id, data_size, metadata_size

    def to_capsule(self):
        """
        Return a PyCapsule named "plasma" holding the raw C++ client pointer.

        The capsule has no destructor and does not own the client; it is only
        valid while this PlasmaClient is alive.
        """
        return PyCapsule_New(<void *>self.client.get(), "plasma", NULL)

    def disconnect(self):
        """
        Disconnect this client from the Plasma store.
        """
        with nogil:
            check_status(self.client.get().Disconnect())

    def delete(self, object_ids):
        """
        Delete the objects with the given IDs from the object store.

        Parameters
        ----------
        object_ids : list
            A list of ObjectIDs identifying the objects to delete.
        """
        cdef c_vector[CUniqueID] ids
        cdef ObjectID object_id
        for object_id in object_ids:
            ids.push_back(object_id.data)
        with nogil:
            check_status(self.client.get().Delete(ids))

    def list(self):
        """
        Experimental: List the objects in the store.

        Returns
        -------
        dict
            Dictionary from ObjectIDs to an "info" dictionary describing the
            object. The "info" dictionary has the following entries:

            data_size
              size of the object in bytes

            metadata_size
              size of the object metadata in bytes

            ref_count
              Number of clients referencing the object buffer

            create_time
              Unix timestamp of the creation of the object

            construct_duration
              Time the creation of the object took in seconds

            state
              "created" if the object is still being created and
              "sealed" if it is already sealed
        """
        cdef CObjectTable objects
        with nogil:
            check_status(self.client.get().List(&objects))
        result = dict()
        cdef ObjectID object_id
        cdef CObjectTableEntry entry
        # Walk the C++ map with an explicit iterator: its values are
        # unique_ptrs, which cannot be copied out by a Python-style for loop.
        it = objects.begin()
        while it != objects.end():
            object_id = ObjectID(deref(it).first.binary())
            entry = deref(deref(it).second)
            if entry.state == CObjectState.PLASMA_CREATED:
                state = "created"
            else:
                state = "sealed"
            result[object_id] = {
                "data_size": entry.data_size,
                "metadata_size": entry.metadata_size,
                "ref_count": entry.ref_count,
                "create_time": entry.create_time,
                "construct_duration": entry.construct_duration,
                "state": state
            }
            inc(it)
        return result

    def store_capacity(self):
        """
        Get the memory capacity of the store.

        Returns
        -------

        int
            The memory capacity of the store in bytes.
        """
        return self.client.get().store_capacity()
def connect(store_socket_name, manager_socket_name=None, int release_delay=0,
            int num_retries=-1):
    """
    Return a new PlasmaClient that is connected to a plasma store.

    Parameters
    ----------
    store_socket_name : str or bytes
        Name of the socket the plasma store is listening at. A str is
        encoded with ``str.encode()``; bytes are used as-is.
    manager_socket_name : str
        This parameter is deprecated and has no effect.
    release_delay : int
        This parameter is deprecated and has no effect.
    num_retries : int, default -1
        Number of times to try to connect to plasma store. Default value of -1
        uses the default (50)

    Returns
    -------
    PlasmaClient
        The connected client.
    """
    if manager_socket_name is not None:
        warnings.warn(
            "manager_socket_name in PlasmaClient.connect is deprecated",
            FutureWarning, stacklevel=2)
    cdef PlasmaClient result = PlasmaClient()
    # Accept bytes as well as str; on Python 3, bytes has no .encode().
    if isinstance(store_socket_name, bytes):
        result.store_socket_name = store_socket_name
    else:
        result.store_socket_name = store_socket_name.encode()
    if release_delay != 0:
        warnings.warn("release_delay in PlasmaClient.connect is deprecated",
                      FutureWarning, stacklevel=2)
    with nogil:
        # The manager socket is always passed as an empty string.
        check_status(result.client.get()
                     .Connect(result.store_socket_name, b"",
                              release_delay, num_retries))
    return result