blob: 2db8e0038ec88113badb92892eab2ec389f55b2b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
# cython: linetrace=True
from libc.stdint cimport uint8_t, int64_t, uintptr_t
from libc.errno cimport EIO
from libc.stdio cimport snprintf
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from cpython cimport Py_buffer, PyBuffer_FillInfo
from nanoarrow_c cimport (
ArrowErrorCode,
ArrowError,
NANOARROW_OK,
ArrowArrayStream,
)
cdef extern from "nanoarrow_ipc.h" nogil:
struct ArrowIpcInputStream:
ArrowErrorCode (*read)(ArrowIpcInputStream* stream, uint8_t* buf,
int64_t buf_size_bytes, int64_t* size_read_out,
ArrowError* error)
void (*release)(ArrowIpcInputStream* stream)
void* private_data
struct ArrowIpcArrayStreamReaderOptions:
int64_t field_index
int use_shared_buffers
ArrowErrorCode ArrowIpcArrayStreamReaderInit(
ArrowArrayStream* out, ArrowIpcInputStream* input_stream,
ArrowIpcArrayStreamReaderOptions* options)
cdef class PyInputStreamPrivate:
cdef object _obj
cdef bint _close_obj
cdef void* _addr
cdef Py_ssize_t _size_bytes
def __cinit__(self, obj, close_obj=False):
self._obj = obj
self._close_obj = close_obj
self._addr = NULL
self._size_bytes = 0
@property
def obj(self):
return self._obj
@property
def close_obj(self):
return self._close_obj
def set_buffer(self, uintptr_t addr, Py_ssize_t size_bytes):
self._addr = <void*>addr
self._size_bytes = size_bytes
# Needed for at least some implementations of readinto()
def __len__(self):
return self._size_bytes
# Implement the buffer protocol so that this object can be used as
# the argument to xxx.readinto(). This ensures that no extra copies
# (beyond any buffering done by the upstream file-like object) are held
# since the upstream object has access to the preallocated output buffer.
# In this case, the preallocation is done by the ArrowArrayStream
# implementation before issuing each read call (two per message, with
# an extra call for a RecordBatch message to get the actual buffer data).
def __getbuffer__(self, Py_buffer* buffer, int flags):
PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, 0, flags)
def __releasebuffer__(self, Py_buffer* buffer):
pass
cdef ArrowErrorCode py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* buf,
int64_t buf_size_bytes, int64_t* size_read_out,
ArrowError* error) noexcept nogil:
with gil:
stream_private = <object>stream.private_data
stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)
try:
size_read_out[0] = stream_private.obj.readinto(stream_private)
return NANOARROW_OK
except Exception as e:
cls = type(e).__name__.encode()
msg = str(e).encode()
snprintf(
error.message,
sizeof(error.message),
"%s: %s",
<const char*>cls,
<const char*>msg
)
return EIO
cdef void py_input_stream_release(ArrowIpcInputStream* stream) noexcept nogil:
with gil:
stream_private = <object>stream.private_data
if stream_private.close_obj:
stream_private.obj.close()
Py_DECREF(stream_private)
stream.private_data = NULL
stream.release = NULL
cdef class CIpcInputStream:
cdef ArrowIpcInputStream _stream
def __cinit__(self):
self._stream.release = NULL
def is_valid(self):
return self._stream.release != NULL
def __dealloc__(self):
# Duplicating release() to avoid Python API calls in the deallocator
if self._stream.release != NULL:
self._stream.release(&self._stream)
def release(self):
if self._stream.release != NULL:
self._stream.release(&self._stream)
return True
else:
return False
@staticmethod
def from_readable(obj, close_obj=False):
cdef CIpcInputStream stream = CIpcInputStream()
cdef PyInputStreamPrivate private_data = PyInputStreamPrivate(obj, close_obj)
stream._stream.private_data = <PyObject*>private_data
Py_INCREF(private_data)
stream._stream.read = &py_input_stream_read
stream._stream.release = &py_input_stream_release
return stream
def init_array_stream(CIpcInputStream input_stream, uintptr_t out):
cdef ArrowArrayStream* out_ptr = <ArrowArrayStream*>out
# There are some options here that could be exposed at some point
cdef int code = ArrowIpcArrayStreamReaderInit(out_ptr, &input_stream._stream, NULL)
if code != NANOARROW_OK:
raise RuntimeError(f"ArrowIpcArrayStreamReaderInit() failed with code [{code}]")