blob: 326aefa498513ead4bc2bd5430e98cc81f9d92a4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
_NULL = NA = None
cdef class Scalar:
"""
The base class for all array elements.
"""
cdef class NullType(Scalar):
"""
Singleton for null array elements.
"""
# TODO rename this NullValue?
def __cinit__(self):
global NA
if NA is not None:
raise Exception('Cannot create multiple NAType instances')
self.type = null()
def __repr__(self):
return 'NULL'
def as_py(self):
"""
Return None
"""
return None
_NULL = NA = NullType()
cdef class ArrayValue(Scalar):
"""
The base class for non-null array elements.
"""
def __init__(self):
raise TypeError("Do not call {}'s constructor directly, use array "
"subscription instead."
.format(self.__class__.__name__))
cdef void init(self, DataType type, const shared_ptr[CArray]& sp_array,
int64_t index):
self.type = type
self.index = index
self._set_array(sp_array)
cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
self.sp_array = sp_array
def __repr__(self):
if hasattr(self, 'as_py'):
return repr(self.as_py())
else:
return super(Scalar, self).__repr__()
def __str__(self):
if hasattr(self, 'as_py'):
return str(self.as_py())
else:
return super(Scalar, self).__str__()
def __eq__(self, other):
if hasattr(self, 'as_py'):
if isinstance(other, ArrayValue):
other = other.as_py()
return self.as_py() == other
else:
raise NotImplementedError(
"Cannot compare Arrow values that don't support as_py()")
def __hash__(self):
return hash(self.as_py())
cdef class BooleanValue(ArrayValue):
"""
Concrete class for boolean array elements.
"""
def as_py(self):
"""
Return this value as a Python bool.
"""
cdef CBooleanArray* ap = <CBooleanArray*> self.sp_array.get()
return ap.Value(self.index)
cdef class Int8Value(ArrayValue):
"""
Concrete class for int8 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt8Array* ap = <CInt8Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class UInt8Value(ArrayValue):
"""
Concrete class for uint8 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt8Array* ap = <CUInt8Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class Int16Value(ArrayValue):
"""
Concrete class for int16 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt16Array* ap = <CInt16Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class UInt16Value(ArrayValue):
"""
Concrete class for uint16 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt16Array* ap = <CUInt16Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class Int32Value(ArrayValue):
"""
Concrete class for int32 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt32Array* ap = <CInt32Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class UInt32Value(ArrayValue):
"""
Concrete class for uint32 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt32Array* ap = <CUInt32Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class Int64Value(ArrayValue):
"""
Concrete class for int64 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt64Array* ap = <CInt64Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class UInt64Value(ArrayValue):
"""
Concrete class for uint64 array elements.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt64Array* ap = <CUInt64Array*> self.sp_array.get()
return ap.Value(self.index)
cdef class Date32Value(ArrayValue):
"""
Concrete class for date32 array elements.
"""
def as_py(self):
"""
Return this value as a Python datetime.datetime instance.
"""
cdef CDate32Array* ap = <CDate32Array*> self.sp_array.get()
# Shift to seconds since epoch
return datetime.datetime.utcfromtimestamp(
int(ap.Value(self.index)) * 86400).date()
cdef class Date64Value(ArrayValue):
"""
Concrete class for date64 array elements.
"""
def as_py(self):
"""
Return this value as a Python datetime.datetime instance.
"""
cdef CDate64Array* ap = <CDate64Array*> self.sp_array.get()
return datetime.datetime.utcfromtimestamp(
ap.Value(self.index) / 1000).date()
cdef class Time32Value(ArrayValue):
"""
Concrete class for time32 array elements.
"""
def as_py(self):
"""
Return this value as a Python datetime.timedelta instance.
"""
cdef:
CTime32Array* ap = <CTime32Array*> self.sp_array.get()
CTime32Type* dtype = <CTime32Type*> ap.type().get()
if dtype.unit() == TimeUnit_SECOND:
delta = datetime.timedelta(seconds=ap.Value(self.index))
return (datetime.datetime(1970, 1, 1) + delta).time()
else:
return _box_time_milli(ap.Value(self.index))
cdef class Time64Value(ArrayValue):
"""
Concrete class for time64 array elements.
"""
def as_py(self):
"""
Return this value as a Python datetime.timedelta instance.
"""
cdef:
CTime64Array* ap = <CTime64Array*> self.sp_array.get()
CTime64Type* dtype = <CTime64Type*> ap.type().get()
cdef int64_t val = ap.Value(self.index)
if dtype.unit() == TimeUnit_MICRO:
return _box_time_micro(val)
else:
return (datetime.datetime(1970, 1, 1) +
datetime.timedelta(microseconds=val / 1000)).time()
cpdef _box_time_milli(int64_t val):
delta = datetime.timedelta(milliseconds=val)
return (datetime.datetime(1970, 1, 1) + delta).time()
cpdef _box_time_micro(int64_t val):
return (datetime.datetime(1970, 1, 1) +
datetime.timedelta(microseconds=val)).time()
cdef dict _DATETIME_CONVERSION_FUNCTIONS = {}
cdef c_bool _datetime_conversion_initialized = False
def _datetime_conversion_functions():
global _datetime_conversion_initialized
if _datetime_conversion_initialized:
return _DATETIME_CONVERSION_FUNCTIONS
try:
import pandas as pd
except ImportError:
_DATETIME_CONVERSION_FUNCTIONS.update({
TimeUnit_SECOND: lambda x, tzinfo: (
datetime.datetime.utcfromtimestamp(x).replace(tzinfo=tzinfo)
),
TimeUnit_MILLI: lambda x, tzinfo: (
(datetime.datetime.utcfromtimestamp(x / 1e3)
.replace(tzinfo=tzinfo))
),
TimeUnit_MICRO: lambda x, tzinfo: (
(datetime.datetime.utcfromtimestamp(x / 1e6)
.replace(tzinfo=tzinfo))
),
})
else:
_DATETIME_CONVERSION_FUNCTIONS.update({
TimeUnit_SECOND: lambda x, tzinfo: pd.Timestamp(
x * 1000000000, tz=tzinfo, unit='ns',
),
TimeUnit_MILLI: lambda x, tzinfo: pd.Timestamp(
x * 1000000, tz=tzinfo, unit='ns',
),
TimeUnit_MICRO: lambda x, tzinfo: pd.Timestamp(
x * 1000, tz=tzinfo, unit='ns',
),
TimeUnit_NANO: lambda x, tzinfo: pd.Timestamp(
x, tz=tzinfo, unit='ns',
)
})
_datetime_conversion_initialized = True
return _DATETIME_CONVERSION_FUNCTIONS
cdef class TimestampValue(ArrayValue):
"""
Concrete class for timestamp array elements.
"""
@property
def value(self):
cdef CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
cdef CTimestampType* dtype = <CTimestampType*> ap.type().get()
return ap.Value(self.index)
def as_py(self):
"""
Return this value as a Pandas Timestamp instance (if available),
otherwise as a Python datetime.timedelta instance.
"""
cdef CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
cdef CTimestampType* dtype = <CTimestampType*> ap.type().get()
value = self.value
if not dtype.timezone().empty():
tzinfo = string_to_tzinfo(frombytes(dtype.timezone()))
else:
tzinfo = None
try:
converter = _datetime_conversion_functions()[dtype.unit()]
except KeyError:
raise ValueError(
'Cannot convert nanosecond timestamps without pandas'
)
return converter(value, tzinfo=tzinfo)
cdef class HalfFloatValue(ArrayValue):
"""
Concrete class for float16 array elements.
"""
def as_py(self):
"""
Return this value as a Python float.
"""
cdef CHalfFloatArray* ap = <CHalfFloatArray*> self.sp_array.get()
return PyHalf_FromHalf(ap.Value(self.index))
cdef class FloatValue(ArrayValue):
"""
Concrete class for float32 array elements.
"""
def as_py(self):
"""
Return this value as a Python float.
"""
cdef CFloatArray* ap = <CFloatArray*> self.sp_array.get()
return ap.Value(self.index)
cdef class DoubleValue(ArrayValue):
"""
Concrete class for float64 array elements.
"""
def as_py(self):
"""
Return this value as a Python float.
"""
cdef CDoubleArray* ap = <CDoubleArray*> self.sp_array.get()
return ap.Value(self.index)
cdef class DecimalValue(ArrayValue):
"""
Concrete class for decimal128 array elements.
"""
def as_py(self):
"""
Return this value as a Python Decimal.
"""
cdef:
CDecimal128Array* ap = <CDecimal128Array*> self.sp_array.get()
c_string s = ap.FormatValue(self.index)
return _pydecimal.Decimal(s.decode('utf8'))
cdef class StringValue(ArrayValue):
"""
Concrete class for string (utf8) array elements.
"""
def as_py(self):
"""
Return this value as a Python unicode string.
"""
cdef CStringArray* ap = <CStringArray*> self.sp_array.get()
return ap.GetString(self.index).decode('utf-8')
cdef class BinaryValue(ArrayValue):
"""
Concrete class for variable-sized binary array elements.
"""
def as_py(self):
"""
Return this value as a Python bytes object.
"""
cdef:
const uint8_t* ptr
int32_t length
CBinaryArray* ap = <CBinaryArray*> self.sp_array.get()
ptr = ap.GetValue(self.index, &length)
return cp.PyBytes_FromStringAndSize(<const char*>(ptr), length)
def as_buffer(self):
"""
Return a view over this value as a Buffer object.
"""
cdef:
CBinaryArray* ap = <CBinaryArray*> self.sp_array.get()
shared_ptr[CBuffer] buf
buf = SliceBuffer(ap.value_data(), ap.value_offset(self.index),
ap.value_length(self.index))
return pyarrow_wrap_buffer(buf)
cdef class ListValue(ArrayValue):
"""
Concrete class for list array elements.
"""
def __len__(self):
"""
Return the number of values.
"""
return self.length()
def __getitem__(self, i):
"""
Return the value at the given index.
"""
return self.getitem(_normalize_index(i, self.length()))
def __iter__(self):
"""
Iterate over this element's values.
"""
for i in range(len(self)):
yield self.getitem(i)
raise StopIteration
cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
self.sp_array = sp_array
self.ap = <CListArray*> sp_array.get()
self.value_type = pyarrow_wrap_data_type(self.ap.value_type())
cdef getitem(self, int64_t i):
cdef int64_t j = self.ap.value_offset(self.index) + i
return box_scalar(self.value_type, self.ap.values(), j)
cdef int64_t length(self):
return self.ap.value_length(self.index)
def as_py(self):
"""
Return this value as a Python list.
"""
cdef:
int64_t j
list result = []
for j in range(len(self)):
result.append(self.getitem(j).as_py())
return result
cdef class UnionValue(ArrayValue):
"""
Concrete class for union array elements.
"""
cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
self.sp_array = sp_array
self.ap = <CUnionArray*> sp_array.get()
cdef getitem(self, int64_t i):
cdef int8_t type_id = self.ap.raw_type_ids()[i]
cdef shared_ptr[CArray] child = self.ap.child(type_id)
if self.ap.mode() == _UnionMode_SPARSE:
return box_scalar(self.type[type_id].type, child, i)
else:
return box_scalar(self.type[type_id].type, child,
self.ap.value_offset(i))
def as_py(self):
"""
Return this value as a Python object.
The exact type depends on the underlying union member.
"""
return self.getitem(self.index).as_py()
cdef class FixedSizeBinaryValue(ArrayValue):
"""
Concrete class for fixed-size binary array elements.
"""
def as_py(self):
"""
Return this value as a Python bytes object.
"""
cdef:
CFixedSizeBinaryArray* ap
CFixedSizeBinaryType* ap_type
int32_t length
const char* data
ap = <CFixedSizeBinaryArray*> self.sp_array.get()
ap_type = <CFixedSizeBinaryType*> ap.type().get()
length = ap_type.byte_width()
data = <const char*> ap.GetValue(self.index)
return cp.PyBytes_FromStringAndSize(data, length)
cdef class StructValue(ArrayValue):
"""
Concrete class for struct array elements.
"""
cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
self.sp_array = sp_array
self.ap = <CStructArray*> sp_array.get()
def __getitem__(self, key):
"""
Return the child value for the given field name.
"""
cdef:
CStructType* type
int index
type = <CStructType*> self.type.type
index = type.GetFieldIndex(tobytes(key))
if index < 0:
raise KeyError(key)
return pyarrow_wrap_array(self.ap.field(index))[self.index]
def as_py(self):
"""
Return this value as a Python dict.
"""
cdef:
vector[shared_ptr[CField]] child_fields = self.type.type.children()
wrapped_arrays = [pyarrow_wrap_array(self.ap.field(i))
for i in range(self.ap.num_fields())]
child_names = [child.get().name() for child in child_fields]
# Return the struct as a dict
return {
frombytes(name): child_array[self.index].as_py()
for name, child_array in
zip(child_names, wrapped_arrays)
}
cdef class DictionaryValue(ArrayValue):
"""
Concrete class for dictionary-encoded array elements.
"""
def as_py(self):
"""
Return this value as a Python object.
The exact type depends on the dictionary value type.
"""
return self.dictionary_value.as_py()
@property
def index_value(self):
"""
Return this value's underlying index as a ArrayValue of the right
signed integer type.
"""
cdef CDictionaryArray* darr = <CDictionaryArray*>(self.sp_array.get())
indices = pyarrow_wrap_array(darr.indices())
return indices[self.index]
@property
def dictionary_value(self):
"""
Return this value's underlying dictionary value as a ArrayValue.
"""
cdef CDictionaryArray* darr = <CDictionaryArray*>(self.sp_array.get())
dictionary = pyarrow_wrap_array(darr.dictionary())
return dictionary[self.index_value.as_py()]
cdef dict _array_value_classes = {
_Type_BOOL: BooleanValue,
_Type_UINT8: UInt8Value,
_Type_UINT16: UInt16Value,
_Type_UINT32: UInt32Value,
_Type_UINT64: UInt64Value,
_Type_INT8: Int8Value,
_Type_INT16: Int16Value,
_Type_INT32: Int32Value,
_Type_INT64: Int64Value,
_Type_DATE32: Date32Value,
_Type_DATE64: Date64Value,
_Type_TIME32: Time32Value,
_Type_TIME64: Time64Value,
_Type_TIMESTAMP: TimestampValue,
_Type_HALF_FLOAT: HalfFloatValue,
_Type_FLOAT: FloatValue,
_Type_DOUBLE: DoubleValue,
_Type_LIST: ListValue,
_Type_UNION: UnionValue,
_Type_BINARY: BinaryValue,
_Type_STRING: StringValue,
_Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
_Type_DECIMAL: DecimalValue,
_Type_STRUCT: StructValue,
_Type_DICTIONARY: DictionaryValue,
}
cdef class ScalarValue(Scalar):
"""
The base class for scalars.
"""
def __init__(self):
raise TypeError("Do not call {}'s constructor directly."
.format(self.__class__.__name__))
cdef void init(self, const shared_ptr[CScalar]& sp_scalar):
self.sp_scalar = sp_scalar
def __repr__(self):
if hasattr(self, 'as_py'):
return repr(self.as_py())
else:
return super(Scalar, self).__repr__()
def __str__(self):
if hasattr(self, 'as_py'):
return str(self.as_py())
else:
return super(Scalar, self).__str__()
def __eq__(self, other):
if hasattr(self, 'as_py'):
if isinstance(other, ScalarValue):
other = other.as_py()
return self.as_py() == other
else:
raise NotImplemented(
"Cannot compare scalars that don't support as_py()")
def __hash__(self):
return hash(self.as_py())
cdef class UInt8Scalar(ScalarValue):
"""
Concrete class for uint8 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt8Scalar* sp = <CUInt8Scalar*> self.sp_scalar.get()
return sp.value
cdef class Int8Scalar(ScalarValue):
"""
Concrete class for int8 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt8Scalar* sp = <CInt8Scalar*> self.sp_scalar.get()
return sp.value
cdef class UInt16Scalar(ScalarValue):
"""
Concrete class for uint16 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt16Scalar* sp = <CUInt16Scalar*> self.sp_scalar.get()
return sp.value
cdef class Int16Scalar(ScalarValue):
"""
Concrete class for int16 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt16Scalar* sp = <CInt16Scalar*> self.sp_scalar.get()
return sp.value
cdef class UInt32Scalar(ScalarValue):
"""
Concrete class for uint32 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt32Scalar* sp = <CUInt32Scalar*> self.sp_scalar.get()
return sp.value
cdef class Int32Scalar(ScalarValue):
"""
Concrete class for int32 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt32Scalar* sp = <CInt32Scalar*> self.sp_scalar.get()
return sp.value
cdef class UInt64Scalar(ScalarValue):
"""
Concrete class for uint64 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CUInt64Scalar* sp = <CUInt64Scalar*> self.sp_scalar.get()
return sp.value
cdef class Int64Scalar(ScalarValue):
"""
Concrete class for int64 scalars.
"""
def as_py(self):
"""
Return this value as a Python int.
"""
cdef CInt64Scalar* sp = <CInt64Scalar*> self.sp_scalar.get()
return sp.value
cdef class FloatScalar(ScalarValue):
"""
Concrete class for float scalars.
"""
def as_py(self):
"""
Return this value as a Python float.
"""
cdef CFloatScalar* sp = <CFloatScalar*> self.sp_scalar.get()
return sp.value
cdef class DoubleScalar(ScalarValue):
"""
Concrete class for double scalars.
"""
def as_py(self):
"""
Return this value as a Python float.
"""
cdef CDoubleScalar* sp = <CDoubleScalar*> self.sp_scalar.get()
return sp.value
cdef dict _scalar_classes = {
_Type_UINT8: UInt8Scalar,
_Type_UINT16: UInt16Scalar,
_Type_UINT32: UInt32Scalar,
_Type_UINT64: UInt64Scalar,
_Type_INT8: Int8Scalar,
_Type_INT16: Int16Scalar,
_Type_INT32: Int32Scalar,
_Type_INT64: Int64Scalar,
_Type_FLOAT: FloatScalar,
_Type_DOUBLE: DoubleScalar,
}
cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
int64_t index):
cdef ArrayValue value
if type.type.id() == _Type_NA:
return _NULL
elif sp_array.get().IsNull(index):
return _NULL
else:
klass = _array_value_classes[type.type.id()]
value = klass.__new__(klass)
value.init(type, sp_array, index)
return value