| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from libcpp.memory cimport shared_ptr, dynamic_pointer_cast |
| from datetime import datetime, date |
| from libc.stdint cimport * |
| from libcpp cimport bool as c_bool |
| import cython |
| import pyarrow as pa |
| from cpython cimport * |
| from pyfury.includes.libformat cimport CWriter, CRowWriter, CArrayWriter, CBuffer |
| from pyfury.includes.libutil cimport AllocateBuffer |
| from pyarrow.lib cimport Schema, DataType, ListType, MapType, Field |
| from pyarrow.lib cimport CSchema, CDataType, CListType |
| from pyarrow import types |
| |
| cimport pyfury.includes.libformat as libformat |
| cimport pyarrow.lib as libpa |
| |
| |
| def create_row_encoder(Schema schema): |
| return RowEncoder.create(schema) |
| |
| |
| cdef class Encoder: |
| cdef: |
| CWriter* writer |
| |
| cdef write(self, int i, value): |
| pass |
| cdef read(self, Getter data, int i): |
| pass |
| |
| |
| cdef class RowEncoder(Encoder): |
| cdef: |
| readonly Schema schema |
| int initial_buffer_size |
| shared_ptr[CSchema] sp_schema |
| CSchema* c_schema |
| CWriter* parent_writer |
| CRowWriter* row_writer |
| list encoders |
| c_bool is_root |
| |
| def __init__(self): |
| raise TypeError("Do not call constructor directly, use " |
| "factory function instead.") |
| |
| @staticmethod |
| cdef create(Schema schema, CWriter* parent_writer=NULL, initial_buffer_size=16): |
| cdef RowEncoder encoder = RowEncoder.__new__(RowEncoder) |
| encoder.schema = schema |
| encoder.sp_schema = schema.sp_schema |
| encoder.c_schema = schema.schema |
| encoder.initial_buffer_size = initial_buffer_size |
| encoder.parent_writer = parent_writer |
| |
| if parent_writer == NULL: |
| encoder.row_writer = new CRowWriter(encoder.sp_schema) |
| else: |
| encoder.row_writer = new CRowWriter(encoder.sp_schema, parent_writer) |
| |
| encoder.encoders = [] |
| cdef: |
| Field field |
| for i in range(len(schema)): |
| field = schema.field(i) |
| encoder.encoders.append(create_converter(field, encoder.row_writer)) |
| return encoder |
| |
| # Special methods of extension types must be declared with def, not cdef. |
| def __dealloc__(self): |
| del self.row_writer |
| |
| cpdef RowData to_row(self, value): |
| if value is None: |
| raise ValueError("value shouldn't be None") |
| |
| cdef shared_ptr[CBuffer] buf |
| if not AllocateBuffer(self.initial_buffer_size, &buf): |
| raise MemoryError("out of memory") |
| self.row_writer.SetBuffer(buf) |
| self.row_writer.Reset() |
| return self.write_row(value) |
| |
| cpdef from_row(self, RowData row): |
| return self.decode(row) |
| |
| cdef RowData write_row(self, value): |
| cdef: |
| Field field |
| int num_fields = len(self.schema) |
| int i |
| # we don't use __dict__, because if user implements __getattr__/__setattr__ |
| # or use descriptor, the key in __dict__ may be not same as in schema's |
| # field name, and __slot__ also needs extra check. |
| # We don't support Mapping subclass, because isinstance cost too much time. |
| if type(value) is not dict: |
| for i in range(num_fields): |
| field = self.schema.field(i) |
| field_value = getattr(value, field.name, None) |
| if field_value is None: |
| self.row_writer.SetNullAt(i) |
| else: |
| self.row_writer.SetNotNullAt(i) |
| (<Encoder>self.encoders[i]).write(i, field_value) |
| else: |
| for i in range(num_fields): |
| field = self.schema.field(i) |
| field_value = value.get(field.name) |
| if field_value is None: |
| self.row_writer.SetNullAt(i) |
| else: |
| self.row_writer.SetNotNullAt(i) |
| (<Encoder>self.encoders[i]).write(i, field_value) |
| cdef shared_ptr[libformat.CRow] row = self.row_writer.ToRow() |
| return RowData.wrap(row, self.schema) |
| |
| cdef decode(self, RowData row): |
| cdef: |
| int num_fields = len(self.schema) |
| int i |
| from pyfury.format import get_cls_by_schema |
| cls = get_cls_by_schema(self.schema) |
| obj = cls.__new__(cls) |
| for i in range(num_fields): |
| field = self.schema.field(i) |
| field_name = field.name |
| if not row.is_null_at(i): |
| setattr(obj, field_name, (<Encoder>self.encoders[i]).read(row, i)) |
| else: |
| setattr(obj, field_name, None) |
| return obj |
| |
| cdef write(self, int i, value): |
| cdef int offset = self.parent_writer.cursor() |
| self.row_writer.Reset() |
| self.write_row(value) |
| cdef int size = self.parent_writer.cursor() - offset |
| self.parent_writer.SetOffsetAndSize(i, offset, size) |
| |
| cdef read(self, Getter data, int i): |
| struct_data = data.get_struct(i) |
| if struct_data is not None: |
| return self.decode(struct_data) |
| else: |
| return None |
| |
| |
| cdef class ArrayWriter(Encoder): |
| cdef: |
| ListType list_type |
| CWriter* parent_writer |
| CArrayWriter* array_writer |
| object elem_encoder |
| |
| def __init__(self): |
| raise TypeError("Do not call ArrayWriter's constructor directly, use " |
| "factory function instead.") |
| |
| # All constructor arguments will be passed as Python objects, |
| # This implies that non-convertible C types such as pointers or |
| # C++ objects cannot be passed into the constructor from Cython code. |
| # use a factory function instead. |
| # special_methods#initialisation-methods-cinit-and-init |
| # extension_types#existing-pointers-instantiation |
| @staticmethod |
| cdef ArrayWriter create(ListType list_type, CWriter* parent_writer): |
| cdef: |
| ArrayWriter encoder = ArrayWriter.__new__(ArrayWriter) |
| shared_ptr[CDataType] c_type = libpa.pyarrow_unwrap_data_type(list_type) |
| cdef: |
| shared_ptr[CListType] c_list_type = \ |
| dynamic_pointer_cast[CListType, CDataType](c_type) |
| |
| encoder.parent_writer = parent_writer |
| encoder.list_type = list_type |
| libpa.pyarrow_unwrap_array(list_type) |
| encoder.array_writer = new CArrayWriter( |
| c_list_type, parent_writer) |
| encoder.elem_encoder =\ |
| create_converter(list_type.value_field, encoder.array_writer) |
| return encoder |
| |
| def __dealloc__(self): |
| del self.array_writer |
| |
| cdef void write_array(self, value): |
| """If value don't have __iter__/__len__, raise TypeError""" |
| if value is None: |
| raise ValueError("value shouldn't be None") |
| # only support max to 32-bit int, so we don't use Py_ssize_t, |
| # use int and let cython check overflow instead. |
| cdef: |
| int length = len(value) |
| int i |
| self.array_writer.Reset(length) |
| it = iter(value) |
| for i in range(length): |
| elem = next(it) |
| if elem is None: |
| self.array_writer.SetNullAt(i) |
| else: |
| self.array_writer.SetNotNullAt(i) |
| (<Encoder>self.elem_encoder).write(i, elem) |
| |
| cdef decode(self, ArrayData array_data): |
| cdef: |
| int num_elements = array_data.data.get().num_elements() |
| int i |
| arr = [] |
| for i in range(num_elements): |
| if not array_data.is_null_at(i): |
| arr.append((<Encoder>self.elem_encoder).read(array_data, i)) |
| else: |
| arr.append(None) |
| return arr |
| |
| cdef write(self, int i, value): |
| cdef int offset = self.parent_writer.cursor() |
| self.write_array(value) |
| cdef int size = self.parent_writer.cursor() - offset |
| self.parent_writer.SetOffsetAndSize(i, offset, size) |
| |
| cdef read(self, Getter data, int i): |
| array_data = data.get_array_data(i) |
| if array_data is not None: |
| return self.decode(array_data) |
| else: |
| return None |
| |
| cdef class MapWriter(Encoder): |
| cdef: |
| MapType map_type |
| CWriter* parent_writer |
| ArrayWriter keys_encoder |
| ArrayWriter values_encoder |
| |
| def __init__(self): |
| raise TypeError("Do not call MapWriter's constructor directly, use " |
| "factory function instead.") |
| |
| @staticmethod |
| cdef MapWriter create(MapType map_type, CWriter* parent_writer): |
| cdef MapWriter encoder = MapWriter.__new__(MapWriter) |
| encoder.map_type = map_type |
| encoder.parent_writer = parent_writer |
| encoder.keys_encoder = ArrayWriter.create( |
| pa.list_(map_type.key_type), parent_writer) |
| encoder.values_encoder = ArrayWriter.create( |
| pa.list_(map_type.item_type), parent_writer) |
| return encoder |
| |
| cdef void write_map(self, value): |
| """if value has keys/values methods, we take it as a dict, |
| else raise TypeError""" |
| if value is None: |
| raise ValueError("value shouldn't be None") |
| cdef int offset = self.parent_writer.cursor() |
| self.parent_writer.WriteDirectly(-1) # increase cursor by 8 |
| self.keys_encoder.write_array(value.keys()) |
| cdef int keys_size_bytes = self.parent_writer.cursor() - offset - 8 |
| self.parent_writer.WriteDirectly(offset, keys_size_bytes) |
| self.values_encoder.write_array(value.values()) |
| |
| cdef decode(self, MapData map_data): |
| cdef: |
| int num_elements = map_data.num_elements |
| int i |
| dict_obj = {} |
| key_arr = self.keys_encoder.decode( |
| map_data.keys_array_(self.keys_encoder.list_type)) |
| value_arr = self.values_encoder.decode( |
| map_data.values_array_(self.values_encoder.list_type)) |
| return dict(zip(key_arr, value_arr)) |
| |
| cdef write(self, int i, value): |
| cdef int offset = self.parent_writer.cursor() |
| self.write_map(value) |
| cdef int size = self.parent_writer.cursor() - offset |
| self.parent_writer.SetOffsetAndSize(i, offset, size) |
| |
| cdef read(self, Getter data, int i): |
| map_data = data.get_map_data(i) |
| if map_data is not None: |
| return self.decode(map_data) |
| else: |
| return None |
| |
| |
| # no need to check numeric overflow, cython will check it |
| # cython will check type for automatic cast |
| # cython: checked-type-casts |
| |
| # When a parameter of a Python function is declared to have a C data type, |
| # it is passed in as a Python object and automatically converted to a C value, |
| # if possible. Automatic conversion is currently only possible for numeric types, |
| # string types and structs (composed recursively of any of these types). |
| # So you can declare parameter with extension_types, because it's a python object |
| @cython.internal |
| cdef class BooleanWriter(Encoder): |
| cdef write(self, int i, value): |
| cdef c_bool v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_boolean(i) |
| |
| |
| @cython.internal |
| cdef class Int8Writer(Encoder): |
| cdef write(self, int i, value): |
| cdef int8_t v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_int8(i) |
| |
| |
| @cython.internal |
| cdef class Int16Writer(Encoder): |
| cdef write(self, int i, value): |
| cdef int16_t v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_int16(i) |
| |
| |
| @cython.internal |
| cdef class Int32Writer(Encoder): |
| cdef write(self, int i, value): |
| cdef int32_t v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_int32(i) |
| |
| |
| @cython.internal |
| cdef class Int64Writer(Encoder): |
| cdef write(self, int i, value): |
| cdef int64_t v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_int64(i) |
| |
| |
| @cython.internal |
| cdef class FloatWriter(Encoder): |
| cdef write(self, int i, value): |
| cdef float v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_float(i) |
| |
| |
| @cython.internal |
| cdef class DoubleWriter(Encoder): |
| cdef write(self, int i, value): |
| cdef double v = value |
| self.writer.Write(i, v) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_double(i) |
| |
| |
| @cython.internal |
| cdef class DateWriter(Encoder): |
| cdef write(self, int i, value): |
| if not isinstance(value, date): |
| raise TypeError("{} should be {} instead of {}".format( |
| value, date, type(value))) |
| cdef int32_t days = (value - date(1970, 1, 1)).days |
| self.writer.Write(i, days) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_date(i) |
| |
| |
| @cython.internal |
| cdef class TimestampWriter(Encoder): |
| cdef write(self, int i, value): |
| if not isinstance(value, datetime): |
| raise TypeError("{} should be {} instead of {}".format( |
| value, datetime, type(value))) |
| # TimestampType represent micro seconds |
| cdef int64_t timestamp = int(value.timestamp() * 1000000) |
| self.writer.Write(i, timestamp) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_datetime(i) |
| |
| |
| @cython.internal |
| cdef class BinaryWriter(Encoder): |
| cdef write(self, int i, value): |
| # support bytes, bytearray, array of unsigned char |
| cdef const unsigned char[:] data = value |
| cdef int32_t length = data.nbytes |
| self.writer.WriteBytes(i, &data[0], length) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_binary(i) |
| |
| |
| @cython.internal |
| cdef class StrWriter(Encoder): |
| cdef write(self, int i, value): |
| cdef unsigned char* data |
| if PyUnicode_Check(value): |
| encoded = PyUnicode_AsEncodedString(value, "UTF-8", "encode to utf-8 error") |
| data = encoded |
| self.writer.WriteBytes(i, data, len(encoded)) |
| else: |
| raise TypeError("value should be unicode, but get type of {}" |
| .format(type(value))) |
| |
| cdef read(self, Getter data, int i): |
| return data.get_str(i) |
| |
| |
| cdef create_converter(Field field, CWriter* writer): |
| import pyarrow as pa |
| cdef: |
| RowEncoder row_encoder |
| ArrayWriter array_encoder |
| MapWriter map_encoder |
| DataType data_type = field.type |
| |
| if types.is_boolean(data_type): |
| return create_atomic_encoder(BooleanWriter, writer) |
| elif types.is_int8(data_type): |
| return create_atomic_encoder(Int8Writer, writer) |
| elif types.is_int16(data_type): |
| return create_atomic_encoder(Int16Writer, writer) |
| elif types.is_int32(data_type): |
| return create_atomic_encoder(Int32Writer, writer) |
| elif types.is_int64(data_type): |
| return create_atomic_encoder(Int64Writer, writer) |
| elif types.is_float32(data_type): |
| return create_atomic_encoder(FloatWriter, writer) |
| elif types.is_float64(data_type): |
| return create_atomic_encoder(DoubleWriter, writer) |
| elif types.is_date32(data_type): |
| return create_atomic_encoder(DateWriter, writer) |
| elif types.is_timestamp(data_type): |
| return create_atomic_encoder(TimestampWriter, writer) |
| elif types.is_binary(data_type): |
| return create_atomic_encoder(BinaryWriter, writer) |
| elif types.is_string(data_type): |
| return create_atomic_encoder(StrWriter, writer) |
| elif types.is_struct(data_type): |
| row_encoder = RowEncoder.create(pa.schema( |
| list(data_type), metadata=field.metadata), writer) |
| return row_encoder |
| elif types.is_list(data_type): |
| array_encoder = ArrayWriter.create(data_type, writer) |
| return array_encoder |
| elif types.is_map(data_type): |
| map_encoder = MapWriter.create(data_type, writer) |
| return map_encoder |
| raise TypeError("Unsupported type: " + str(data_type)) |
| |
| |
| cdef create_atomic_encoder(cls, CWriter* writer): |
| cdef Encoder encoder = cls.__new__(cls) |
| encoder.writer = writer |
| return encoder |