|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements.  See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership.  The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied.  See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  |  | 
|  | from cpython.ref cimport PyObject | 
|  |  | 
|  | import warnings | 
|  |  | 
|  |  | 
def _deprecate_serialization(name):
    """
    Emit a FutureWarning stating that ``pyarrow.<name>`` is deprecated.

    Parameters
    ----------
    name : str
        Name of the deprecated attribute; it is interpolated into the
        warning text after the ``pyarrow.`` prefix.
    """
    template = (
        "'pyarrow.{}' is deprecated as of 2.0.0 and will be removed in a "
        "future version. Use pickle or the pyarrow IPC functionality instead."
    )
    # stacklevel=3 attributes the warning two levels above this helper,
    # i.e. past the deprecated function that invoked it.
    warnings.warn(template.format(name), FutureWarning, stacklevel=3)
|  |  | 
|  |  | 
def is_named_tuple(cls):
    """
    Tell whether ``cls`` looks like a class made by ``namedtuple``.

    The class must derive directly (and only) from ``tuple`` and expose a
    ``_fields`` attribute that is a tuple made up solely of strings.

    Parameters
    ----------
    cls : type
        The class to inspect.

    Returns
    -------
    bool
    """
    bases = cls.__bases__
    if len(bases) != 1 or bases[0] != tuple:
        return False
    fields = getattr(cls, "_fields", None)
    return isinstance(fields, tuple) and all(
        isinstance(field, str) for field in fields)
|  |  | 
|  |  | 
class SerializationCallbackError(ArrowSerializationError):
    """
    Error raised when no way of serializing an object is known.

    Attributes
    ----------
    example_object : object
        The object that could not be serialized.
    """

    def __init__(self, message, example_object):
        super().__init__(message)
        self.example_object = example_object
|  |  | 
|  |  | 
class DeserializationCallbackError(ArrowSerializationError):
    """
    Error raised when a serialized payload names an unregistered type id.

    Attributes
    ----------
    type_id : str
        The type identifier that could not be resolved.
    """

    def __init__(self, message, type_id):
        super().__init__(message)
        self.type_id = type_id
|  |  | 
|  |  | 
cdef class SerializationContext(_Weakrefable):
    """
    Registry of the types that can be serialized, together with the
    callables used to turn instances of them into dicts and back.
    """
    cdef:
        object type_to_type_id
        object whitelisted_types
        object types_to_pickle
        object custom_serializers
        object custom_deserializers
        object pickle_serializer
        object pickle_deserializer

    def __init__(self):
        # Forward (type -> type_id) and reverse (type_id -> type) tables.
        self.type_to_type_id = {}
        self.whitelisted_types = {}
        # Ids of the types that are handed to the pickle callables.
        self.types_to_pickle = set()
        # User supplied callables, keyed by type id.
        self.custom_serializers = {}
        self.custom_deserializers = {}
        # Default pickling backend; set_pickle() replaces it.
        self.pickle_serializer = pickle.dumps
        self.pickle_deserializer = pickle.loads

    def set_pickle(self, serializer, deserializer):
        """
        Choose the callables used for the types registered for pickling.

        Parameters
        ----------
        serializer : callable
            Turns an object into its pickled form (e.g., pickle.dumps or
            cloudpickle.dumps).
        deserializer : callable
            Rebuilds an object from its pickled form (e.g., pickle.loads or
            cloudpickle.loads).
        """
        self.pickle_serializer = serializer
        self.pickle_deserializer = deserializer

    def clone(self):
        """
        Return copy of this SerializationContext.

        The lookup tables are copied with their ``copy()`` method; the
        pickle callables are shared with the original.

        Returns
        -------
        clone : SerializationContext
        """
        cdef SerializationContext duplicate = SerializationContext()
        duplicate.pickle_serializer = self.pickle_serializer
        duplicate.pickle_deserializer = self.pickle_deserializer
        duplicate.type_to_type_id = self.type_to_type_id.copy()
        duplicate.whitelisted_types = self.whitelisted_types.copy()
        duplicate.types_to_pickle = self.types_to_pickle.copy()
        duplicate.custom_serializers = self.custom_serializers.copy()
        duplicate.custom_deserializers = self.custom_deserializers.copy()
        return duplicate

    def register_type(self, type_, type_id, pickle=False,
                      custom_serializer=None, custom_deserializer=None):
        r"""
        EXPERIMENTAL: Add type to the list of types we can serialize.

        Parameters
        ----------
        type\_ : type
            The type that we can serialize.
        type_id : string
            A string used to identify the type.
        pickle : bool
            True if the serialization should be done with pickle.
            False if it should be done efficiently with Arrow.
        custom_serializer : callable
            This argument is optional, but can be provided to
            serialize objects of the class in a particular way.
        custom_deserializer : callable
            This argument is optional, but can be provided to
            deserialize objects of the class in a particular way. It is
            only recorded when ``custom_serializer`` is given as well.

        Raises
        ------
        TypeError
            If ``type_id`` is not a string.
        """
        if not isinstance(type_id, str):
            complaint = (
                "The type_id argument must be a string. The value "
                "passed in has type {}."
            ).format(type(type_id))
            raise TypeError(complaint)

        self.type_to_type_id[type_] = type_id
        self.whitelisted_types[type_id] = type_
        if pickle:
            self.types_to_pickle.add(type_id)
        if custom_serializer is None:
            return
        # Both callables are stored together, keyed by the same id.
        self.custom_serializers[type_id] = custom_serializer
        self.custom_deserializers[type_id] = custom_deserializer

    def _serialize_callback(self, obj):
        # Walk the MRO so that the closest registered ancestor of
        # type(obj) is the one that gets used.
        for type_ in type(obj).__mro__:
            if type_ in self.type_to_type_id:
                break
        else:
            raise SerializationCallbackError(
                "pyarrow does not know how to "
                "serialize objects of type {}.".format(type(obj)), obj
            )

        type_id = self.type_to_type_id[type_]
        if type_id in self.types_to_pickle:
            payload = {"data": self.pickle_serializer(obj), "pickle": True}
        elif type_id in self.custom_serializers:
            payload = {"data": self.custom_serializers[type_id](obj)}
        elif is_named_tuple(type_):
            payload = {"_pa_getnewargs_": obj.__getnewargs__()}
        elif hasattr(obj, "__dict__"):
            payload = obj.__dict__
        else:
            failure = "We do not know how to serialize " \
                "the object '{}'".format(obj)
            raise SerializationCallbackError(failure, obj)
        # Build a new dict so that obj.__dict__ is never modified.
        return dict(payload, _pytype_=type_id)

    def _deserialize_callback(self, serialized_obj):
        type_id = serialized_obj["_pytype_"]
        if isinstance(type_id, bytes):
            # ARROW-4675: Python 2 serialized, read in Python 3
            type_id = frombytes(type_id)

        if "pickle" in serialized_obj:
            # The object was pickled, so unpickle it.
            return self.pickle_deserializer(serialized_obj["data"])

        assert type_id not in self.types_to_pickle
        if type_id not in self.whitelisted_types:
            problem = ("Type ID " + type_id +
                       " not registered in deserialization callback")
            raise DeserializationCallbackError(problem, type_id)
        type_ = self.whitelisted_types[type_id]

        if type_id in self.custom_deserializers:
            return self.custom_deserializers[type_id](serialized_obj["data"])

        if "_pa_getnewargs_" in serialized_obj:
            # Rebuild from the stored __getnewargs__() tuple.
            return type_.__new__(type_, *serialized_obj["_pa_getnewargs_"])

        # Otherwise what is left of serialized_obj, once the type marker
        # is removed, is the instance __dict__.
        obj = type_.__new__(type_)
        serialized_obj.pop("_pytype_")
        obj.__dict__.update(serialized_obj)
        return obj

    def serialize(self, obj):
        """
        Forward to pyarrow.serialize with this object as the context.
        """
        return serialize(obj, context=self)

    def serialize_to(self, object value, sink):
        """
        Forward to pyarrow.serialize_to with this object as the context.
        """
        return serialize_to(value, sink, context=self)

    def deserialize(self, what):
        """
        Forward to pyarrow.deserialize with this object as the context.
        """
        return deserialize(what, context=self)

    def deserialize_components(self, what):
        """
        Forward to pyarrow.deserialize_components with this object as the
        context.
        """
        return deserialize_components(what, context=self)
|  |  | 
|  |  | 
# Shared default context. It starts out empty; the standard handlers are
# installed by the first call to _get_default_context().
_default_serialization_context = SerializationContext()
_default_context_initialized = False


def _get_default_context():
    """
    Return the module-wide default SerializationContext.

    The first call registers the default serialization handlers on it;
    subsequent calls return the same, already populated, object.
    """
    global _default_context_initialized
    from pyarrow.serialization import (
        _register_default_serialization_handlers as install_handlers)
    if _default_context_initialized:
        return _default_serialization_context
    install_handlers(_default_serialization_context)
    _default_context_initialized = True
    return _default_serialization_context
|  |  | 
|  |  | 
cdef class SerializedPyObject(_Weakrefable):
    """
    Arrow-serialized representation of Python object.
    """
    cdef:
        CSerializedPyObject data

    cdef readonly:
        object base

    @property
    def total_bytes(self):
        """
        Number of bytes reported by a mock output stream after the
        serialized data has been written to it.
        """
        cdef CMockOutputStream byte_counter
        with nogil:
            check_status(self.data.WriteTo(&byte_counter))

        return byte_counter.GetExtentBytesWritten()

    def write_to(self, sink):
        """
        Write serialized object to a sink.

        Parameters
        ----------
        sink : object
            Destination, resolved to an output stream with ``get_writer``.
        """
        cdef shared_ptr[COutputStream] c_sink
        get_writer(sink, &c_sink)
        self._write_to(c_sink.get())

    cdef _write_to(self, COutputStream* stream):
        # The write runs with the GIL released.
        with nogil:
            check_status(self.data.WriteTo(stream))

    def deserialize(self, SerializationContext context=None):
        """
        Convert back to Python object.

        Parameters
        ----------
        context : SerializationContext, default None
            Context to deserialize with; the default context is used when
            None is given.

        Returns
        -------
        object
            The first element of the deserialized sequence.
        """
        cdef PyObject* c_result

        if context is None:
            context = _get_default_context()

        with nogil:
            check_status(DeserializeObject(context, self.data,
                                           <PyObject*> self.base, &c_result))

        # PyObject_to_object is necessary to avoid a memory leak
        wrapped = PyObject_to_object(c_result)
        # Unpack the list the object was wrapped in in serialize
        return wrapped[0]

    def to_buffer(self, nthreads=1):
        """
        Write serialized data as Buffer.

        Parameters
        ----------
        nthreads : int, default 1
            When greater than 1, passed to the writer's
            ``set_memcopy_threads``.

        Returns
        -------
        Buffer
            A newly allocated buffer of ``total_bytes`` bytes holding the
            serialized data.
        """
        cdef Buffer target = allocate_buffer(self.total_bytes)
        writer = FixedSizeBufferWriter(target)
        if nthreads > 1:
            writer.set_memcopy_threads(nthreads)
        self.write_to(writer)
        return target

    @staticmethod
    def from_components(components):
        """
        Reconstruct SerializedPyObject from output of
        SerializedPyObject.to_components.

        Parameters
        ----------
        components : dict
            Must provide the keys 'num_tensors', 'num_ndarrays',
            'num_buffers', 'data' and 'num_sparse_tensors'; the latter maps
            'coo', 'csr', 'csc', 'csf' and 'ndim_csf' to counts.

        Returns
        -------
        SerializedPyObject
        """
        cdef:
            int n_tensors = components['num_tensors']
            int n_ndarrays = components['num_ndarrays']
            int n_buffers = components['num_buffers']
            list buffers = components['data']
            SparseTensorCounts n_sparse = SparseTensorCounts()
            SerializedPyObject rebuilt = SerializedPyObject()

        sparse_counts = components['num_sparse_tensors']
        n_sparse.coo = sparse_counts['coo']
        n_sparse.csr = sparse_counts['csr']
        n_sparse.csc = sparse_counts['csc']
        n_sparse.csf = sparse_counts['csf']
        n_sparse.ndim_csf = sparse_counts['ndim_csf']

        with nogil:
            check_status(GetSerializedFromComponents(n_tensors,
                                                     n_sparse,
                                                     n_ndarrays,
                                                     n_buffers,
                                                     buffers, &rebuilt.data))

        return rebuilt

    def to_components(self, memory_pool=None):
        """
        Return the decomposed dict representation of the serialized object
        containing a collection of Buffer objects which maximize opportunities
        for zero-copy.

        Parameters
        ----------
        memory_pool : MemoryPool default None
            Pool to use for necessary allocations.

        Returns
        -------
        components : dict
            Accepted by SerializedPyObject.from_components.
        """
        cdef PyObject* c_components
        cdef CMemoryPool* c_pool = maybe_unbox_memory_pool(memory_pool)

        with nogil:
            check_status(self.data.GetComponents(c_pool, &c_components))

        return PyObject_to_object(c_components)
|  |  | 
|  |  | 
def serialize(object value, SerializationContext context=None):
    """
    DEPRECATED: Serialize a general Python sequence for transient storage
    and transport.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    value : object
        Python object for the sequence that is to be serialized.
    context : SerializationContext, default None
        Custom serialization and deserialization context, uses a default
        context with some standard type handlers if not specified.

    Returns
    -------
    serialized : SerializedPyObject

    Notes
    -----
    The data produced here is incompatible with the standard Arrow IPC
    binary protocol, i.e. it cannot be read with ipc.open_stream or
    ipc.open_file. Read it back with deserialize, deserialize_from, or
    deserialize_components.
    """
    _deprecate_serialization("serialize")
    return _serialize(value, context=context)
|  |  | 
|  |  | 
def _serialize(object value, SerializationContext context=None):
    """
    Serialize ``value`` without emitting a deprecation warning.

    Parameters
    ----------
    value : object
        The object to serialize.
    context : SerializationContext, default None
        The default context is used when None is given.

    Returns
    -------
    SerializedPyObject
    """
    cdef SerializedPyObject holder = SerializedPyObject()
    # The value travels inside a one-element list;
    # SerializedPyObject.deserialize takes element 0 back out.
    boxed = [value]

    if context is None:
        context = _get_default_context()

    with nogil:
        check_status(SerializeObject(context, boxed, &holder.data))
    return holder
|  |  | 
|  |  | 
def serialize_to(object value, sink, SerializationContext context=None):
    """
    DEPRECATED: Serialize a Python sequence to a file.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    value : object
        Python object for the sequence that is to be serialized.
    sink : NativeFile or file-like
        File the sequence will be written to.
    context : SerializationContext, default None
        Custom serialization and deserialization context, uses a default
        context with some standard type handlers if not specified.
    """
    _deprecate_serialization("serialize_to")
    _serialize(value, context).write_to(sink)
|  |  | 
|  |  | 
def read_serialized(source, base=None):
    """
    DEPRECATED: Read serialized Python sequence from file-like object.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    source : NativeFile
        File to read the sequence from.
    base : object, default None
        This object will be the base object of all the numpy arrays
        contained in the sequence.

    Returns
    -------
    serialized : SerializedPyObject
        The serialized data, not yet converted back to a Python object.
    """
    _deprecate_serialization("read_serialized")
    serialized = _read_serialized(source, base=base)
    return serialized
|  |  | 
|  |  | 
def _read_serialized(source, base=None):
    """
    Read a SerializedPyObject from ``source`` without emitting a
    deprecation warning.

    Parameters
    ----------
    source : object
        Input, resolved to a random access file with ``get_reader``.
    base : object, default None
        Stored on the result as its ``base`` attribute.

    Returns
    -------
    SerializedPyObject
    """
    cdef shared_ptr[CRandomAccessFile] c_source
    cdef SerializedPyObject loaded

    get_reader(source, True, &c_source)

    loaded = SerializedPyObject()
    loaded.base = base
    with nogil:
        check_status(ReadSerializedObject(c_source.get(), &loaded.data))

    return loaded
|  |  | 
|  |  | 
def deserialize_from(source, object base, SerializationContext context=None):
    """
    DEPRECATED: Deserialize a Python sequence from a file.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Only data produced by pyarrow.serialize or pyarrow.serialize_to can be
    read with this function.

    Parameters
    ----------
    source : NativeFile
        File to read the sequence from.
    base : object
        This object will be the base object of all the numpy arrays
        contained in the sequence.
    context : SerializationContext, default None
        Custom serialization and deserialization context.

    Returns
    -------
    object
        Python object for the deserialized sequence.
    """
    _deprecate_serialization("deserialize_from")
    return _read_serialized(source, base=base).deserialize(context)
|  |  | 
|  |  | 
def deserialize_components(components, SerializationContext context=None):
    """
    DEPRECATED: Reconstruct Python object from output of
    SerializedPyObject.to_components.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    components : dict
        Output of SerializedPyObject.to_components
    context : SerializationContext, default None
        Context to deserialize with.

    Returns
    -------
    object
        The Python object that was originally serialized.
    """
    _deprecate_serialization("deserialize_components")
    rebuilt = SerializedPyObject.from_components(components)
    return rebuilt.deserialize(context)
|  |  | 
|  |  | 
def deserialize(obj, SerializationContext context=None):
    """
    DEPRECATED: Deserialize Python object from Buffer or other Python
    object supporting the buffer protocol.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Only data produced by pyarrow.serialize or pyarrow.serialize_to can be
    read with this function.

    Parameters
    ----------
    obj : pyarrow.Buffer or Python object supporting buffer protocol
        The serialized data.
    context : SerializationContext, default None
        Custom serialization and deserialization context.

    Returns
    -------
    deserialized : object
    """
    _deprecate_serialization("deserialize")
    deserialized = _deserialize(obj, context=context)
    return deserialized
|  |  | 
|  |  | 
def _deserialize(obj, SerializationContext context=None):
    """
    Deserialize ``obj`` without emitting a deprecation warning.

    Parameters
    ----------
    obj : object
        Wrapped in a BufferReader to be read; it is also handed on as the
        ``base`` of the serialized object.
    context : SerializationContext, default None
        Context to deserialize with.

    Returns
    -------
    object
    """
    reader = BufferReader(obj)
    return _read_serialized(reader, base=obj).deserialize(context)