# blob: 869087b57b4bb16c4c6c53be161d4ab38237090a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from libc.stdint cimport uintptr_t, uint8_t, int64_t
from cpython.pycapsule cimport PyCapsule_GetPointer
from cpython.unicode cimport PyUnicode_AsUTF8AndSize
from cpython cimport (
Py_buffer,
PyBuffer_Release,
PyBUF_ANY_CONTIGUOUS,
PyBUF_FORMAT,
PyBytes_FromStringAndSize,
PyObject_GetBuffer,
PyUnicode_FromStringAndSize,
)
from nanoarrow_c cimport (
ArrowArray,
ArrowArrayAppendBytes,
ArrowArrayAppendNull,
ArrowArrayAppendString,
ArrowArrayBuffer,
ArrowArrayFinishBuilding,
ArrowArrayInitFromSchema,
ArrowArrayInitFromType,
ArrowArrayMove,
ArrowArrayRelease,
ArrowArrayStartAppending,
ArrowArrayView,
ArrowArrayViewComputeNullCount,
ArrowArrayViewInitFromSchema,
ArrowArrayViewIsNull,
ArrowArrayViewGetBytesUnsafe,
ArrowArrayViewGetBufferDataType,
ArrowArrayViewGetBufferElementSizeBits,
ArrowArrayViewGetBufferType,
ArrowArrayViewGetBufferView,
ArrowArrayViewGetNumBuffers,
ArrowArrayViewGetStringUnsafe,
ArrowArrayViewSetArray,
ArrowArrayViewSetArrayMinimal,
ArrowBitCountSet,
ArrowBuffer,
ArrowBufferMove,
ArrowBufferType,
ArrowBufferView,
ArrowSchemaInitFromType,
ArrowStringView,
ArrowType,
ArrowTypeString,
ArrowValidationLevel,
NANOARROW_BUFFER_TYPE_DATA,
NANOARROW_BUFFER_TYPE_DATA_OFFSET,
NANOARROW_BUFFER_TYPE_VARIADIC_DATA,
NANOARROW_BUFFER_TYPE_VARIADIC_SIZE,
NANOARROW_BUFFER_TYPE_TYPE_ID,
NANOARROW_BUFFER_TYPE_UNION_OFFSET,
NANOARROW_BUFFER_TYPE_VALIDITY,
NANOARROW_VALIDATION_LEVEL_DEFAULT,
NANOARROW_VALIDATION_LEVEL_FULL,
NANOARROW_VALIDATION_LEVEL_MINIMAL,
NANOARROW_VALIDATION_LEVEL_NONE,
NANOARROW_OK,
)
from nanoarrow_device_c cimport (
ARROW_DEVICE_CPU,
ArrowDeviceType,
ArrowDeviceArray,
ArrowDeviceArrayInit,
)
from nanoarrow._device cimport Device, CSharedSyncEvent
from nanoarrow._buffer cimport CBuffer, CBufferView
from nanoarrow._schema cimport CSchema, CLayout
from nanoarrow._utils cimport (
alloc_c_array,
alloc_c_device_array,
alloc_c_array_view,
c_array_shallow_copy,
c_device_array_shallow_copy,
Error
)
from typing import Iterable, Tuple, Union
from nanoarrow import _repr_utils
from nanoarrow._device import DEVICE_CPU, DeviceType
cdef class CArrayView:
    """Low-level ArrowArrayView wrapper

    This object is a literal wrapper around an ArrowArrayView. It provides field accessors
    that return Python objects and handles the structure lifecycle (i.e., initialized
    ArrowArrayView structures are always released).

    See `nanoarrow.c_array_view()` for construction and usage examples.
    """

    def __cinit__(self, object base, uintptr_t addr):
        # ``addr`` is the address of an ArrowArrayView struct. ``base`` is stored
        # next to it (presumably the object owning that memory -- confirm with callers).
        self._base = base
        self._ptr = <ArrowArrayView*>addr
        self._event = CSharedSyncEvent(DEVICE_CPU)

    def _set_array(self, CArray array, Device device=DEVICE_CPU):
        """Point this view at ``array`` and return ``self``

        For the CPU device the array is set via ArrowArrayViewSetArray(); for any
        other device ArrowArrayViewSetArrayMinimal() is used instead. A non-OK
        return code is raised through ``Error.raise_message_not_ok()``.
        """
        cdef Error error = Error()
        cdef int code

        if device is DEVICE_CPU:
            code = ArrowArrayViewSetArray(self._ptr, array._ptr, &error.c_error)
        else:
            code = ArrowArrayViewSetArrayMinimal(self._ptr, array._ptr, &error.c_error)

        error.raise_message_not_ok("ArrowArrayViewSetArray()", code)

        # Remember the array's base so that buffers handed out by buffer()
        # can reference it.
        self._array_base = array._base
        self._event = CSharedSyncEvent(device, <uintptr_t>array._sync_event)

        return self

    @property
    def storage_type_id(self):
        """The raw ``storage_type`` field of the underlying ArrowArrayView"""
        return self._ptr.storage_type

    @property
    def storage_type(self):
        """The storage type name from ArrowTypeString(), or None if it returns NULL"""
        cdef const char* type_str = ArrowTypeString(self._ptr.storage_type)
        if type_str != NULL:
            return type_str.decode('UTF-8')
        return None

    @property
    def layout(self):
        """A CLayout wrapping the ``layout`` member of this view"""
        return CLayout(self, <uintptr_t>&self._ptr.layout)

    def __len__(self):
        return self._ptr.length

    @property
    def length(self):
        return len(self)

    @property
    def offset(self):
        return self._ptr.offset

    @property
    def null_count(self):
        """The null count, computing and caching it when it is not yet known

        A stored value of -1 is treated as "not computed". The count is only
        computed from the validity bitmap when this view's event device is the
        CPU device; otherwise the stored value (possibly still -1) is returned.
        """
        if self._ptr.null_count != -1:
            return self._ptr.null_count

        cdef ArrowBufferType buffer_type = self._ptr.layout.buffer_type[0]
        cdef const uint8_t* validity_bits = self._ptr.buffer_views[0].data.as_uint8

        if buffer_type != NANOARROW_BUFFER_TYPE_VALIDITY:
            # The first buffer is not a validity bitmap
            self._ptr.null_count = 0
        elif validity_bits == NULL:
            # No validity bitmap was provided
            self._ptr.null_count = 0
        elif self._event.device is DEVICE_CPU:
            self._ptr.null_count = ArrowArrayViewComputeNullCount(self._ptr)

        return self._ptr.null_count

    @property
    def n_children(self):
        return self._ptr.n_children

    def child(self, int64_t i):
        """Return a CArrayView wrapping child ``i``

        Raises IndexError if ``i`` is outside ``[0, n_children)``.
        """
        if i < 0 or i >= self._ptr.n_children:
            raise IndexError(f"{i} out of range [0, {self._ptr.n_children})")

        cdef CArrayView child = CArrayView(
            self._base,
            <uintptr_t>self._ptr.children[i]
        )

        child._event = self._event
        # buffer() passes _array_base to CBufferView; propagate it so that buffers
        # obtained from a child view reference the same array base as the parent's.
        child._array_base = self._array_base

        return child

    @property
    def children(self):
        for i in range(self.n_children):
            yield self.child(i)

    @property
    def n_buffers(self):
        return ArrowArrayViewGetNumBuffers(self._ptr)

    def _buffer_info(self, int64_t i):
        """Return ``(buffer type, data type, element size in bits, address, size_bytes)``

        Raises IndexError if ``i`` is outside ``[0, n_buffers)``.
        """
        if i < 0 or i >= self.n_buffers:
            raise IndexError(f"{i} out of range [0, {self.n_buffers})")

        cdef ArrowBufferView view = ArrowArrayViewGetBufferView(self._ptr, i)

        return (
            ArrowArrayViewGetBufferType(self._ptr, i),
            ArrowArrayViewGetBufferDataType(self._ptr, i),
            ArrowArrayViewGetBufferElementSizeBits(self._ptr, i),
            <uintptr_t>view.data.data,
            view.size_bytes
        )

    def buffer_type(self, int64_t i):
        """Return the type of buffer ``i`` as a string ("none" if unrecognized)"""
        buffer_type = self._buffer_info(i)[0]
        if buffer_type == NANOARROW_BUFFER_TYPE_VALIDITY:
            return "validity"
        elif buffer_type == NANOARROW_BUFFER_TYPE_TYPE_ID:
            return "type_id"
        elif buffer_type == NANOARROW_BUFFER_TYPE_UNION_OFFSET:
            return "union_offset"
        elif buffer_type == NANOARROW_BUFFER_TYPE_DATA_OFFSET:
            return "data_offset"
        elif buffer_type == NANOARROW_BUFFER_TYPE_DATA:
            return "data"
        elif buffer_type == NANOARROW_BUFFER_TYPE_VARIADIC_DATA:
            return "variadic_data"
        elif buffer_type == NANOARROW_BUFFER_TYPE_VARIADIC_SIZE:
            return "variadic_size"
        else:
            return "none"

    def buffer(self, int64_t i):
        """Return buffer ``i`` as a CBufferView

        Raises IndexError for an out-of-range ``i`` and RuntimeError if the
        buffer's size is negative.
        """
        _, data_type, element_size_bits, addr, size = self._buffer_info(i)

        # Check the buffer size here because the error later is cryptic.
        # Buffer sizes are set to -1 when they are "unknown", so because of errors
        # in nanoarrow/C or because the array is on a non-CPU device, that -1 value
        # could leak its way here.
        if size < 0:
            raise RuntimeError(f"ArrowArrayView buffer {i} has size_bytes < 0")

        return CBufferView(
            self._array_base,
            addr,
            size,
            data_type,
            element_size_bits,
            self._event
        )

    @property
    def buffers(self):
        for i in range(self.n_buffers):
            yield self.buffer(i)

    @property
    def dictionary(self):
        """A CArrayView of the dictionary, or None if the dictionary pointer is NULL"""
        if self._ptr.dictionary == NULL:
            return None

        cdef CArrayView dictionary = CArrayView(
            self,
            <uintptr_t>self._ptr.dictionary
        )

        dictionary._event = self._event
        # See child(): keep the array base available to buffer() on the dictionary view
        dictionary._array_base = self._array_base

        return dictionary

    def _iter_bytes(self, int64_t offset, int64_t length) -> bytes | None:
        """Yield each element as ``bytes``, or None for null elements

        NOTE(review): indices run over ``range(offset, length)``, i.e. ``length``
        acts as an exclusive end index rather than a count -- confirm against callers.
        """
        cdef ArrowBufferView item_view
        for i in range(offset, length):
            if ArrowArrayViewIsNull(self._ptr, i):
                yield None
            else:
                item_view = ArrowArrayViewGetBytesUnsafe(self._ptr, i)
                yield PyBytes_FromStringAndSize(item_view.data.as_char, item_view.size_bytes)

    def _iter_str(self, int64_t offset, int64_t length) -> str | None:
        """Yield each element as ``str``, or None for null elements

        NOTE(review): indices run over ``range(offset, length)``, i.e. ``length``
        acts as an exclusive end index rather than a count -- confirm against callers.
        """
        cdef ArrowStringView item_view
        for i in range(offset, length):
            if ArrowArrayViewIsNull(self._ptr, i):
                yield None
            else:
                item_view = ArrowArrayViewGetStringUnsafe(self._ptr, i)
                yield PyUnicode_FromStringAndSize(item_view.data, item_view.size_bytes)

    def __repr__(self):
        return _repr_utils.array_view_repr(self)

    @staticmethod
    def from_schema(CSchema schema):
        """Allocate a CArrayView initialized from ``schema``

        A non-OK return code from ArrowArrayViewInitFromSchema() is raised
        through ``Error.raise_message_not_ok()``.
        """
        cdef ArrowArrayView* c_array_view
        base = alloc_c_array_view(&c_array_view)

        cdef Error error = Error()
        cdef int code = ArrowArrayViewInitFromSchema(c_array_view,
                                                     schema._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowArrayViewInitFromSchema()", code)

        return CArrayView(base, <uintptr_t>c_array_view)

    @staticmethod
    def from_array(CArray array, Device device=DEVICE_CPU):
        """Allocate a CArrayView from the array's schema and point it at ``array``"""
        out = CArrayView.from_schema(array._schema)
        return out._set_array(array, device)
cdef class CArray:
    """Low-level ArrowArray wrapper

    This object is a literal wrapper around a read-only ArrowArray. It provides field accessors
    that return Python objects and handles the C Data interface lifecycle (i.e., initialized
    ArrowArray structures are always released).

    See `nanoarrow.c_array()` for construction and usage examples.
    """

    @staticmethod
    def allocate(CSchema schema) -> CArray:
        """Allocate a released ArrowArray"""
        cdef ArrowArray* c_array_out
        base = alloc_c_array(&c_array_out)
        return CArray(base, <uintptr_t>c_array_out, schema)

    def __cinit__(self, object base, uintptr_t addr, CSchema schema):
        self._base = base
        self._ptr = <ArrowArray*>addr
        self._schema = schema
        # Default to a CPU array with no device id and no sync event
        self._device_type = ARROW_DEVICE_CPU
        self._device_id = -1
        self._sync_event = NULL

    cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id, void* sync_event):
        # Record the device information associated with this array
        self._device_type = device_type
        self._device_id = device_id
        self._sync_event = sync_event

    @staticmethod
    def _import_from_c_capsule(schema_capsule, array_capsule) -> CArray:
        """Import from a ArrowSchema and ArrowArray PyCapsule tuple.

        Parameters
        ----------
        schema_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_schema' containing an
            ArrowSchema pointer.
        array_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_array' containing an
            ArrowArray pointer.
        """
        cdef:
            CSchema out_schema
            CArray out

        out_schema = CSchema._import_from_c_capsule(schema_capsule)
        out = CArray(
            array_capsule,
            <uintptr_t>PyCapsule_GetPointer(array_capsule, 'arrow_array'),
            out_schema
        )

        return out

    def __getitem__(self, k) -> CArray:
        """Return a shallow-copied slice of this array

        Only ``slice`` objects without a step are supported. Negative bounds are
        interpreted relative to the end of the array.

        Raises
        ------
        TypeError
            If ``k`` is not a slice.
        ValueError
            If ``k`` has a step.
        IndexError
            If the resolved bounds do not lie within ``[0, length]`` or
            ``stop < start``.
        """
        self._assert_valid()

        if not isinstance(k, slice):
            raise TypeError(
                f"Can't subset CArray with object of type {type(k).__name__}")

        if k.step is not None:
            raise ValueError("Can't slice CArray with step")

        cdef int64_t start = 0 if k.start is None else k.start
        cdef int64_t stop = self._ptr.length if k.stop is None else k.stop
        if start < 0:
            start = self._ptr.length + start
        if stop < 0:
            stop = self._ptr.length + stop

        # A start that is still negative after adjustment (e.g. -100 for a
        # length-5 array) must be rejected: it would otherwise produce a negative
        # offset. A still-negative stop is caught by ``stop < start``.
        if (
            start < 0
            or start > self._ptr.length
            or stop > self._ptr.length
            or stop < start
        ):
            raise IndexError(
                f"{k} does not describe a valid slice of CArray "
                f"with length {self._ptr.length}"
            )

        cdef ArrowArray* c_array_out
        base = alloc_c_array(&c_array_out)
        c_array_shallow_copy(self._base, self._ptr, c_array_out)

        # The slice is expressed purely through offset/length on the shallow copy
        c_array_out.offset = c_array_out.offset + start
        c_array_out.length = stop - start
        cdef CArray out = CArray(base, <uintptr_t>c_array_out, self._schema)
        out._set_device(self._device_type, self._device_id, self._sync_event)
        return out

    def __arrow_c_array__(self, requested_schema=None):
        """
        Get a pair of PyCapsules containing a C ArrowArray representation of the object.

        Parameters
        ----------
        requested_schema : PyCapsule | None
            A PyCapsule containing a C ArrowSchema representation of a requested
            schema. Not supported.

        Returns
        -------
        Tuple[PyCapsule, PyCapsule]
            A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
            respectively.

        Raises
        ------
        ValueError
            If this array is not a CPU array.
        NotImplementedError
            If ``requested_schema`` is not None.
        """
        self._assert_valid()

        if self._device_type != ARROW_DEVICE_CPU:
            raise ValueError(
                "Can't invoke __arrow_c_array__ on non-CPU array "
                f"with device_type {self._device_type}")

        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        # Export a shallow copy pointing to the same data in a way
        # that ensures this object stays valid.
        # TODO optimize this to export a version where children are reference
        # counted and can be released separately
        cdef ArrowArray* c_array_out
        array_capsule = alloc_c_array(&c_array_out)
        c_array_shallow_copy(self._base, self._ptr, c_array_out)

        return self._schema.__arrow_c_schema__(), array_capsule

    def _addr(self) -> int:
        """The address of the underlying ArrowArray as an integer"""
        return <uintptr_t>self._ptr

    def is_valid(self) -> bool:
        """Check for a non-null and non-released underlying ArrowArray"""
        return self._ptr != NULL and self._ptr.release != NULL

    def _assert_valid(self):
        """Raise RuntimeError if the underlying ArrowArray is NULL or released"""
        if self._ptr == NULL:
            raise RuntimeError("CArray is NULL")
        if self._ptr.release == NULL:
            raise RuntimeError("CArray is released")

    def view(self) -> CArrayView:
        """Allocate a :class:`CArrayView` to access the buffers of this array"""
        device = Device.resolve(self._device_type, self._device_id)
        return CArrayView.from_array(self, device)

    @property
    def schema(self) -> CSchema:
        return self._schema

    @property
    def device_type(self) -> DeviceType:
        return DeviceType(self._device_type)

    @property
    def device_type_id(self) -> int:
        return self._device_type

    @property
    def device_id(self) -> int:
        return self._device_id

    def __len__(self) -> int:
        self._assert_valid()
        return self._ptr.length

    @property
    def length(self) -> int:
        return len(self)

    @property
    def offset(self) -> int:
        self._assert_valid()
        return self._ptr.offset

    @property
    def null_count(self) -> int:
        self._assert_valid()
        return self._ptr.null_count

    @property
    def n_buffers(self) -> int:
        self._assert_valid()
        return self._ptr.n_buffers

    @property
    def buffers(self) -> Tuple[int, ...]:
        """The buffer pointers of this array as a tuple of integer addresses"""
        self._assert_valid()
        return tuple(<uintptr_t>self._ptr.buffers[i] for i in range(self._ptr.n_buffers))

    @property
    def n_children(self) -> int:
        self._assert_valid()
        return self._ptr.n_children

    def child(self, int64_t i):
        """Return child ``i`` as a CArray sharing this array's base and device info

        Raises IndexError if ``i`` is outside ``[0, n_children)``.
        """
        self._assert_valid()
        if i < 0 or i >= self._ptr.n_children:
            raise IndexError(f"{i} out of range [0, {self._ptr.n_children})")

        cdef CArray out = CArray(
            self._base,
            <uintptr_t>self._ptr.children[i],
            self._schema.child(i)
        )
        out._set_device(self._device_type, self._device_id, self._sync_event)
        return out

    @property
    def children(self) -> Iterable[CArray]:
        for i in range(self.n_children):
            yield self.child(i)

    @property
    def dictionary(self) -> Union[CArray, None]:
        """The dictionary as a CArray, or None if the dictionary pointer is NULL"""
        self._assert_valid()
        cdef CArray out
        if self._ptr.dictionary != NULL:
            out = CArray(self, <uintptr_t>self._ptr.dictionary, self._schema.dictionary)
            out._set_device(self._device_type, self._device_id, self._sync_event)
            return out
        else:
            return None

    def __repr__(self) -> str:
        return _repr_utils.array_repr(self)
cdef class CArrayBuilder:
    """Helper for constructing an ArrowArray

    The primary function of this class is to wrap the nanoarrow C library calls
    that build up the components of an ArrowArray.
    """
    cdef CArray c_array
    cdef ArrowArray* _ptr
    cdef Device _device
    # Whether finish() may run ArrowArrayFinishBuilding() validation
    cdef bint _can_validate

    def __cinit__(self, CArray array, Device device=DEVICE_CPU):
        self.c_array = array
        self._ptr = array._ptr
        self._device = device
        self._can_validate = device is DEVICE_CPU

    @staticmethod
    def allocate(Device device=DEVICE_CPU):
        """Create a CArrayBuilder

        Allocates memory for an ArrowArray and populates it with nanoarrow's
        ArrowArray private_data/release callback implementation. This should
        usually be followed by :meth:`init_from_type` or :meth:`init_from_schema`.
        """
        return CArrayBuilder(CArray.allocate(CSchema.allocate()), device)

    def is_empty(self) -> bool:
        """Check whether the array being built currently has length zero

        Raises RuntimeError if the builder has not been initialized.
        """
        if self._ptr.release == NULL:
            raise RuntimeError("CArrayBuilder is not initialized")
        return self._ptr.length == 0

    def init_from_type(self, int type_id) -> CArrayBuilder:
        """Initialize both the array and its schema from an ArrowType identifier

        Raises RuntimeError if the builder is already initialized.
        """
        if self._ptr.release != NULL:
            raise RuntimeError("CArrayBuilder is already initialized")

        cdef int code = ArrowArrayInitFromType(self._ptr, <ArrowType>type_id)
        Error.raise_error_not_ok("ArrowArrayInitFromType()", code)

        code = ArrowSchemaInitFromType(self.c_array._schema._ptr, <ArrowType>type_id)
        Error.raise_error_not_ok("ArrowSchemaInitFromType()", code)

        return self

    def init_from_schema(self, CSchema schema) -> CArrayBuilder:
        """Initialize the array from ``schema`` and adopt it as the output schema

        Raises RuntimeError if the builder is already initialized.
        """
        if self._ptr.release != NULL:
            raise RuntimeError("CArrayBuilder is already initialized")

        cdef Error error = Error()
        cdef int code = ArrowArrayInitFromSchema(self._ptr, schema._ptr, &error.c_error)
        # Report the function that was actually called
        error.raise_message_not_ok("ArrowArrayInitFromSchema()", code)

        self.c_array._schema = schema
        return self

    def start_appending(self) -> CArrayBuilder:
        """Use append mode for building this ArrowArray

        Calling this method is required to produce a valid array prior to calling
        :meth:`append_strings` or :meth:`append_bytes`.

        Raises ValueError if the builder's device is not the CPU device.
        """
        if self._device != DEVICE_CPU:
            raise ValueError("Can't append to non-CPU array")

        cdef int code = ArrowArrayStartAppending(self._ptr)
        Error.raise_error_not_ok("ArrowArrayStartAppending()", code)
        return self

    def append_strings(self, obj: Iterable[Union[str, None]]) -> CArrayBuilder:
        """Append each item of ``obj`` as a string element (None appends a null)"""
        cdef int code
        cdef Py_ssize_t item_utf8_size
        cdef ArrowStringView item

        for py_item in obj:
            if py_item is None:
                code = ArrowArrayAppendNull(self._ptr, 1)
            else:
                # Cython raises the error from PyUnicode_AsUTF8AndSize()
                # in the event that py_item is not a str(); however, we
                # set item_utf8_size = 0 to be safe.
                item_utf8_size = 0
                item.data = PyUnicode_AsUTF8AndSize(py_item, &item_utf8_size)
                item.size_bytes = item_utf8_size
                code = ArrowArrayAppendString(self._ptr, item)

            if code != NANOARROW_OK:
                Error.raise_error(f"append string item {py_item}", code)

        return self

    def append_bytes(self, obj: Iterable[Union[str, None]]) -> CArrayBuilder:
        """Append each item of ``obj`` as a binary element (None appends a null)

        Non-None items are accessed through the buffer protocol and must expose
        a one-dimensional buffer with an item size of 1.

        Raises ValueError if an item's buffer has ``ndim != 1`` or ``itemsize != 1``.
        """
        cdef Py_buffer buffer
        cdef ArrowBufferView item
        cdef int code

        for py_item in obj:
            if py_item is None:
                code = ArrowArrayAppendNull(self._ptr, 1)
            else:
                PyObject_GetBuffer(py_item, &buffer, PyBUF_ANY_CONTIGUOUS | PyBUF_FORMAT)
                # Release the buffer on every exit path, including the
                # validation errors below.
                try:
                    if buffer.ndim != 1:
                        raise ValueError("Can't append buffer with dimensions != 1 to binary array")

                    if buffer.itemsize != 1:
                        raise ValueError("Can't append buffer with itemsize != 1 to binary array")

                    item.data.data = buffer.buf
                    item.size_bytes = buffer.len
                    code = ArrowArrayAppendBytes(self._ptr, item)
                finally:
                    PyBuffer_Release(&buffer)

            if code != NANOARROW_OK:
                Error.raise_error(f"append bytes item {py_item}", code)

        # Return self for chaining, consistent with append_strings()
        return self

    def set_offset(self, int64_t offset) -> CArrayBuilder:
        self.c_array._assert_valid()
        self._ptr.offset = offset
        return self

    def set_length(self, int64_t length) -> CArrayBuilder:
        self.c_array._assert_valid()
        self._ptr.length = length
        return self

    def set_null_count(self, int64_t null_count) -> CArrayBuilder:
        self.c_array._assert_valid()
        self._ptr.null_count = null_count
        return self

    def resolve_null_count(self) -> CArrayBuilder:
        """Ensure the output null count is synchronized with existing buffers

        Note that this will not attempt to access non-CPU buffers such that
        :attr:`null_count` might still be -1 after calling this method.

        Raises ValueError if the validity bitmap is too small for
        ``offset + length`` bits.
        """
        self.c_array._assert_valid()

        # This doesn't apply to unions. We currently don't have a schema view
        # or array view we can use to query the type ID, so just use the format
        # string for now.
        schema_format = self.c_array.schema.format
        if schema_format.startswith(("+us:", "+ud:")):
            return self

        # Don't overwrite an explicit null count
        if self._ptr.null_count != -1:
            return self

        cdef ArrowBuffer* validity_buffer = ArrowArrayBuffer(self._ptr, 0)
        if validity_buffer.size_bytes == 0:
            self._ptr.null_count = 0
            return self

        # Don't attempt to access a non-cpu buffer
        if self._device != DEVICE_CPU:
            return self

        # From _ArrowBytesForBits(), which is not included in nanoarrow_c.pxd
        # because it's an internal inline function.
        cdef int64_t bits = self._ptr.offset + self._ptr.length
        cdef int64_t bytes_required = (bits >> 3) + ((bits & 7) != 0)

        if validity_buffer.size_bytes < bytes_required:
            raise ValueError(
                f"Expected validity bitmap >= {bytes_required} bytes "
                f"but got validity bitmap with {validity_buffer.size_bytes} bytes"
            )

        cdef int64_t count = ArrowBitCountSet(
            validity_buffer.data,
            self._ptr.offset,
            self._ptr.length
        )
        # Set bits count valid elements; the remainder are nulls
        self._ptr.null_count = self._ptr.length - count
        return self

    def set_buffer(self, int64_t i, CBuffer buffer, move=False) -> CArrayBuilder:
        """Set an ArrowArray buffer

        Sets a buffer of this ArrowArray such that the pointer at array->buffers[i] is
        equal to buffer->data and such that the buffer's lifecycle is managed by
        the array. If move is True, the input Python object that previously wrapped
        the ArrowBuffer will be invalidated, which is usually the desired behaviour
        if you built or imported a buffer specifically to build this array. If move
        is False (the default), this function will make a shallow copy via another
        layer of Python object wrapping.

        Raises IndexError if ``i`` is not in ``[0, 3]`` and ValueError if the
        buffer's device differs from the builder's device.
        """
        if i < 0 or i > 3:
            raise IndexError("i must be >= 0 and <= 3")

        if buffer._device != self._device:
            raise ValueError(
                f"Builder device ({self._device.device_type}/{self._device.device_id})"
                " and buffer device "
                f"({buffer._device.device_type}/{buffer._device.device_id})"
                " are not identical"
            )

        self.c_array._assert_valid()
        if not move:
            buffer = CBuffer.from_pybuffer(buffer)

        ArrowBufferMove(buffer._ptr, ArrowArrayBuffer(self._ptr, i))

        # The buffer's lifecycle is now owned by the array; however, we need
        # array->buffers[i] to be updated such that it equals
        # ArrowArrayBuffer(array, i)->data.
        self._ptr.buffers[i] = ArrowArrayBuffer(self._ptr, i).data

        return self

    def set_child(self, int64_t i, CArray c_array, move=False) -> CArrayBuilder:
        """Set an ArrowArray child

        Set a child of this array by performing a shallow copy or optionally
        transferring ownership to this object. Child ``i`` must already exist,
        i.e. this builder must have been initialized with a schema containing
        the correct number of children.

        Raises IndexError if ``i`` is not a valid child index and ValueError if
        the child's device differs from the builder's device.
        """
        cdef CArray child = self.c_array.child(i)

        # Validate before touching the existing child so that a rejected call
        # leaves the builder unchanged.
        if (
            self._device.device_type_id != c_array.device_type_id
            or self._device.device_id != c_array.device_id
        ):
            raise ValueError(
                f"Builder device ({self._device.device_type}/{self._device.device_id})"
                " and child device "
                f"({c_array.device_type}/{c_array.device_id}) are not identical"
            )

        # There is probably a way to avoid a full synchronize for each child
        # (e.g., perhaps the ArrayBuilder could allocate a stream to use such
        # that an event can be allocated on finish_device() and synchronization
        # could be avoided entirely). Including this for now for safety.
        cdef CSharedSyncEvent sync = CSharedSyncEvent(
            self._device,
            <uintptr_t>c_array._sync_event
        )
        sync.synchronize()

        # Release whatever currently occupies the child slot before replacing it
        if child._ptr.release != NULL:
            ArrowArrayRelease(child._ptr)

        if not move:
            c_array_shallow_copy(c_array._base, c_array._ptr, child._ptr)
        else:
            ArrowArrayMove(c_array._ptr, child._ptr)

        # After setting children, we can't use the built-in validation done by
        # ArrowArrayFinishBuilding() because it assumes that the private_data of
        # each array (recursively) is one that was initialized by ArrowArrayInit()
        self._can_validate = False

        return self

    def finish(self, validation_level=None) -> CArray:
        """Finish building this array

        Performs any steps required to return a valid ArrowArray and optionally
        validates the output to ensure that the result is valid (given the information
        the array has available to it).

        After this call the builder holds a freshly allocated, uninitialized
        array and may be initialized again.

        Parameters
        ----------
        validation_level : None, "full", "default", "minimal", or "none", optional
            Explicitly define a validation level or use None to perform default
            validation if possible. Validation may not be possible if children
            were set that were not created by nanoarrow.

        Raises
        ------
        NotImplementedError
            If validation was requested but cannot be performed by this builder.
        """
        self.c_array._assert_valid()
        cdef ArrowValidationLevel c_validation_level
        cdef Error error = Error()
        cdef int code

        if self._can_validate:
            # Any value other than "full", "minimal" or "none" uses the default level
            c_validation_level = NANOARROW_VALIDATION_LEVEL_DEFAULT
            if validation_level == "full":
                c_validation_level = NANOARROW_VALIDATION_LEVEL_FULL
            elif validation_level == "minimal":
                c_validation_level = NANOARROW_VALIDATION_LEVEL_MINIMAL
            elif validation_level == "none":
                c_validation_level = NANOARROW_VALIDATION_LEVEL_NONE

            code = ArrowArrayFinishBuilding(self._ptr, c_validation_level, &error.c_error)
            error.raise_message_not_ok("ArrowArrayFinishBuilding()", code)

        elif validation_level not in (None, "none"):
            raise NotImplementedError("Validation for array with children is not implemented")

        # Hand the finished array to the caller and reset the builder
        out = self.c_array
        self.c_array = CArray.allocate(CSchema.allocate())
        self._ptr = self.c_array._ptr
        self._can_validate = True

        return out

    def finish_device(self):
        """Finish building this array and export to an ArrowDeviceArray

        Calls :meth:`finish`, propagating device information into an ArrowDeviceArray.
        """
        cdef CArray array = self.finish()

        cdef ArrowDeviceArray* device_array_ptr
        holder = alloc_c_device_array(&device_array_ptr)
        cdef int code = ArrowDeviceArrayInit(self._device._ptr, device_array_ptr, array._ptr, NULL)
        Error.raise_error_not_ok("ArrowDeviceArrayInit", code)

        return CDeviceArray(holder, <uintptr_t>device_array_ptr, array._schema)
cdef class CDeviceArray:
    """Low-level ArrowDeviceArray wrapper

    A thin wrapper around an ArrowDeviceArray that exposes its fields as Python
    objects and takes care of the structure lifecycle (i.e., initialized
    ArrowDeviceArray structures are always released).

    See `nanoarrow.device.c_device_array()` for construction and usage examples.
    """

    def __cinit__(self, object base, uintptr_t addr, CSchema schema):
        self._schema = schema
        self._base = base
        self._ptr = <ArrowDeviceArray*>addr

    @staticmethod
    def _init_from_array(Device device, uintptr_t array_addr, CSchema schema):
        """Wrap the ArrowArray at ``array_addr`` in a new ArrowDeviceArray

        The device array is initialized with ArrowDeviceArrayInit() using a NULL
        sync event.
        """
        cdef ArrowDeviceArray* c_device_array
        cdef void* no_sync_event = NULL
        cdef ArrowArray* c_array = <ArrowArray*>array_addr

        holder = alloc_c_device_array(&c_device_array)
        cdef int code = ArrowDeviceArrayInit(
            device._ptr,
            c_device_array,
            c_array,
            no_sync_event
        )
        Error.raise_error_not_ok("ArrowDeviceArrayInit", code)

        return CDeviceArray(holder, <uintptr_t>c_device_array, schema)

    @property
    def schema(self) -> CSchema:
        return self._schema

    @property
    def device_type(self) -> DeviceType:
        return DeviceType(self._ptr.device_type)

    @property
    def device_type_id(self) -> int:
        return self._ptr.device_type

    @property
    def device_id(self) -> int:
        return self._ptr.device_id

    @property
    def array(self) -> CArray:
        """The embedded ArrowArray as a CArray carrying this object's device info"""
        cdef CArray out = CArray(self, <uintptr_t>&self._ptr.array, self._schema)
        out._set_device(
            self._ptr.device_type,
            self._ptr.device_id,
            self._ptr.sync_event
        )
        return out

    def view(self) -> CArrayView:
        """Shortcut for ``self.array.view()``"""
        return self.array.view()

    def __arrow_c_array__(self, requested_schema=None):
        """Delegate to ``__arrow_c_array__()`` of the embedded array"""
        return self.array.__arrow_c_array__(requested_schema=requested_schema)

    def __arrow_c_device_array__(self, requested_schema=None):
        """Export a schema capsule and a shallow-copied device array capsule

        Raises NotImplementedError if ``requested_schema`` is not None.
        """
        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        # TODO: evaluate whether we need to synchronize here or whether we should
        # move device arrays instead of shallow-copying them
        cdef ArrowDeviceArray* c_device_array_out
        capsule = alloc_c_device_array(&c_device_array_out)
        c_device_array_shallow_copy(self._base, self._ptr, c_device_array_out)

        return self._schema.__arrow_c_schema__(), capsule

    @staticmethod
    def _import_from_c_capsule(schema_capsule, device_array_capsule) -> CDeviceArray:
        """
        Import from an ArrowSchema and ArrowDeviceArray PyCapsule tuple.

        Parameters
        ----------
        schema_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_schema' containing an
            ArrowSchema pointer.
        device_array_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_device_array' containing an
            ArrowDeviceArray pointer.
        """
        cdef CSchema imported_schema = CSchema._import_from_c_capsule(schema_capsule)
        cdef uintptr_t device_array_addr = <uintptr_t>PyCapsule_GetPointer(
            device_array_capsule,
            'arrow_device_array'
        )

        return CDeviceArray(device_array_capsule, device_array_addr, imported_schema)

    def __repr__(self) -> str:
        return _repr_utils.device_array_repr(self)