| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # cython: profile=False |
| # distutils: language = c++ |
| # cython: embedsignature = True |
| # cython: language_level = 3 |
| |
| from __future__ import absolute_import |
| |
| from libcpp cimport bool as c_bool, nullptr |
| from libcpp.memory cimport shared_ptr, unique_ptr, make_shared |
| from libcpp.string cimport string as c_string |
| from libcpp.vector cimport vector as c_vector |
| from libcpp.unordered_map cimport unordered_map |
| from libc.stdint cimport int64_t, uint8_t, uintptr_t |
| from cython.operator cimport dereference as deref, preincrement as inc |
| from cpython.pycapsule cimport * |
| |
| import random |
| import socket |
| import warnings |
| |
| import pyarrow |
| from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer |
| from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, |
| CFixedSizeBufferWriter, CStatus) |
| |
| from pyarrow import compat |
| |
| PLASMA_WAIT_TIMEOUT = 2 ** 30 |
| |
| |
| cdef extern from "plasma/common.h" nogil: |
| cdef cppclass CCudaIpcPlaceholder" plasma::internal::CudaIpcPlaceholder": |
| pass |
| |
| cdef cppclass CUniqueID" plasma::UniqueID": |
| |
| @staticmethod |
| CUniqueID from_binary(const c_string& binary) |
| |
| @staticmethod |
| CUniqueID from_random() |
| |
| c_bool operator==(const CUniqueID& rhs) const |
| |
| c_string hex() const |
| |
| c_string binary() const |
| |
| @staticmethod |
| int64_t size() |
| |
| cdef enum CObjectState" plasma::ObjectState": |
| PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED" |
| PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED" |
| |
| cdef struct CObjectTableEntry" plasma::ObjectTableEntry": |
| int fd |
| int device_num |
| int64_t map_size |
| ptrdiff_t offset |
| uint8_t* pointer |
| int64_t data_size |
| int64_t metadata_size |
| int ref_count |
| int64_t create_time |
| int64_t construct_duration |
| CObjectState state |
| shared_ptr[CCudaIpcPlaceholder] ipc_handle |
| |
| ctypedef unordered_map[CUniqueID, unique_ptr[CObjectTableEntry]] \ |
| CObjectTable" plasma::ObjectTable" |
| |
| |
| cdef extern from "plasma/common.h": |
| cdef int64_t kDigestSize" plasma::kDigestSize" |
| |
| cdef extern from "plasma/client.h" nogil: |
| |
| cdef cppclass CPlasmaClient" plasma::PlasmaClient": |
| |
| CPlasmaClient() |
| |
| CStatus Connect(const c_string& store_socket_name, |
| const c_string& manager_socket_name, |
| int release_delay, int num_retries) |
| |
| CStatus Create(const CUniqueID& object_id, int64_t data_size, |
| const uint8_t* metadata, int64_t metadata_size, |
| const shared_ptr[CBuffer]* data) |
| |
| CStatus CreateAndSeal(const CUniqueID& object_id, const c_string& data, |
| const c_string& metadata) |
| |
| CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms, |
| c_vector[CObjectBuffer]* object_buffers) |
| |
| CStatus Seal(const CUniqueID& object_id) |
| |
| CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted) |
| |
| CStatus Hash(const CUniqueID& object_id, uint8_t* digest) |
| |
| CStatus Release(const CUniqueID& object_id) |
| |
| CStatus Contains(const CUniqueID& object_id, c_bool* has_object) |
| |
| CStatus List(CObjectTable* objects) |
| |
| CStatus Subscribe(int* fd) |
| |
| CStatus DecodeNotification(const uint8_t* buffer, |
| CUniqueID* object_id, int64_t* data_size, |
| int64_t* metadata_size) |
| |
| CStatus GetNotification(int fd, CUniqueID* object_id, |
| int64_t* data_size, int64_t* metadata_size) |
| |
| CStatus Disconnect() |
| |
| CStatus Delete(const c_vector[CUniqueID] object_ids) |
| |
| int64_t store_capacity() |
| |
| cdef extern from "plasma/client.h" nogil: |
| |
| cdef struct CObjectBuffer" plasma::ObjectBuffer": |
| shared_ptr[CBuffer] data |
| shared_ptr[CBuffer] metadata |
| |
| |
| def make_object_id(object_id): |
| return ObjectID(object_id) |
| |
| |
| cdef class ObjectID: |
| """ |
| An ObjectID represents a string of bytes used to identify Plasma objects. |
| """ |
| |
| cdef: |
| CUniqueID data |
| |
| def __cinit__(self, object_id): |
| if (not isinstance(object_id, bytes) or |
| len(object_id) != CUniqueID.size()): |
| raise ValueError("Object ID must by 20 bytes," |
| " is " + str(object_id)) |
| self.data = CUniqueID.from_binary(object_id) |
| |
| def __eq__(self, other): |
| try: |
| return self.data == (<ObjectID?>other).data |
| except TypeError: |
| return False |
| |
| def __hash__(self): |
| return hash(self.data.binary()) |
| |
| def __repr__(self): |
| return "ObjectID(" + self.data.hex().decode() + ")" |
| |
| def __reduce__(self): |
| return (make_object_id, (self.data.binary(),)) |
| |
| def binary(self): |
| """ |
| Return the binary representation of this ObjectID. |
| |
| Returns |
| ------- |
| bytes |
| Binary representation of the ObjectID. |
| """ |
| return self.data.binary() |
| |
| @staticmethod |
| def from_random(): |
| """ |
| Returns a randomly generated ObjectID. |
| |
| Returns |
| ------- |
| ObjectID |
| A randomly generated ObjectID. |
| """ |
| random_id = bytes(bytearray( |
| random.getrandbits(8) for _ in range(CUniqueID.size()))) |
| return ObjectID(random_id) |
| |
| |
| cdef class ObjectNotAvailable: |
| """ |
| Placeholder for an object that was not available within the given timeout. |
| """ |
| pass |
| |
| |
| cdef class PlasmaBuffer(Buffer): |
| """ |
| This is the type returned by calls to get with a PlasmaClient. |
| |
| We define our own class instead of directly returning a buffer object so |
| that we can add a custom destructor which notifies Plasma that the object |
| is no longer being used, so the memory in the Plasma store backing the |
| object can potentially be freed. |
| |
| Attributes |
| ---------- |
| object_id : ObjectID |
| The ID of the object in the buffer. |
| client : PlasmaClient |
| The PlasmaClient that we use to communicate with the store and manager. |
| """ |
| |
| cdef: |
| ObjectID object_id |
| PlasmaClient client |
| |
| @staticmethod |
| cdef PlasmaBuffer create(ObjectID object_id, PlasmaClient client, |
| const shared_ptr[CBuffer]& buffer): |
| cdef PlasmaBuffer self = PlasmaBuffer.__new__(PlasmaBuffer) |
| self.object_id = object_id |
| self.client = client |
| self.init(buffer) |
| return self |
| |
| def __init__(self): |
| raise TypeError("Do not call PlasmaBuffer's constructor directly, use " |
| "`PlasmaClient.create` instead.") |
| |
| def __dealloc__(self): |
| """ |
| Notify Plasma that the object is no longer needed. |
| |
| If the plasma client has been shut down, then don't do anything. |
| """ |
| self.client._release(self.object_id) |
| |
| |
| cdef class PlasmaClient: |
| """ |
| The PlasmaClient is used to interface with a plasma store and manager. |
| |
| The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a |
| buffer, and get a buffer. Buffers are referred to by object IDs, which are |
| strings. |
| """ |
| |
| cdef: |
| shared_ptr[CPlasmaClient] client |
| int notification_fd |
| c_string store_socket_name |
| |
| def __cinit__(self): |
| self.client.reset(new CPlasmaClient()) |
| self.notification_fd = -1 |
| self.store_socket_name = b"" |
| |
| cdef _get_object_buffers(self, object_ids, int64_t timeout_ms, |
| c_vector[CObjectBuffer]* result): |
| cdef: |
| c_vector[CUniqueID] ids |
| ObjectID object_id |
| |
| for object_id in object_ids: |
| ids.push_back(object_id.data) |
| with nogil: |
| check_status(self.client.get().Get(ids, timeout_ms, result)) |
| |
| # XXX C++ API should instead expose some kind of CreateAuto() |
| cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data, |
| int64_t size): |
| cdef shared_ptr[CBuffer] buffer |
| buffer.reset(new CMutableBuffer(data, size)) |
| return PlasmaBuffer.create(object_id, self, buffer) |
| |
| @property |
| def store_socket_name(self): |
| return self.store_socket_name.decode() |
| |
| def create(self, ObjectID object_id, int64_t data_size, |
| c_string metadata=b""): |
| """ |
| Create a new buffer in the PlasmaStore for a particular object ID. |
| |
| The returned buffer is mutable until seal is called. |
| |
| Parameters |
| ---------- |
| object_id : ObjectID |
| The object ID used to identify an object. |
| size : int |
| The size in bytes of the created buffer. |
| metadata : bytes |
| An optional string of bytes encoding whatever metadata the user |
| wishes to encode. |
| |
| Raises |
| ------ |
| PlasmaObjectExists |
| This exception is raised if the object could not be created because |
| there already is an object with the same ID in the plasma store. |
| |
| PlasmaStoreFull: This exception is raised if the object could |
| not be created because the plasma store is unable to evict |
| enough objects to create room for it. |
| """ |
| cdef shared_ptr[CBuffer] data |
| with nogil: |
| check_status(self.client.get().Create(object_id.data, data_size, |
| <uint8_t*>(metadata.data()), |
| metadata.size(), &data)) |
| return self._make_mutable_plasma_buffer(object_id, |
| data.get().mutable_data(), |
| data_size) |
| |
| def create_and_seal(self, ObjectID object_id, c_string data, |
| c_string metadata=b""): |
| """ |
| Store a new object in the PlasmaStore for a particular object ID. |
| |
| Parameters |
| ---------- |
| object_id : ObjectID |
| The object ID used to identify an object. |
| data : bytes |
| The object to store. |
| metadata : bytes |
| An optional string of bytes encoding whatever metadata the user |
| wishes to encode. |
| |
| Raises |
| ------ |
| PlasmaObjectExists |
| This exception is raised if the object could not be created because |
| there already is an object with the same ID in the plasma store. |
| |
| PlasmaStoreFull: This exception is raised if the object could |
| not be created because the plasma store is unable to evict |
| enough objects to create room for it. |
| """ |
| with nogil: |
| check_status(self.client.get().CreateAndSeal(object_id.data, data, |
| metadata)) |
| |
| def get_buffers(self, object_ids, timeout_ms=-1, with_meta=False): |
| """ |
| Returns data buffer from the PlasmaStore based on object ID. |
| |
| If the object has not been sealed yet, this call will block. The |
| retrieved buffer is immutable. |
| |
| Parameters |
| ---------- |
| object_ids : list |
| A list of ObjectIDs used to identify some objects. |
| timeout_ms : int |
| The number of milliseconds that the get call should block before |
| timing out and returning. Pass -1 if the call should block and 0 |
| if the call should return immediately. |
| with_meta : bool |
| |
| Returns |
| ------- |
| list |
| If with_meta=False, this is a list of PlasmaBuffers for the data |
| associated with the object_ids and None if the object was not |
| available. If with_meta=True, this is a list of tuples of |
| PlasmaBuffer and metadata bytes. |
| """ |
| cdef c_vector[CObjectBuffer] object_buffers |
| self._get_object_buffers(object_ids, timeout_ms, &object_buffers) |
| result = [] |
| for i in range(object_buffers.size()): |
| if object_buffers[i].data.get() != nullptr: |
| data = pyarrow_wrap_buffer(object_buffers[i].data) |
| else: |
| data = None |
| if not with_meta: |
| result.append(data) |
| else: |
| if object_buffers[i].metadata.get() != nullptr: |
| size = object_buffers[i].metadata.get().size() |
| metadata = object_buffers[i].metadata.get().data()[:size] |
| else: |
| metadata = None |
| result.append((metadata, data)) |
| return result |
| |
| def get_metadata(self, object_ids, timeout_ms=-1): |
| """ |
| Returns metadata buffer from the PlasmaStore based on object ID. |
| |
| If the object has not been sealed yet, this call will block. The |
| retrieved buffer is immutable. |
| |
| Parameters |
| ---------- |
| object_ids : list |
| A list of ObjectIDs used to identify some objects. |
| timeout_ms : int |
| The number of milliseconds that the get call should block before |
| timing out and returning. Pass -1 if the call should block and 0 |
| if the call should return immediately. |
| |
| Returns |
| ------- |
| list |
| List of PlasmaBuffers for the metadata associated with the |
| object_ids and None if the object was not available. |
| """ |
| cdef c_vector[CObjectBuffer] object_buffers |
| self._get_object_buffers(object_ids, timeout_ms, &object_buffers) |
| result = [] |
| for i in range(object_buffers.size()): |
| if object_buffers[i].metadata.get() != nullptr: |
| result.append(pyarrow_wrap_buffer(object_buffers[i].metadata)) |
| else: |
| result.append(None) |
| return result |
| |
| def put_raw_buffer(self, object value, ObjectID object_id=None, |
| c_string metadata=b"", int memcopy_threads=6): |
| """ |
| Store Python buffer into the object store. |
| |
| Parameters |
| ---------- |
| value : Python object that implements the buffer protocol |
| A Python buffer object to store. |
| object_id : ObjectID, default None |
| If this is provided, the specified object ID will be used to refer |
| to the object. |
| metadata : bytes |
| An optional string of bytes encoding whatever metadata the user |
| wishes to encode. |
| memcopy_threads : int, default 6 |
| The number of threads to use to write the serialized object into |
| the object store for large objects. |
| |
| Returns |
| ------- |
| The object ID associated to the Python buffer object. |
| """ |
| cdef ObjectID target_id = (object_id if object_id |
| else ObjectID.from_random()) |
| cdef Buffer arrow_buffer = pyarrow.py_buffer(value) |
| write_buffer = self.create(target_id, len(value), metadata) |
| stream = pyarrow.FixedSizeBufferWriter(write_buffer) |
| stream.set_memcopy_threads(memcopy_threads) |
| stream.write(arrow_buffer) |
| self.seal(target_id) |
| return target_id |
| |
| def put(self, object value, ObjectID object_id=None, int memcopy_threads=6, |
| serialization_context=None): |
| """ |
| Store a Python value into the object store. |
| |
| Parameters |
| ---------- |
| value : object |
| A Python object to store. |
| object_id : ObjectID, default None |
| If this is provided, the specified object ID will be used to refer |
| to the object. |
| memcopy_threads : int, default 6 |
| The number of threads to use to write the serialized object into |
| the object store for large objects. |
| serialization_context : pyarrow.SerializationContext, default None |
| Custom serialization and deserialization context. |
| |
| Returns |
| ------- |
| The object ID associated to the Python object. |
| """ |
| cdef ObjectID target_id = (object_id if object_id |
| else ObjectID.from_random()) |
| serialized = pyarrow.serialize(value, serialization_context) |
| buffer = self.create(target_id, serialized.total_bytes) |
| stream = pyarrow.FixedSizeBufferWriter(buffer) |
| stream.set_memcopy_threads(memcopy_threads) |
| serialized.write_to(stream) |
| self.seal(target_id) |
| return target_id |
| |
| def get(self, object_ids, int timeout_ms=-1, serialization_context=None): |
| """ |
| Get one or more Python values from the object store. |
| |
| Parameters |
| ---------- |
| object_ids : list or ObjectID |
| Object ID or list of object IDs associated to the values we get |
| from the store. |
| timeout_ms : int, default -1 |
| The number of milliseconds that the get call should block before |
| timing out and returning. Pass -1 if the call should block and 0 |
| if the call should return immediately. |
| serialization_context : pyarrow.SerializationContext, default None |
| Custom serialization and deserialization context. |
| |
| Returns |
| ------- |
| list or object |
| Python value or list of Python values for the data associated with |
| the object_ids and ObjectNotAvailable if the object was not |
| available. |
| """ |
| if isinstance(object_ids, compat.Sequence): |
| results = [] |
| buffers = self.get_buffers(object_ids, timeout_ms) |
| for i in range(len(object_ids)): |
| # buffers[i] is None if this object was not available within |
| # the timeout |
| if buffers[i]: |
| val = pyarrow.deserialize(buffers[i], |
| serialization_context) |
| results.append(val) |
| else: |
| results.append(ObjectNotAvailable) |
| return results |
| else: |
| return self.get([object_ids], timeout_ms, serialization_context)[0] |
| |
| def seal(self, ObjectID object_id): |
| """ |
| Seal the buffer in the PlasmaStore for a particular object ID. |
| |
| Once a buffer has been sealed, the buffer is immutable and can only be |
| accessed through get. |
| |
| Parameters |
| ---------- |
| object_id : ObjectID |
| A string used to identify an object. |
| """ |
| with nogil: |
| check_status(self.client.get().Seal(object_id.data)) |
| |
| def _release(self, ObjectID object_id): |
| """ |
| Notify Plasma that the object is no longer needed. |
| |
| Parameters |
| ---------- |
| object_id : ObjectID |
| A string used to identify an object. |
| """ |
| with nogil: |
| check_status(self.client.get().Release(object_id.data)) |
| |
| def contains(self, ObjectID object_id): |
| """ |
| Check if the object is present and sealed in the PlasmaStore. |
| |
| Parameters |
| ---------- |
| object_id : ObjectID |
| A string used to identify an object. |
| """ |
| cdef c_bool is_contained |
| with nogil: |
| check_status(self.client.get().Contains(object_id.data, |
| &is_contained)) |
| return is_contained |
| |
| def hash(self, ObjectID object_id): |
| """ |
| Compute the checksum of an object in the object store. |
| |
| Parameters |
| ---------- |
| object_id : ObjectID |
| A string used to identify an object. |
| |
| Returns |
| ------- |
| bytes |
| A digest string object's hash. If the object isn't in the object |
| store, the string will have length zero. |
| """ |
| cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize) |
| with nogil: |
| check_status(self.client.get().Hash(object_id.data, |
| digest.data())) |
| return bytes(digest[:]) |
| |
| def evict(self, int64_t num_bytes): |
| """ |
| Evict some objects until to recover some bytes. |
| |
| Recover at least num_bytes bytes if possible. |
| |
| Parameters |
| ---------- |
| num_bytes : int |
| The number of bytes to attempt to recover. |
| """ |
| cdef int64_t num_bytes_evicted = -1 |
| with nogil: |
| check_status(self.client.get().Evict(num_bytes, num_bytes_evicted)) |
| return num_bytes_evicted |
| |
| def subscribe(self): |
| """Subscribe to notifications about sealed objects.""" |
| with nogil: |
| check_status(self.client.get().Subscribe(&self.notification_fd)) |
| |
| def get_notification_socket(self): |
| """ |
| Get the notification socket. |
| """ |
| return compat.get_socket_from_fd(self.notification_fd, |
| family=socket.AF_UNIX, |
| type=socket.SOCK_STREAM) |
| |
| def decode_notification(self, const uint8_t* buf): |
| """ |
| Get the notification from the buffer. |
| |
| Returns |
| ------- |
| ObjectID |
| The object ID of the object that was stored. |
| int |
| The data size of the object that was stored. |
| int |
| The metadata size of the object that was stored. |
| """ |
| cdef CUniqueID object_id |
| cdef int64_t data_size |
| cdef int64_t metadata_size |
| with nogil: |
| check_status(self.client.get() |
| .DecodeNotification(buf, |
| &object_id, |
| &data_size, |
| &metadata_size)) |
| return ObjectID(object_id.binary()), data_size, metadata_size |
| |
| def get_next_notification(self): |
| """ |
| Get the next notification from the notification socket. |
| |
| Returns |
| ------- |
| ObjectID |
| The object ID of the object that was stored. |
| int |
| The data size of the object that was stored. |
| int |
| The metadata size of the object that was stored. |
| """ |
| cdef ObjectID object_id = ObjectID(CUniqueID.size() * b"\0") |
| cdef int64_t data_size |
| cdef int64_t metadata_size |
| with nogil: |
| check_status(self.client.get() |
| .GetNotification(self.notification_fd, |
| &object_id.data, |
| &data_size, |
| &metadata_size)) |
| return object_id, data_size, metadata_size |
| |
| def to_capsule(self): |
| return PyCapsule_New(<void *>self.client.get(), "plasma", NULL) |
| |
| def disconnect(self): |
| """ |
| Disconnect this client from the Plasma store. |
| """ |
| with nogil: |
| check_status(self.client.get().Disconnect()) |
| |
| def delete(self, object_ids): |
| """ |
| Delete the objects with the given IDs from other object store. |
| |
| Parameters |
| ---------- |
| object_ids : list |
| A list of strings used to identify the objects. |
| """ |
| cdef c_vector[CUniqueID] ids |
| cdef ObjectID object_id |
| for object_id in object_ids: |
| ids.push_back(object_id.data) |
| with nogil: |
| check_status(self.client.get().Delete(ids)) |
| |
| def list(self): |
| """ |
| Experimental: List the objects in the store. |
| |
| Returns |
| ------- |
| dict |
| Dictionary from ObjectIDs to an "info" dictionary describing the |
| object. The "info" dictionary has the following entries: |
| |
| data_size |
| size of the object in bytes |
| |
| metadata_size |
| size of the object metadata in bytes |
| |
| ref_count |
| Number of clients referencing the object buffer |
| |
| create_time |
| Unix timestamp of the creation of the object |
| |
| construct_duration |
| Time the creation of the object took in seconds |
| |
| state |
| "created" if the object is still being created and |
| "sealed" if it is already sealed |
| """ |
| cdef CObjectTable objects |
| with nogil: |
| check_status(self.client.get().List(&objects)) |
| result = dict() |
| cdef ObjectID object_id |
| cdef CObjectTableEntry entry |
| it = objects.begin() |
| while it != objects.end(): |
| object_id = ObjectID(deref(it).first.binary()) |
| entry = deref(deref(it).second) |
| if entry.state == CObjectState.PLASMA_CREATED: |
| state = "created" |
| else: |
| state = "sealed" |
| result[object_id] = { |
| "data_size": entry.data_size, |
| "metadata_size": entry.metadata_size, |
| "ref_count": entry.ref_count, |
| "create_time": entry.create_time, |
| "construct_duration": entry.construct_duration, |
| "state": state |
| } |
| inc(it) |
| return result |
| |
| def store_capacity(self): |
| """ |
| Get the memory capacity of the store. |
| |
| Returns |
| ------- |
| |
| int |
| The memory capacity of the store in bytes. |
| """ |
| return self.client.get().store_capacity() |
| |
| |
| def connect(store_socket_name, manager_socket_name=None, int release_delay=0, |
| int num_retries=-1): |
| """ |
| Return a new PlasmaClient that is connected a plasma store and |
| optionally a manager. |
| |
| Parameters |
| ---------- |
| store_socket_name : str |
| Name of the socket the plasma store is listening at. |
| manager_socket_name : str |
| This parameter is deprecated and has no effect. |
| release_delay : int |
| This parameter is deprecated and has no effect. |
| num_retries : int, default -1 |
| Number of times to try to connect to plasma store. Default value of -1 |
| uses the default (50) |
| """ |
| if manager_socket_name is not None: |
| warnings.warn( |
| "manager_socket_name in PlasmaClient.connect is deprecated", |
| FutureWarning) |
| cdef PlasmaClient result = PlasmaClient() |
| result.store_socket_name = store_socket_name.encode() |
| if release_delay != 0: |
| warnings.warn("release_delay in PlasmaClient.connect is deprecated", |
| FutureWarning) |
| with nogil: |
| check_status(result.client.get() |
| .Connect(result.store_socket_name, b"", |
| release_delay, num_retries)) |
| return result |