| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # cython: language_level = 3 |
| |
| |
| from libc.stdint cimport uintptr_t, int64_t |
| from cpython.pycapsule cimport PyCapsule_GetPointer |
| |
| from nanoarrow_c cimport ( |
| ArrowArray, |
| ArrowArrayStream, |
| ArrowArrayStreamGetNext, |
| ArrowArrayStreamGetSchema, |
| ArrowArrayStreamMove, |
| ArrowBasicArrayStreamInit, |
| ArrowBasicArrayStreamSetArray, |
| ArrowBasicArrayStreamValidate, |
| ArrowBufferAppendInt64, |
| ArrowResolveChunk64, |
| ArrowType, |
| ) |
| |
| from nanoarrow cimport _types |
| from nanoarrow._array cimport CArray |
| from nanoarrow._buffer cimport CBuffer |
| from nanoarrow._schema cimport CSchema, assert_type_equal |
| from nanoarrow._utils cimport ( |
| alloc_c_array_stream, |
| c_array_shallow_copy, |
| Error |
| ) |
| |
| from typing import Iterable, List, Tuple |
| |
| from nanoarrow import _repr_utils |
| |
| |
| cdef class CArrayStream: |
| """Low-level ArrowArrayStream wrapper |
| |
| This object is a literal wrapper around an ArrowArrayStream. It provides methods that |
| that wrap the underlying C callbacks and handles the C Data interface lifecycle |
| (i.e., initialized ArrowArrayStream structures are always released). |
| |
| See `nanoarrow.c_array_stream()` for construction and usage examples. |
| """ |
| cdef object _base |
| cdef ArrowArrayStream* _ptr |
| cdef object _cached_schema |
| |
| def __cinit__(self, object base, uintptr_t addr): |
| self._base = base |
| self._ptr = <ArrowArrayStream*>addr |
| self._cached_schema = None |
| |
| @staticmethod |
| def allocate() -> CArrayStream: |
| """Allocate a released ArrowArrayStream""" |
| cdef ArrowArrayStream* c_array_stream_out |
| base = alloc_c_array_stream(&c_array_stream_out) |
| return CArrayStream(base, <uintptr_t>c_array_stream_out) |
| |
| @staticmethod |
| def from_c_arrays(arrays: List[CArray], CSchema schema, move=False, validate=True) -> CArrayStream: |
| """Create an ArrowArrayStream from an existing set of arrays |
| |
| Given a previously resolved list of arrays, create an ArrowArrayStream |
| representation of the sequence of chunks. |
| |
| Parameters |
| ---------- |
| arrays : List[CArray] |
| A list of arrays to use as batches. |
| schema : CSchema |
| The schema that will be returned. Must be type equal with the schema |
| of each array (this is checked if validate is ``True``) |
| move : bool, optional |
| If True, transfer ownership from each array instead of creating a |
| shallow copy. This is only safe if the caller knows the origin of the |
| arrays and knows that they will not be accessed after this stream has been |
| created. |
| validate : bool, optional |
| If True, enforce type equality between the provided schema and the schema |
| of each array. |
| """ |
| cdef ArrowArrayStream* c_array_stream_out |
| base = alloc_c_array_stream(&c_array_stream_out) |
| |
| # Don't create more copies than we have to (but make sure |
| # one exists for validation if requested) |
| cdef CSchema out_schema = schema |
| if validate and not move: |
| validate_schema = schema |
| out_schema = schema.__deepcopy__() |
| elif validate: |
| validate_schema = schema.__deepcopy__() |
| out_schema = schema |
| elif not move: |
| out_schema = schema.__deepcopy__() |
| |
| cdef int code = ArrowBasicArrayStreamInit(c_array_stream_out, out_schema._ptr, len(arrays)) |
| Error.raise_error_not_ok("ArrowBasicArrayStreamInit()", code) |
| |
| cdef ArrowArray tmp |
| cdef CArray array |
| for i in range(len(arrays)): |
| array = arrays[i] |
| |
| if validate: |
| assert_type_equal(array.schema, validate_schema, False) |
| |
| if not move: |
| c_array_shallow_copy(array._base, array._ptr, &tmp) |
| ArrowBasicArrayStreamSetArray(c_array_stream_out, i, &tmp) |
| else: |
| ArrowBasicArrayStreamSetArray(c_array_stream_out, i, array._ptr) |
| |
| cdef Error error = Error() |
| if validate: |
| code = ArrowBasicArrayStreamValidate(c_array_stream_out, &error.c_error) |
| error.raise_message_not_ok("ArrowBasicArrayStreamValidate()", code) |
| |
| return CArrayStream(base, <uintptr_t>c_array_stream_out) |
| |
| def release(self): |
| """Explicitly call the release callback of this stream""" |
| if self.is_valid(): |
| self._ptr.release(self._ptr) |
| |
| @staticmethod |
| def _import_from_c_capsule(stream_capsule) -> CArrayStream: |
| """Import from a ArrowArrayStream PyCapsule. |
| |
| Parameters |
| ---------- |
| stream_capsule : PyCapsule |
| A valid PyCapsule with name 'arrow_array_stream' containing an |
| ArrowArrayStream pointer. |
| """ |
| return CArrayStream( |
| stream_capsule, |
| <uintptr_t>PyCapsule_GetPointer(stream_capsule, 'arrow_array_stream') |
| ) |
| |
| def __arrow_c_stream__(self, requested_schema=None): |
| """ |
| Export the stream as an Arrow C stream PyCapsule. |
| |
| Parameters |
| ---------- |
| requested_schema : PyCapsule | None |
| A PyCapsule containing a C ArrowSchema representation of a requested |
| schema. Not supported. |
| |
| Returns |
| ------- |
| PyCapsule |
| """ |
| self._assert_valid() |
| |
| if requested_schema is not None: |
| raise NotImplementedError("requested_schema") |
| |
| cdef: |
| ArrowArrayStream* c_array_stream_out |
| |
| array_stream_capsule = alloc_c_array_stream(&c_array_stream_out) |
| ArrowArrayStreamMove(self._ptr, c_array_stream_out) |
| return array_stream_capsule |
| |
| def _addr(self) -> int: |
| """test to see if this causes a ci fail""" |
| return <uintptr_t>self._ptr |
| |
| def is_valid(self) -> bool: |
| """Check for a non-null and non-released underlying ArrowArrayStream""" |
| return self._ptr != NULL and self._ptr.release != NULL |
| |
| def _assert_valid(self): |
| if self._ptr == NULL: |
| raise RuntimeError("array stream pointer is NULL") |
| if self._ptr.release == NULL: |
| raise RuntimeError("array stream is released") |
| |
| def _get_schema(self, CSchema schema): |
| self._assert_valid() |
| cdef Error error = Error() |
| cdef int code = ArrowArrayStreamGetSchema(self._ptr, schema._ptr, &error.c_error) |
| error.raise_message_not_ok("ArrowArrayStream::get_schema()", code) |
| |
| def _get_cached_schema(self): |
| if self._cached_schema is None: |
| self._cached_schema = CSchema.allocate() |
| self._get_schema(self._cached_schema) |
| |
| return self._cached_schema |
| |
| def get_schema(self) -> CSchema: |
| """Get the schema associated with this stream |
| |
| Calling this method will always issue a call to the underlying stream's |
| get_schema callback. |
| """ |
| out = CSchema.allocate() |
| self._get_schema(out) |
| return out |
| |
| def get_next(self) -> CArray: |
| """Get the next Array from this stream |
| |
| Raises StopIteration when there are no more arrays in this stream. |
| """ |
| self._assert_valid() |
| |
| # We return a reference to the same Python object for each |
| # Array that is returned. This is independent of get_schema(), |
| # which is guaranteed to call the C object's callback and |
| # faithfully pass on the returned value. |
| |
| cdef Error error = Error() |
| cdef CArray array = CArray.allocate(self._get_cached_schema()) |
| cdef int code = ArrowArrayStreamGetNext(self._ptr, array._ptr, &error.c_error) |
| error.raise_message_not_ok("ArrowArrayStream::get_next()", code) |
| |
| if not array.is_valid(): |
| raise StopIteration() |
| else: |
| return array |
| |
| def __iter__(self): |
| return self |
| |
| def __next__(self): |
| return self.get_next() |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, *args, **kwargs): |
| self.release() |
| |
| def __repr__(self): |
| return _repr_utils.array_stream_repr(self) |
| |
| |
| cdef class CMaterializedArrayStream: |
| """Optimized representation of a fully consumed ArrowArrayStream |
| |
| This class provides a data structure similar to pyarrow's ChunkedArray |
| where each consumed array is a referenced-counted shared array. This |
| class wraps the utilities provided by the nanoarrow C library to iterate |
| over and facilitate log(n) random access to items in this container. |
| """ |
| cdef CSchema _schema |
| cdef CBuffer _array_ends |
| cdef list _arrays |
| cdef int64_t _total_length |
| |
| def __cinit__(self): |
| self._arrays = [] |
| self._total_length = 0 |
| self._schema = CSchema.allocate() |
| self._array_ends = CBuffer.empty() |
| cdef int code = ArrowBufferAppendInt64(self._array_ends._ptr, 0) |
| Error.raise_error_not_ok("ArrowBufferAppendInt64()", code) |
| |
| cdef _finalize(self): |
| self._array_ends._set_data_type(<ArrowType>_types.INT64) |
| |
| @property |
| def schema(self) -> CSchema: |
| return self._schema |
| |
| def __getitem__(self, k) -> Tuple[CArray, int]: |
| cdef int64_t kint |
| cdef int array_i |
| cdef const int64_t* sorted_offsets = <int64_t*>self._array_ends._ptr.data |
| |
| if isinstance(k, slice): |
| raise NotImplementedError("index with slice") |
| |
| kint = k |
| if kint < 0: |
| kint += self._total_length |
| if kint < 0 or kint >= self._total_length: |
| raise IndexError(f"Index {kint} is out of range") |
| |
| array_i = ArrowResolveChunk64(kint, sorted_offsets, 0, len(self._arrays)) |
| kint -= sorted_offsets[array_i] |
| return self._arrays[array_i], kint |
| |
| def __len__(self) -> int: |
| return self._array_ends[len(self._arrays)] |
| |
| def __iter__(self) -> Iterable[Tuple[CArray, int]]: |
| for c_array in self._arrays: |
| for item_i in range(len(c_array)): |
| yield c_array, item_i |
| |
| def array(self, int64_t i) -> CArray: |
| return self._arrays[i] |
| |
| @property |
| def n_arrays(self) -> int: |
| return len(self._arrays) |
| |
| @property |
| def arrays(self) -> Iterable[CArray]: |
| return iter(self._arrays) |
| |
| def __arrow_c_stream__(self, requested_schema=None): |
| # When an array stream from iterable is supported, that could be used here |
| # to avoid unnessary shallow copies. |
| stream = CArrayStream.from_c_arrays( |
| self._arrays, |
| self._schema, |
| move=False, |
| validate=False |
| ) |
| |
| return stream.__arrow_c_stream__(requested_schema=requested_schema) |
| |
| def child(self, int64_t i) -> CMaterializedArrayStream: |
| cdef CMaterializedArrayStream out = CMaterializedArrayStream() |
| cdef int code |
| |
| out._schema = self._schema.child(i) |
| out._arrays = [chunk.child(i) for chunk in self._arrays] |
| for child_chunk in out._arrays: |
| out._total_length += len(child_chunk) |
| code = ArrowBufferAppendInt64(out._array_ends._ptr, out._total_length) |
| Error.raise_error_not_ok("ArrowBufferAppendInt64()", code) |
| |
| out._finalize() |
| return out |
| |
| @staticmethod |
| def from_c_arrays(arrays: Iterable[CArray], CSchema schema, bint validate=True): |
| """"Create a materialized array stream from an existing iterable of arrays |
| |
| This is slightly more efficient than creating a stream and then consuming it |
| because the implementation can avoid a shallow copy of each array. |
| """ |
| cdef CMaterializedArrayStream out = CMaterializedArrayStream() |
| |
| for array in arrays: |
| if not isinstance(array, CArray): |
| raise TypeError(f"Expected CArray but got {type(array).__name__}") |
| |
| if len(array) == 0: |
| continue |
| |
| if validate: |
| assert_type_equal(array.schema, schema, False) |
| |
| out._total_length += len(array) |
| code = ArrowBufferAppendInt64(out._array_ends._ptr, out._total_length) |
| Error.raise_error_not_ok("ArrowBufferAppendInt64()", code) |
| out._arrays.append(array) |
| |
| out._schema = schema |
| out._finalize() |
| return out |
| |
| @staticmethod |
| def from_c_array(CArray array) -> CMaterializedArrayStream: |
| """"Create a materialized array stream from a single array |
| """ |
| return CMaterializedArrayStream.from_c_arrays( |
| [array], |
| array.schema, |
| validate=False |
| ) |
| |
| @staticmethod |
| def from_c_array_stream(CArrayStream stream) -> CMaterializedArrayStream: |
| """"Create a materialized array stream from an unmaterialized ArrowArrayStream |
| """ |
| with stream: |
| return CMaterializedArrayStream.from_c_arrays( |
| stream, |
| stream._get_cached_schema(), |
| validate=False |
| ) |