blob: 4550c5b56cce31dbf5a445192c3e208f71336fdd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from libcpp.memory cimport make_shared
from libc.stdint cimport *
from cython.operator cimport dereference as deref
from datetime import timedelta
from pyfory.includes.libformat cimport (
CGetter, CArrayData, CMapData, CRow, CTypeId,
CSchema, CListType, CMapType, fory_schema
)
from pyfory.buffer cimport Buffer
from pyfory.includes.libutil cimport CBuffer
from libcpp.memory cimport shared_ptr
from libcpp.vector cimport vector
from datetime import datetime, date
from libc.stdint cimport *
from libcpp cimport bool as c_bool
from cpython cimport *
# Module-level cache of reader callables produced by ``get_reader``:
#   * ``ArrayData.get`` stores a single unbound accessor under
#     ``id(value_type)`` (the list's element type object);
#   * ``RowData.get`` stores a list of per-field accessors under
#     ``id(schema)``.
# Entries are popped again from ``ArrayData.__dealloc__`` /
# ``RowData.__dealloc__``. NOTE: keys are ``id()`` values, which Python may
# reuse for a different object once the keyed object is freed, so an entry
# must not outlive the object it was keyed on.
cdef dict reader_map = {}
cdef class Getter:
    """Base class providing null-aware, typed accessors over a native getter.

    ``getter`` is a raw pointer to the native object being read. Subclasses
    assign it and must also hold an owning reference to the pointee for as
    long as the wrapper lives. Every ``get_*`` accessor returns ``None`` when
    slot ``i`` is null.
    """
    cdef:
        CGetter* getter

    cdef inline c_bool is_null_at(self, int i):
        # Forward the null check to the native getter.
        return self.getter.is_null_at(i)

    cpdef get_boolean(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_boolean(i)
        return None

    cpdef get_int8(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_int8(i)
        return None

    cpdef get_int16(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_int16(i)
        return None

    cpdef get_int32(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_int32(i)
        return None

    cpdef get_int64(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_int64(i)
        return None

    cpdef get_float(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_float(i)
        return None

    cpdef get_double(self, int i):
        if not self.is_null_at(i):
            return self.getter.get_double(i)
        return None

    cpdef get_date(self, int i):
        """Decode slot ``i`` (an int32 day count since 1970-01-01) as a ``date``."""
        cdef int32_t days_since_epoch
        if self.is_null_at(i):
            return None
        days_since_epoch = self.getter.get_int32(i)
        return date(1970, 1, 1) + timedelta(days=days_since_epoch)

    cpdef get_datetime(self, int i):
        """Decode slot ``i`` (int64 microseconds since the Unix epoch).

        The value is passed to ``datetime.fromtimestamp`` in seconds, so the
        result is a naive ``datetime`` in the local time zone.
        """
        cdef int64_t micros
        if self.is_null_at(i):
            return None
        micros = self.getter.get_int64(i)
        return datetime.fromtimestamp(float(micros) / 1000000)

    cpdef get_binary(self, int i):
        """Return the binary value in slot ``i`` as a ``bytes`` copy."""
        cdef unsigned char* payload
        cdef int32_t payload_len
        if self.is_null_at(i):
            return None
        payload_len = self.getter.get_binary(i, &payload)
        return payload[:payload_len]

    cpdef get_str(self, int i):
        """Return the binary value in slot ``i`` decoded as UTF-8 text."""
        cdef unsigned char* payload
        cdef int32_t payload_len
        if self.is_null_at(i):
            return None
        payload_len = self.getter.get_binary(i, &payload)
        return payload[:payload_len].decode("UTF-8")

    # Nested-value accessors. The base versions return None; ``ArrayData``
    # and ``RowData`` override them with real implementations.
    cpdef RowData get_struct(self, int i):
        return None

    cpdef ArrayData get_array_data(self, int i):
        return None

    cpdef MapData get_map_data(self, int i):
        return None
cdef class ArrayData(Getter):
    """Read-only view over a native ``CArrayData`` list value.

    Elements are decoded according to ``type_.value_type``, and null
    elements read as ``None``. Create instances with ``ArrayData.wrap``;
    calling the Python constructor always raises ``TypeError``.
    """
    cdef:
        ListType type_
        shared_ptr[CArrayData] data

    def __init__(self):
        raise TypeError("Do not call constructor directly, use "
                        "factory function instead.")

    @staticmethod
    cdef ArrayData wrap(shared_ptr[CArrayData] data, ListType array_type):
        # ``getter`` is a borrowed raw pointer. Storing the shared_ptr in
        # ``data`` keeps the native array alive as long as this wrapper.
        cdef ArrayData array_data = ArrayData.__new__(ArrayData)
        array_data.getter = data.get()
        array_data.data = data
        array_data.type_ = array_type
        return array_data

    @property
    def num_elements(self) -> int:
        return self.data.get().num_elements()

    def buffer(self) -> Buffer:
        return Buffer.wrap(self.data.get().buffer())

    def base_offset(self) -> int:
        return self.data.get().base_offset()

    def size_bytes(self) -> int:
        return self.data.get().size_bytes()

    cpdef RowData get_struct(self, int i):
        """Return element ``i`` as a ``RowData`` (element type must be a struct)."""
        cdef DataType data_type = self.type_.value_type
        if self.is_null_at(i):
            return None
        # Build a schema from the struct type's fields for the nested row.
        cdef Schema struct_schema = _struct_type_to_schema(data_type)
        return RowData.wrap(self.data.get().get_struct(i), struct_schema)

    cpdef ArrayData get_array_data(self, int i):
        """Return element ``i`` as a nested ``ArrayData``."""
        cdef DataType data_type = self.type_.value_type
        if self.is_null_at(i):
            return None
        # Unchecked cast: the element type is expected to be a ListType.
        cdef ListType list_type = <ListType>data_type
        return ArrayData.wrap(self.data.get().get_array(i), list_type)

    cpdef MapData get_map_data(self, int i):
        """Return element ``i`` as a ``MapData``."""
        cdef DataType data_type = self.type_.value_type
        if self.is_null_at(i):
            return None
        cdef shared_ptr[CMapData] v = self.data.get().get_map(i)
        # Unchecked cast: the element type is expected to be a MapType.
        cdef MapType map_type = <MapType>data_type
        return MapData.wrap(v, map_type)

    def __getitem__(self, i):
        """Return element ``i`` (``0 <= i < num_elements``), or ``None`` if null.

        Raises:
            IndexError: ``i`` is negative or ``>= num_elements``.
        """
        # ``>=``: index ``num_elements`` is already past the last element.
        # The former ``>`` comparison let it through to ``get()``.
        if i >= self.num_elements or i < 0:
            raise IndexError("length is {}, but index is {}"
                             .format(self.num_elements, i))
        return self.get(i)

    def get(self, int i):
        """Return element ``i`` decoded per the element type (no bounds check)."""
        cdef DataType value_type = self.type_.value_type
        # Readers are cached in the module-level ``reader_map`` under the id
        # of the element type object.
        key = id(value_type)
        reader = reader_map.get(key)
        if reader is None:
            reader = get_reader(value_type, type(self))
            reader_map[key] = reader
        if self.is_null_at(i):
            return None
        # ``reader`` is an unbound method looked up on the class, so ``self``
        # must be passed explicitly.
        return reader(self, i)

    def __dealloc__(self):
        # ``type_`` is still None when the object never went through
        # ``wrap``. For example, ``ArrayData()`` allocates the object and then
        # ``__init__`` raises. Dereferencing ``type_`` in that state would
        # fail inside the deallocator, so only evict the cache entry when a
        # type is present.
        if self.type_ is not None:
            reader_map.pop(id(self.type_.value_type), None)

    def __str__(self) -> str:
        cdef:
            int length = self.num_elements
            int i
            DataType value_type = self.type_.value_type
            list parts = []
        getter = get_reader(value_type, type(self))
        for i in range(length):
            if self.is_null_at(i):
                parts.append("null")
            else:
                parts.append(str(getter(self, i)))
        return "[" + ",".join(parts) + "]"
cdef class MapData:
    """Read-only view over a native ``CMapData`` value.

    The keys and values are exposed as two ``ArrayData`` views whose element
    types come from ``map_type.key_type`` and ``map_type.item_type``.
    Create instances with ``MapData.wrap``; the Python constructor raises.
    """
    cdef:
        shared_ptr[CMapData] data
        MapType map_type

    def __init__(self):
        raise TypeError("Do not call constructor directly, use "
                        "factory function instead.")

    @staticmethod
    cdef MapData wrap(shared_ptr[CMapData] data, MapType map_type):
        cdef MapData wrapped = MapData.__new__(MapData)
        wrapped.data = data
        wrapped.map_type = map_type
        return wrapped

    @property
    def num_elements(self) -> int:
        cdef CMapData* native = self.data.get()
        return native.num_elements()

    def buffer(self) -> Buffer:
        cdef CMapData* native = self.data.get()
        return Buffer.wrap(native.buffer())

    def base_offset(self) -> int:
        cdef CMapData* native = self.data.get()
        return native.base_offset()

    def size_bytes(self) -> int:
        cdef CMapData* native = self.data.get()
        return native.size_bytes()

    def keys_array(self):
        """Return the map keys as an ``ArrayData`` typed by ``key_type``."""
        return self.keys_array_(list_(self.map_type.key_type))

    def values_array(self):
        """Return the map values as an ``ArrayData`` typed by ``item_type``."""
        return self.values_array_(list_(self.map_type.item_type))

    cdef keys_array_(self, ListType array_type):
        return ArrayData.wrap(self.data.get().keys_array(), array_type)

    cdef values_array_(self, ListType array_type):
        return ArrayData.wrap(self.data.get().values_array(), array_type)

    def __str__(self):
        keys_text = str(self.keys_array())
        values_text = str(self.values_array())
        return 'Map{' + keys_text + ', ' + values_text + '}'
cdef Schema _struct_type_to_schema(DataType struct_type):
    """Build a ``Schema`` whose fields are the child fields of ``struct_type``.

    ``struct_type`` must expose ``num_fields`` and ``field(i)``; each child is
    cast to ``Field`` without a type check.
    """
    cdef:
        vector[shared_ptr[CField]] child_fields
        int field_count = struct_type.num_fields
        int idx
        Field child
        Schema result
    for idx in range(field_count):
        child = <Field>struct_type.field(idx)
        child_fields.push_back(child.c_field)
    result = Schema.__new__(Schema)
    result.c_schema = fory_schema(child_fields)
    return result
cdef class RowData(Getter):
    """Row view over a native ``CRow`` described by a ``Schema``.

    Fields can be read by position (``row[0]``), by name (``row["f"]``) or as
    attributes (``row.f``). Null fields read as ``None``.
    """
    cdef:
        shared_ptr[CRow] data
        Schema schema_
        Buffer _buf  # hold buffer reference

    def __init__(self, schema, buffer, offset=0, size_in_bytes=None):
        """Point a new row of ``schema`` at ``size_in_bytes`` bytes of ``buffer``.

        ``buffer`` is either a ``Buffer`` or any object handed to the
        ``Buffer`` constructor. ``size_in_bytes`` defaults to ``len(buffer)``.
        """
        # NOTE(review): a non-``Buffer`` input is wrapped with ``offset``,
        # and ``offset`` is passed again to ``point_to`` below. If
        # ``Buffer(..., offset=...)`` already starts its view at ``offset``,
        # the offset is applied twice. The default ``size_in_bytes`` also
        # ignores ``offset``. Confirm ``Buffer`` semantics before relying on
        # a non-zero offset here.
        if size_in_bytes is None:
            size_in_bytes = len(buffer)
        if type(buffer) is not Buffer:
            buffer = Buffer(buffer, offset=offset, length=size_in_bytes)
        # ``shared_buf`` below only borrows this memory (presumably what the
        # ``False`` argument means), so keep the Python buffer alive here.
        self._buf = buffer
        cdef:
            Buffer buf = <Buffer>buffer
            shared_ptr[CRow] row = make_shared[CRow]((<Schema>schema).c_schema)
            shared_ptr[CBuffer] shared_buf = make_shared[CBuffer](
                buf.c_buffer.data(),
                buf.c_buffer.size(),
                False,
            )
        deref(row).point_to(shared_buf, offset, size_in_bytes)
        self.data = row
        self.getter = row.get()
        self.schema_ = schema

    @staticmethod
    cdef RowData wrap(shared_ptr[CRow] data, Schema schema):
        # ``getter`` is a borrowed raw pointer, and ``data`` owns the row.
        cdef RowData row_data = RowData.__new__(RowData)
        row_data.data = data
        row_data.getter = data.get()
        row_data.schema_ = schema
        return row_data

    @property
    def schema(self):
        return self.schema_

    @property
    def num_fields(self) -> int:
        return self.data.get().num_fields()

    def buffer(self) -> Buffer:
        return Buffer.wrap(self.data.get().buffer())

    cpdef base_offset(self):
        return self.data.get().base_offset()

    cpdef size_bytes(self):
        return self.data.get().size_bytes()

    def to_bytes(self) -> bytes:
        """Return a copy of this row's bytes from the underlying buffer."""
        start = self.base_offset()
        end_offset = start + self.size_bytes()
        return self.buffer().to_bytes()[start:end_offset]

    cpdef RowData get_struct(self, int i):
        if self.is_null_at(i):
            return None
        cdef DataType data_type = self.schema_.field(i).type
        cdef Schema struct_schema = _struct_type_to_schema(data_type)
        return RowData.wrap(self.data.get().get_struct(i), struct_schema)

    cpdef ArrayData get_array_data(self, int i):
        if self.is_null_at(i):
            return None
        cdef DataType data_type = self.schema_.field(i).type
        # Unchecked cast: field ``i`` is expected to be a list type.
        cdef ListType list_type = <ListType>data_type
        return ArrayData.wrap(self.data.get().get_array(i), list_type)

    cpdef MapData get_map_data(self, int i):
        if self.is_null_at(i):
            return None
        cdef DataType data_type = self.schema_.field(i).type
        cdef shared_ptr[CMapData] v = self.data.get().get_map(i)
        # Unchecked cast: field ``i`` is expected to be a map type.
        cdef MapType map_type = <MapType>data_type
        return MapData.wrap(v, map_type)

    def __getitem__(self, i):
        """Return field ``i`` by position or, if ``i`` is a ``str``, by name.

        Raises:
            TypeError: ``i`` is neither an ``int`` nor a ``str``.
            ValueError: ``i`` is a name not found in ``schema.names``.
            IndexError: ``i`` is negative or ``>= num_fields``.
        """
        if not isinstance(i, int):
            # Explicit check rather than ``assert``, which ``python -O`` strips.
            if not isinstance(i, str):
                raise TypeError("field index must be int or str, not {}"
                                .format(type(i).__name__))
            i = self.schema_.names.index(i)
        # ``>=``: index ``num_fields`` is already out of range.
        if i >= self.num_fields or i < 0:
            raise IndexError("num_fields is {}, but index is {}"
                             .format(self.num_fields, i))
        return self.get(i)

    def __getattr__(self, item):
        # Called only when normal attribute lookup fails. Resolve ``item`` as
        # a field name. An unknown name must surface as AttributeError, not
        # the ValueError from ``index``, so ``hasattr``/``getattr(..., default)``
        # work. Only the name lookup is guarded, so errors raised while
        # decoding the value (e.g. UnicodeDecodeError) still propagate
        # unchanged.
        try:
            index = self.schema_.names.index(item)
        except ValueError:
            raise AttributeError("{!r} object has no attribute {!r}"
                                 .format(type(self).__name__, item)) from None
        return self[index]

    def get(self, i):
        """Return field ``i`` decoded per its schema type (no bounds check)."""
        # One reader per field, cached in the module-level ``reader_map``
        # under the id of the schema object.
        key = id(self.schema_)
        readers = reader_map.get(key)
        if readers is None:
            readers = [
                get_reader(self.schema_.field(field_index).type, type(self))
                for field_index in range(self.schema_.num_fields)
            ]
            reader_map[key] = readers
        if self.is_null_at(i):
            return None
        # Readers are unbound methods, so ``self`` is passed explicitly.
        return readers[i](self, i)

    def __dealloc__(self):
        reader_map.pop(id(self.schema_), None)

    def __str__(self) -> str:
        cdef:
            Field field_
            int num_fields = self.schema_.num_fields
            int i
            list parts = []
        for i in range(num_fields):
            field_ = self.schema_.field(i)
            getter = get_reader(field_.type, type(self))
            if self.is_null_at(i):
                value = "null"
            else:
                value = str(getter(self, i))
            parts.append(field_.name + '=' + value)
        return "{" + ",".join(parts) + "}"
def assert_type(i, data_type, type_cls):
    """Raise ``TypeError`` unless ``data_type`` is an instance of ``type_cls``.

    ``i`` only identifies the offending position or field in the message.
    """
    if isinstance(data_type, type_cls):
        return
    message = "type for {0} is {1}, isn't {2}".format(i, data_type, type_cls)
    raise TypeError(message)
def get_reader(data_type, type_):
    """Return the accessor on ``type_`` that decodes values of ``data_type``.

    In this file ``type_`` is ``type(self)`` of a ``RowData``/``ArrayData``.
    The returned object is the method looked up on that class, so callers
    invoke it as ``reader(instance, index)``. ``data_type`` is cast to
    ``DataType`` without a type check.

    Raises:
        TypeError: the type id of ``data_type`` has no accessor.
    """
    cdef CTypeId tid = (<DataType>data_type).id
    cdef str accessor_name
    if tid == CTypeId.BOOL:
        accessor_name = "get_boolean"
    elif tid == CTypeId.INT8:
        accessor_name = "get_int8"
    elif tid == CTypeId.INT16:
        accessor_name = "get_int16"
    elif tid == CTypeId.INT32:
        accessor_name = "get_int32"
    elif tid == CTypeId.INT64:
        accessor_name = "get_int64"
    elif tid == CTypeId.FLOAT32:
        accessor_name = "get_float"
    elif tid == CTypeId.FLOAT64:
        accessor_name = "get_double"
    elif tid == CTypeId.DATE:
        accessor_name = "get_date"
    elif tid == CTypeId.TIMESTAMP:
        accessor_name = "get_datetime"
    elif tid == CTypeId.BINARY:
        accessor_name = "get_binary"
    elif tid == CTypeId.STRING:
        accessor_name = "get_str"
    elif tid == CTypeId.STRUCT:
        accessor_name = "get_struct"
    elif tid == CTypeId.LIST:
        accessor_name = "get_array_data"
    elif tid == CTypeId.MAP:
        accessor_name = "get_map_data"
    else:
        raise TypeError("Unsupported type: " + str(data_type))
    return getattr(type_, accessor_name)