blob: 60ef8ee32235a9f673be8905f63482f97abadfa0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from libcpp.memory cimport shared_ptr, dynamic_pointer_cast
from datetime import datetime, date
from libc.stdint cimport *
from libcpp cimport bool as c_bool
from libcpp.vector cimport vector
import cython
from cpython cimport *
from pyfory.includes.libformat cimport (
CWriter, CRowWriter, CArrayWriter, CBuffer, CTypeId,
CSchema, CField, CListType, CMapType, fory_schema, fory_list
)
from pyfory.includes.libutil cimport allocate_buffer
cimport pyfory.includes.libformat as libformat
def create_row_encoder(Schema schema):
    """Public factory: build a :class:`RowEncoder` bound to ``schema``."""
    cdef RowEncoder encoder = RowEncoder.create(schema)
    return encoder
cdef class Encoder:
    """Abstract base for per-field converters.

    Subclasses override ``write``/``read``.  The ``writer`` pointer is
    assigned by ``create_atomic_encoder`` for the atomic subclasses.
    """
    cdef:
        CWriter* writer  # target writer; not owned by this object
    cdef write(self, int i, value):
        """Write ``value`` at field index ``i``; no-op in the base class."""
        pass
    cdef read(self, Getter data, int i):
        """Read field ``i`` from ``data``; no-op in the base class."""
        pass
cdef class RowEncoder(Encoder):
    """Encodes a Python object (or plain dict) to row format and back.

    When ``parent_writer`` is NULL this is a root encoder that allocates
    its own buffer in ``to_row``; otherwise it writes nested struct data
    into the parent's buffer.
    """
    cdef:
        readonly Schema schema_       # python-level schema (exposed via .schema)
        int initial_buffer_size       # initial root buffer size passed to allocate_buffer
        shared_ptr[CSchema] c_schema  # C++ schema backing schema_
        CWriter* parent_writer        # NULL for a root encoder
        CRowWriter* row_writer        # owned; freed in __dealloc__
        list encoders                 # per-field Encoder, index-aligned with schema fields
        c_bool is_root                # NOTE(review): declared but never assigned or read here
    def __init__(self):
        raise TypeError("Do not call constructor directly, use "
                        "factory function instead.")
    @staticmethod
    cdef create(Schema schema, CWriter* parent_writer=NULL, initial_buffer_size=16):
        """Build a RowEncoder for ``schema``; nested iff ``parent_writer`` is non-NULL."""
        cdef RowEncoder encoder = RowEncoder.__new__(RowEncoder)
        encoder.schema_ = schema
        encoder.c_schema = schema.c_schema
        encoder.initial_buffer_size = initial_buffer_size
        encoder.parent_writer = parent_writer
        if parent_writer == NULL:
            encoder.row_writer = new CRowWriter(encoder.c_schema)
        else:
            encoder.row_writer = new CRowWriter(encoder.c_schema, parent_writer)
        encoder.encoders = []
        cdef:
            Field field_
        # one converter per schema field, in field order
        for i in range(schema.num_fields):
            field_ = schema.field(i)
            encoder.encoders.append(create_converter(field_, encoder.row_writer))
        return encoder
    @property
    def schema(self):
        return self.schema_
    # Special methods of extension types must be declared with def, not cdef.
    def __dealloc__(self):
        del self.row_writer
    cpdef RowData to_row(self, value):
        """Encode ``value`` into a freshly allocated buffer and return the row.

        Raises ValueError for None input and MemoryError if the buffer
        cannot be allocated.
        """
        if value is None:
            raise ValueError("value shouldn't be None")
        cdef shared_ptr[CBuffer] buf
        if not allocate_buffer(self.initial_buffer_size, &buf):
            raise MemoryError("out of memory")
        self.row_writer.set_buffer(buf)
        self.row_writer.reset()
        return self.write_row(value)
    cpdef from_row(self, RowData row):
        """Decode ``row`` back into a Python object (inverse of ``to_row``)."""
        return self.decode(row)
    cdef RowData write_row(self, value):
        """Write every schema field of ``value`` via the per-field encoders."""
        cdef:
            Field field_
            int num_fields = self.schema_.num_fields
            int i
        # we don't use __dict__, because if user implements __getattr__/__setattr__
        # or use descriptor, the key in __dict__ may be not same as in schema's
        # field name, and __slot__ also needs extra check.
        # We don't support Mapping subclass, because isinstance cost too much time.
        if type(value) is not dict:
            for i in range(num_fields):
                field_ = self.schema_.field(i)
                # a missing attribute is encoded as null
                field_value = getattr(value, field_.name, None)
                if field_value is None:
                    self.row_writer.set_null_at(i)
                else:
                    self.row_writer.set_not_null_at(i)
                    (<Encoder>self.encoders[i]).write(i, field_value)
        else:
            for i in range(num_fields):
                field_ = self.schema_.field(i)
                # a missing key is encoded as null
                field_value = value.get(field_.name)
                if field_value is None:
                    self.row_writer.set_null_at(i)
                else:
                    self.row_writer.set_not_null_at(i)
                    (<Encoder>self.encoders[i]).write(i, field_value)
        cdef shared_ptr[libformat.CRow] row = self.row_writer.to_row()
        return RowData.wrap(row, self.schema_)
    cdef decode(self, RowData row):
        """Instantiate the schema's class (bypassing __init__) and fill its fields."""
        cdef:
            int num_fields = self.schema_.num_fields
            int i
        # NOTE(review): import is inside the method, presumably to avoid a
        # circular import at module load time — confirm against pyfory.format.
        from pyfory.format import get_cls_by_schema
        cls = get_cls_by_schema(self.schema_)
        obj = cls.__new__(cls)
        for i in range(num_fields):
            field_ = self.schema_.field(i)
            field_name = field_.name
            if not row.is_null_at(i):
                setattr(obj, field_name, (<Encoder>self.encoders[i]).read(row, i))
            else:
                setattr(obj, field_name, None)
        return obj
    cdef write(self, int i, value):
        """Encoder interface: write ``value`` as a nested struct at field ``i``."""
        cdef int offset = self.parent_writer.cursor()
        self.row_writer.reset()
        self.write_row(value)
        cdef int size = self.parent_writer.cursor() - offset
        self.parent_writer.set_offset_and_size(i, offset, size)
    cdef read(self, Getter data, int i):
        """Encoder interface: decode the nested struct at field ``i`` (None if null)."""
        struct_data = data.get_struct(i)
        if struct_data is not None:
            return self.decode(struct_data)
        else:
            return None
cdef class ArrayWriter(Encoder):
    """Converter for list fields; wraps an owned C ``CArrayWriter``.

    Elements are written through a single element encoder built from the
    list's value field.
    """
    cdef:
        ListType list_type          # python-level list type of the field
        CWriter* parent_writer      # enclosing writer (not owned)
        CArrayWriter* array_writer  # owned; freed in __dealloc__
        object elem_encoder         # Encoder for the element type
    def __init__(self):
        raise TypeError("Do not call ArrayWriter's constructor directly, use "
                        "factory function instead.")
    @staticmethod
    cdef ArrayWriter create(ListType list_type, CWriter* parent_writer):
        """Build an ArrayWriter for ``list_type`` nested in ``parent_writer``."""
        cdef:
            ArrayWriter encoder = ArrayWriter.__new__(ArrayWriter)
        cdef shared_ptr[CListType] c_list_type = \
            dynamic_pointer_cast[CListType, libformat.CDataType](list_type.c_type)
        encoder.parent_writer = parent_writer
        encoder.list_type = list_type
        encoder.array_writer = new CArrayWriter(c_list_type, parent_writer)
        # Create encoder for value type
        cdef Field value_field = list_type.value_field
        encoder.elem_encoder = create_converter(value_field, encoder.array_writer)
        return encoder
    def __dealloc__(self):
        del self.array_writer
    cdef void write_array(self, value):
        """If value don't have __iter__/__len__, raise TypeError"""
        if value is None:
            raise ValueError("value shouldn't be None")
        # only support max to 32-bit int, so we don't use Py_ssize_t,
        # use int and let cython check overflow instead.
        cdef:
            int length = len(value)
            int i
        self.array_writer.reset(length)
        it = iter(value)
        for i in range(length):
            elem = next(it)
            if elem is None:
                self.array_writer.set_null_at(i)
            else:
                self.array_writer.set_not_null_at(i)
                (<Encoder>self.elem_encoder).write(i, elem)
    cdef decode(self, ArrayData array_data):
        """Decode ``array_data`` into a Python list (None for null slots)."""
        cdef:
            int num_elements = array_data.data.get().num_elements()
            int i
        arr = []
        for i in range(num_elements):
            if not array_data.is_null_at(i):
                arr.append((<Encoder>self.elem_encoder).read(array_data, i))
            else:
                arr.append(None)
        return arr
    cdef write(self, int i, value):
        """Encoder interface: write ``value`` as an array at field ``i``."""
        cdef int offset = self.parent_writer.cursor()
        self.write_array(value)
        cdef int size = self.parent_writer.cursor() - offset
        self.parent_writer.set_offset_and_size(i, offset, size)
    cdef read(self, Getter data, int i):
        """Encoder interface: decode the array at field ``i`` (None if null)."""
        array_data = data.get_array_data(i)
        if array_data is not None:
            return self.decode(array_data)
        else:
            return None
cdef class MapWriter(Encoder):
    """Converter for map fields.

    A map is serialized as two consecutive arrays — the keys array,
    prefixed with its byte size, followed by the values array — so a
    reader can locate the values array from the size prefix.
    """
    cdef:
        MapType map_type               # python-level map type of the field
        CWriter* parent_writer         # enclosing writer (not owned)
        ArrayWriter keys_encoder       # encodes the keys as a list
        ArrayWriter values_encoder     # encodes the values as a list
    def __init__(self):
        raise TypeError("Do not call MapWriter's constructor directly, use "
                        "factory function instead.")
    @staticmethod
    cdef MapWriter create(MapType map_type, CWriter* parent_writer):
        """Build a MapWriter whose keys and values are encoded as lists."""
        cdef MapWriter encoder = MapWriter.__new__(MapWriter)
        encoder.map_type = map_type
        encoder.parent_writer = parent_writer
        # Create list types for keys and values
        cdef ListType keys_list_type = list_(map_type.key_type)
        cdef ListType values_list_type = list_(map_type.item_type)
        encoder.keys_encoder = ArrayWriter.create(keys_list_type, parent_writer)
        encoder.values_encoder = ArrayWriter.create(values_list_type, parent_writer)
        return encoder
    cdef void write_map(self, value):
        """if value has keys/values methods, we take it as a dict,
        else raise TypeError"""
        if value is None:
            raise ValueError("value shouldn't be None")
        cdef int offset = self.parent_writer.cursor()
        # Reserve 8 bytes for the keys-array byte size; the placeholder is
        # overwritten below once the keys array has been written.
        self.parent_writer.write_directly(-1)  # increase cursor by 8
        self.keys_encoder.write_array(value.keys())
        cdef int keys_size_bytes = self.parent_writer.cursor() - offset - 8
        self.parent_writer.write_directly(offset, keys_size_bytes)
        self.values_encoder.write_array(value.values())
    cdef decode(self, MapData map_data):
        """Decode ``map_data`` into a plain dict.

        Fix: removed unused locals (``num_elements``, ``i``, ``dict_obj``)
        that the previous version declared but never used.
        """
        key_arr = self.keys_encoder.decode(
            map_data.keys_array_(self.keys_encoder.list_type))
        value_arr = self.values_encoder.decode(
            map_data.values_array_(self.values_encoder.list_type))
        return dict(zip(key_arr, value_arr))
    cdef write(self, int i, value):
        """Encoder interface: write ``value`` as a map at field ``i``."""
        cdef int offset = self.parent_writer.cursor()
        self.write_map(value)
        cdef int size = self.parent_writer.cursor() - offset
        self.parent_writer.set_offset_and_size(i, offset, size)
    cdef read(self, Getter data, int i):
        """Encoder interface: decode the map at field ``i`` (None if null)."""
        map_data = data.get_map_data(i)
        if map_data is not None:
            return self.decode(map_data)
        else:
            return None
# No need to check for numeric overflow manually: Cython checks it.
# Cython also checks types for automatic casts.
# cython: checked-type-casts
# When a parameter of a Python function is declared to have a C data type,
# it is passed in as a Python object and automatically converted to a C value,
# if possible. Automatic conversion is currently only possible for numeric
# types, string types and structs (composed recursively of any of these
# types). A parameter may also be declared as an extension type, because
# such a parameter is already a Python object.
@cython.internal
cdef class BooleanWriter(Encoder):
    """Converter for boolean fields."""
    cdef write(self, int i, value):
        # Conversion to c_bool is checked by Cython.
        cdef c_bool flag = value
        self.writer.write(i, flag)
    cdef read(self, Getter data, int i):
        return data.get_boolean(i)
@cython.internal
cdef class Int8Writer(Encoder):
    """Converter for int8 fields."""
    cdef write(self, int i, value):
        # Cython raises if value does not fit in int8.
        cdef int8_t coerced = value
        self.writer.write(i, coerced)
    cdef read(self, Getter data, int i):
        return data.get_int8(i)
@cython.internal
cdef class Int16Writer(Encoder):
    """Converter for int16 fields."""
    cdef write(self, int i, value):
        # Cython raises if value does not fit in int16.
        cdef int16_t coerced = value
        self.writer.write(i, coerced)
    cdef read(self, Getter data, int i):
        return data.get_int16(i)
@cython.internal
cdef class Int32Writer(Encoder):
    """Converter for int32 fields."""
    cdef write(self, int i, value):
        # Cython raises if value does not fit in int32.
        cdef int32_t coerced = value
        self.writer.write(i, coerced)
    cdef read(self, Getter data, int i):
        return data.get_int32(i)
@cython.internal
cdef class Int64Writer(Encoder):
    """Converter for int64 fields."""
    cdef write(self, int i, value):
        # Cython raises if value does not fit in int64.
        cdef int64_t coerced = value
        self.writer.write(i, coerced)
    cdef read(self, Getter data, int i):
        return data.get_int64(i)
@cython.internal
cdef class FloatWriter(Encoder):
    """Converter for float32 fields."""
    cdef write(self, int i, value):
        # Narrowing to C float is handled by Cython's conversion rules.
        cdef float single = value
        self.writer.write(i, single)
    cdef read(self, Getter data, int i):
        return data.get_float(i)
@cython.internal
cdef class DoubleWriter(Encoder):
    """Converter for float64 fields."""
    cdef write(self, int i, value):
        cdef double dbl = value
        self.writer.write(i, dbl)
    cdef read(self, Getter data, int i):
        return data.get_double(i)
@cython.internal
cdef class DateWriter(Encoder):
    """Converter for ``datetime.date`` fields, stored as days since epoch."""
    cdef write(self, int i, value):
        # Fix: datetime is a subclass of date, so the old isinstance check
        # let datetime through and ``value - date(1970, 1, 1)`` then raised
        # a cryptic "unsupported operand" TypeError. Reject datetime
        # explicitly so callers get the intended error message.
        if not isinstance(value, date) or isinstance(value, datetime):
            raise TypeError("{} should be {} instead of {}".format(
                value, date, type(value)))
        cdef int32_t days = (value - date(1970, 1, 1)).days
        self.writer.write(i, days)
    cdef read(self, Getter data, int i):
        return data.get_date(i)
@cython.internal
cdef class TimestampWriter(Encoder):
    """Converter for ``datetime.datetime`` fields, stored as microseconds."""
    cdef write(self, int i, value):
        if not isinstance(value, datetime):
            raise TypeError("{} should be {} instead of {}".format(
                value, datetime, type(value)))
        # TimestampType represent micro seconds.
        # Fix: round instead of truncating — float representation error in
        # timestamp() * 1000000 can land a hair below the exact integer,
        # and int() would then lose a microsecond.
        cdef int64_t timestamp = int(round(value.timestamp() * 1000000))
        self.writer.write(i, timestamp)
    cdef read(self, Getter data, int i):
        return data.get_datetime(i)
@cython.internal
cdef class BinaryWriter(Encoder):
    """Converter for binary fields (bytes, bytearray, uchar buffers)."""
    cdef write(self, int i, value):
        # support bytes, bytearray, array of unsigned char
        cdef const unsigned char[:] data = value
        cdef int32_t length = data.nbytes
        # Fix: indexing an empty memoryview (&data[0]) raises IndexError
        # under bounds checking, so empty input previously failed. Pass a
        # NULL pointer with zero length instead.
        if length > 0:
            self.writer.write_bytes(i, &data[0], length)
        else:
            self.writer.write_bytes(i, NULL, 0)
    cdef read(self, Getter data, int i):
        return data.get_binary(i)
@cython.internal
cdef class StrWriter(Encoder):
    """Converter for string fields; written as UTF-8 bytes."""
    cdef write(self, int i, value):
        cdef unsigned char* buf
        # Guard clause: only unicode objects are accepted.
        if not PyUnicode_Check(value):
            raise TypeError("value should be unicode, but get type of {}"
                            .format(type(value)))
        encoded = PyUnicode_AsEncodedString(value, "UTF-8", "encode to utf-8 error")
        buf = encoded
        self.writer.write_bytes(i, buf, len(encoded))
    cdef read(self, Getter data, int i):
        return data.get_str(i)
cdef create_converter(Field field_, CWriter* writer):
    """Create an encoder for the given field type.

    Atomic types are dispatched through a lookup table; struct, list and
    map types recurse into their respective composite encoders.
    """
    cdef:
        DataType data_type = field_.type
        CTypeId type_id = data_type.id
    # Atomic (leaf) types all share the same construction path.
    atomic = {
        CTypeId.BOOL: BooleanWriter,
        CTypeId.INT8: Int8Writer,
        CTypeId.INT16: Int16Writer,
        CTypeId.INT32: Int32Writer,
        CTypeId.INT64: Int64Writer,
        CTypeId.FLOAT32: FloatWriter,
        CTypeId.FLOAT64: DoubleWriter,
        CTypeId.DATE: DateWriter,
        CTypeId.TIMESTAMP: TimestampWriter,
        CTypeId.BINARY: BinaryWriter,
        CTypeId.STRING: StrWriter,
    }
    writer_cls = atomic.get(type_id)
    if writer_cls is not None:
        return create_atomic_encoder(writer_cls, writer)
    if type_id == CTypeId.STRUCT:
        # Create schema from struct type fields
        return RowEncoder.create(_struct_type_to_schema(data_type), writer)
    if type_id == CTypeId.LIST:
        return ArrayWriter.create(<ListType>data_type, writer)
    if type_id == CTypeId.MAP:
        return MapWriter.create(<MapType>data_type, writer)
    raise TypeError("Unsupported type: " + str(data_type))
cdef create_atomic_encoder(cls, CWriter* writer):
    """Instantiate ``cls`` without calling ``__init__`` and attach ``writer``."""
    cdef Encoder instance = cls.__new__(cls)
    instance.writer = writer
    return instance