| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| # cython: language_level = 3 |
| # cython: infer_types = True |
| # cython: profile=True |
| # cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True |
| from libc.stdint cimport int32_t, int64_t |
| from libc.stdlib cimport free, malloc |
| |
| import datetime |
| import decimal |
| import pickle |
| from typing import List, Union |
| |
| from cloudpickle import cloudpickle |
| import pyarrow as pa |
| |
| from pyflink.common import Row, RowKind |
| from pyflink.common.time import Instant |
| from pyflink.datastream.window import CountWindow, TimeWindow |
| from pyflink.fn_execution.ResettableIO import ResettableIO |
| from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas |
| |
# Number of leading bits of every row mask that carry the row kind value
# (see MaskUtils, RowCoderImpl and FlattenRowCoderImpl).
ROW_KIND_BIT_SIZE = 2
| |
cdef class InternalRow:
    """
    A lightweight row made of a list of field values and a row kind value.

    It can be converted to and from a :class:`pyflink.common.Row`; the conversions share the
    underlying values list instead of copying it.
    """

    def __cinit__(self, list values, InternalRowKind row_kind):
        self.values = values
        self.row_kind = row_kind
        # only populated by `from_row`; empty otherwise
        self.field_names = []

    cpdef object to_row(self):
        """
        Converts this InternalRow into a Row that shares the same values list.
        """
        row = Row()
        row._values = self.values
        row.set_field_names(self.field_names)
        row.set_row_kind(RowKind(self.row_kind))
        return row

    @staticmethod
    def from_row(row: Row) -> InternalRow:
        """
        Creates an InternalRow sharing the values list and field names of the given Row.
        """
        cdef InternalRow internal_row
        internal_row = InternalRow(row._values, row.get_row_kind().value)
        internal_row.field_names = row._fields
        return internal_row

    cdef bint is_retract_msg(self):
        # UPDATE_BEFORE and DELETE rows are treated as retraction messages
        return self.row_kind == InternalRowKind.UPDATE_BEFORE or \
            self.row_kind == InternalRowKind.DELETE

    cdef bint is_accumulate_msg(self):
        # INSERT and UPDATE_AFTER rows are treated as accumulation messages
        return self.row_kind == InternalRowKind.UPDATE_AFTER or \
            self.row_kind == InternalRowKind.INSERT

    def __eq__(self, other):
        # Only InternalRow instances are comparable (by their values; the row kind is not
        # compared). Other objects, e.g. a Row, whose values live in `_values`, used to
        # raise AttributeError on `other.values`; they now simply compare unequal.
        if not isinstance(other, InternalRow):
            return False
        return self.values == (<InternalRow> other).values

    def __getitem__(self, item):
        return self.values[item]

    def __iter__(self):
        return iter(self.values)

    def __repr__(self):
        return "InternalRow(%s, %s)" % (self.row_kind, self.values)
| |
cdef class MaskUtils:
    """
    A util class used to encode mask value.

    The mask is a bit array written most-significant bit first. Its first ROW_KIND_BIT_SIZE
    bits hold the row kind value and each of the following `field_count` bits is set when
    the corresponding field value is None.
    """

    def __cinit__(self, field_count):
        # one flag per mask bit (row kind bits + one bit per field); filled by read_mask
        self._mask = <bint*> malloc((field_count + ROW_KIND_BIT_SIZE) * sizeof(bint))
        if self._mask == NULL:
            raise MemoryError()

        # entry i is the byte value with only bit position i set, counting from the most
        # significant bit
        self._mask_byte_search_table = <unsigned char*> malloc(8 * sizeof(unsigned char))
        if self._mask_byte_search_table == NULL:
            raise MemoryError()
        self._mask_byte_search_table[0] = 0x80
        self._mask_byte_search_table[1] = 0x40
        self._mask_byte_search_table[2] = 0x20
        self._mask_byte_search_table[3] = 0x10
        self._mask_byte_search_table[4] = 0x08
        self._mask_byte_search_table[5] = 0x04
        self._mask_byte_search_table[6] = 0x02
        self._mask_byte_search_table[7] = 0x01

        # initial value of the first mask byte for each row kind value: bit 0 of the value
        # maps to 0x80 and bit 1 to 0x40. Only entries 0-3 are initialized, so row kind
        # values passed to write_mask are assumed to be within 0..3.
        self._row_kind_byte_table = <unsigned char*> malloc(8 * sizeof(unsigned char))
        if self._row_kind_byte_table == NULL:
            raise MemoryError()
        self._row_kind_byte_table[0] = 0x00
        self._row_kind_byte_table[1] = 0x80
        self._row_kind_byte_table[2] = 0x40
        self._row_kind_byte_table[3] = 0xC0

    def __init__(self, field_count):
        self._field_count = field_count
        # number of full bytes, and of bits in the trailing partial byte, needed for the
        # row kind bits plus one bit per field
        self._leading_complete_bytes_num = (self._field_count + ROW_KIND_BIT_SIZE) // 8
        self._remaining_bits_num = (self._field_count + ROW_KIND_BIT_SIZE) % 8

    cdef void write_mask(self, list value, unsigned char row_kind_value,
                         OutputStream output_stream):
        # Writes the mask for the field values in `value` and the given row kind value.
        cdef size_t field_pos, index, i
        cdef unsigned char*bit_map_byte_search_table
        cdef unsigned char b
        field_pos = 0
        bit_map_byte_search_table = self._mask_byte_search_table

        # first byte contains the row kind bits, followed by the null flags of up to
        # 8 - ROW_KIND_BIT_SIZE fields; the bounds check matters for rows with fewer
        # fields than that
        b = self._row_kind_byte_table[row_kind_value]
        for i in range(0, 8 - ROW_KIND_BIT_SIZE):
            if field_pos + i < self._field_count and value[field_pos + i] is None:
                b |= bit_map_byte_search_table[i + ROW_KIND_BIT_SIZE]
        field_pos += 8 - ROW_KIND_BIT_SIZE
        output_stream.write_byte(b)

        # the remaining complete bytes (the first complete byte was written above)
        for _ in range(1, self._leading_complete_bytes_num):
            b = 0x00
            for i in range(8):
                if value[field_pos + i] is None:
                    b |= bit_map_byte_search_table[i]
            field_pos += 8
            output_stream.write_byte(b)

        # trailing partial byte; when there is no complete byte at all, the first byte
        # written above already covered every field
        if self._leading_complete_bytes_num >= 1 and self._remaining_bits_num:
            b = 0x00
            for i in range(self._remaining_bits_num):
                if value[field_pos + i] is None:
                    b |= bit_map_byte_search_table[i]
            output_stream.write_byte(b)

    cdef bint*read_mask(self, InputStream input_stream):
        # Reads the mask bytes into the internal _mask buffer and returns it: entries
        # 0..ROW_KIND_BIT_SIZE-1 are the row kind bits, the following ones the per-field
        # null flags. The buffer is shared and overwritten by the next call.
        cdef size_t field_pos, i
        cdef unsigned char b
        field_pos = 0
        for _ in range(self._leading_complete_bytes_num):
            b = input_stream.read_byte()
            for i in range(8):
                self._mask[field_pos] = (b & self._mask_byte_search_table[i]) > 0
                field_pos += 1

        if self._remaining_bits_num:
            b = input_stream.read_byte()
            for i in range(self._remaining_bits_num):
                self._mask[field_pos] = (b & self._mask_byte_search_table[i]) > 0
                field_pos += 1
        return self._mask

    def __dealloc__(self):
        # release the buffers allocated in __cinit__; the NULL checks skip buffers that
        # were never allocated
        if self._mask != NULL:
            free(self._mask)
        if self._mask_byte_search_table != NULL:
            free(self._mask_byte_search_table)
        if self._row_kind_byte_table != NULL:
            free(self._row_kind_byte_table)
| |
cdef class LengthPrefixBaseCoderImpl:
    """
    LengthPrefixBaseCoderImpl will be used in Operations and other coders will be the field coder of
    LengthPrefixBaseCoderImpl.

    Subclasses encode values with the wrapped field coder into an internal buffer and then
    copy the buffered bytes to a LengthPrefixOutputStream.
    """

    def __init__(self, field_coder: FieldCoderImpl):
        self._field_coder = field_coder
        # scratch buffer the field coder encodes into; rewound after each flush
        self._data_out_stream = OutputStream()

    cpdef encode_to_stream(self, value, LengthPrefixOutputStream output_stream):
        # implemented by subclasses
        pass

    cpdef decode_from_stream(self, LengthPrefixInputStream input_stream):
        # implemented by subclasses
        pass

    cdef void _write_data_to_output_stream(self, LengthPrefixOutputStream output_stream):
        # Copies the bytes buffered in _data_out_stream to output_stream and rewinds the
        # buffer so it can be reused for the next value.
        cdef OutputStream data_out_stream
        data_out_stream = self._data_out_stream
        output_stream.write(data_out_stream.buffer, data_out_stream.pos)
        data_out_stream.pos = 0
| |
cdef class FieldCoderImpl:
    """
    Base class of the coders for a single field value.

    Subclasses override `encode_to_stream` and `decode_from_stream`; `encode` and `decode`
    are convenience wrappers that work on bytes.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        """
        Encodes `value` to the output stream.

        :param value: The output data
        :param out_stream: Output Stream
        """
        pass

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        """
        Decodes data from the input stream.

        :param in_stream: Input Stream
        :param size: The size data of input stream will be decoded. If the size is set 0, it means
                     the coder won't take use of the length to decode the data from input stream.
        :return: The decoded object.
        """
        pass

    cpdef bytes encode(self, value):
        """
        Encodes `value` to a bytes

        :param value: The output data
        :return: The encoded data.
        """
        cdef OutputStream out_stream
        out_stream = OutputStream()
        self.encode_to_stream(value, out_stream)
        # only the first `pos` bytes of the buffer hold encoded data
        return out_stream.buffer[0:out_stream.pos]

    cpdef decode(self, encoded):
        """
        Decodes an object from the bytes

        :param encoded: The bytes
        :return: The decoded object.
        """
        cdef InputStream input_stream
        input_stream = InputStream()
        # the stream reads directly from `encoded`'s buffer (no copy); `encoded` is kept
        # alive by this call's reference for the duration of the decode
        input_stream._input_data = <char*> encoded
        return self.decode_from_stream(input_stream, len(encoded))
| |
cdef class InputStreamWrapper:
    """
    An iterator-like view over the values remaining in a LengthPrefixInputStream, each
    decoded on demand with the given ValueCoderImpl.
    """

    def __cinit__(self, value_coder: ValueCoderImpl, input_stream: LengthPrefixInputStream):
        self._value_coder = value_coder
        self._input_stream = input_stream

    cpdef bint has_next(self):
        # true while the underlying stream still reports available data
        return self._input_stream.available()

    cpdef next(self):
        # decodes and returns the next value from the underlying stream
        return self._value_coder.decode_from_stream(self._input_stream)
| |
cdef class IterableCoderImpl(LengthPrefixBaseCoderImpl):
    """
    Encodes iterable data to output stream. The output mode will decide whether write a special end
    message 0x00 to output stream after encoding data.

    Each item is encoded with the field coder and flushed to the output stream separately.
    Decoding returns an InputStreamWrapper that decodes the items one at a time.
    """

    def __cinit__(self, *args, **kwargs):
        # one-byte buffer holding the end message 0x00
        self._end_message = <char*> malloc(1)
        if self._end_message == NULL:
            raise MemoryError()
        self._end_message[0] = 0x00

    def __init__(self, field_coder: FieldCoderImpl, separated_with_end_message: bool):
        super(IterableCoderImpl, self).__init__(field_coder)
        self._separated_with_end_message = separated_with_end_message

    cpdef encode_to_stream(self, value, LengthPrefixOutputStream output_stream):
        if value:
            for item in value:
                self._field_coder.encode_to_stream(item, self._data_out_stream)
                # flush after every item so that each item is written on its own
                self._write_data_to_output_stream(output_stream)

        # write end message
        if self._separated_with_end_message:
            output_stream.write(self._end_message, 1)

    cpdef decode_from_stream(self, LengthPrefixInputStream input_stream):
        # items are decoded lazily, one per InputStreamWrapper.next() call
        return InputStreamWrapper(ValueCoderImpl(self._field_coder), input_stream)

    def __dealloc__(self):
        if self._end_message != NULL:
            free(self._end_message)
| |
cdef class ValueCoderImpl(LengthPrefixBaseCoderImpl):
    """
    Encodes a single data to output stream.

    The value is encoded with the field coder into an internal buffer whose bytes are then
    written to the output stream; decoding hands the next chunk of the input stream to the
    field coder.
    """

    def __init__(self, field_coder: FieldCoderImpl):
        super(ValueCoderImpl, self).__init__(field_coder)

    cpdef encode_to_stream(self, value, LengthPrefixOutputStream output_stream):
        self._field_coder.encode_to_stream(value, self._data_out_stream)
        self._write_data_to_output_stream(output_stream)

    cpdef decode_from_stream(self, LengthPrefixInputStream input_stream):
        cdef char*input_data
        cdef size_t size
        cdef InputStream data_input_stream
        # `read` sets input_data to the next chunk's data; its return value is used as the
        # size of that chunk
        size = input_stream.read(&input_data)

        # wrap the chunk in an InputStream (no copy) for the field coder
        data_input_stream = InputStream()
        data_input_stream._input_data = input_data

        return self._field_coder.decode_from_stream(data_input_stream, size)
| |
cdef class FlattenRowCoderImpl(FieldCoderImpl):
    """
    A coder for flatten row (List) object (without field names and row kind value is 0).

    The encoded form is the MaskUtils mask (row kind bits always 0, then one null flag per
    field) followed by the encodings of the non-null fields in order.
    """

    def __init__(self, field_coders: List[FieldCoderImpl]):
        self._field_coders = field_coders  # type: List[FieldCoderImpl]
        self._field_count = len(self._field_coders)
        self._mask_utils = MaskUtils(self._field_count)
        # list reused as the result of every decode_from_stream call
        self._reuse_flatten_row = [None for i in range(self._field_count)]

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef list list_value, field_coders
        cdef size_t i
        cdef FieldCoderImpl field_coder

        # `value` is expected to be a list; the cast performs no type check
        list_value = <list> value

        # encode mask value
        self._mask_utils.write_mask(list_value, 0, out_stream)

        # encode every field value; None fields are only recorded in the mask
        field_coders = self._field_coders
        for i in range(self._field_count):
            item = list_value[i]
            if item is not None:
                field_coder = <FieldCoderImpl> field_coders[i]
                field_coder.encode_to_stream(item, out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # NOTE(review): the returned list is the shared _reuse_flatten_row, which the next
        # call overwrites; callers that keep the result must copy it.
        cdef bint*mask
        cdef size_t i
        cdef list flatten_row, field_coders
        cdef FieldCoderImpl field_coder

        # read mask
        mask = self._mask_utils.read_mask(in_stream)
        # skip ROW_KIND_BIT_SIZE to data mask: advances the local pointer so that mask[i]
        # is the null flag of field i
        (<bint**> &mask)[0] += ROW_KIND_BIT_SIZE

        # decode field data
        flatten_row = self._reuse_flatten_row
        field_coders = self._field_coders
        for i in range(self._field_count):
            if mask[i]:
                flatten_row[i] = None
            else:
                field_coder = <FieldCoderImpl> field_coders[i]
                flatten_row[i] = field_coder.decode_from_stream(in_stream, 0)
        return flatten_row

    def __repr__(self):
        return 'FlattenRowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders)
| |
cdef class RowCoderImpl(FieldCoderImpl):
    """
    A coder for `Row` or `InternalRow` object.

    The encoded form is the MaskUtils mask (row kind bits, then one null flag per field)
    followed by the encodings of the non-null fields in order. Decoding always produces a
    `Row` carrying the configured field names.
    """

    def __init__(self, field_coders: List[FieldCoderImpl], field_names: List[str]):
        self._field_coders = field_coders  # type: List[FieldCoderImpl]
        self._field_count = len(self._field_coders)
        self._field_names = field_names  # type: List[str]
        self._mask_utils = MaskUtils(self._field_count)

    cpdef encode_to_stream(self, value: Union[Row, InternalRow], OutputStream out_stream):
        cdef list list_values
        cdef size_t i
        cdef unsigned char row_kind_value
        cdef FieldCoderImpl field_coder

        if isinstance(value, InternalRow):
            # the type is InternalRow
            list_values = <list> value.values
            row_kind_value = <unsigned char> value.row_kind
        else:
            # the type is Row
            list_values = <list> value._values
            row_kind_value = <unsigned char> value.get_row_kind().value
        # encode mask value
        self._mask_utils.write_mask(list_values, row_kind_value, out_stream)

        # encode every field value; None fields are only recorded in the mask
        for i in range(self._field_count):
            item = list_values[i]
            if item is not None:
                field_coder = <FieldCoderImpl> self._field_coders[i]
                field_coder.encode_to_stream(item, out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef bint*mask
        cdef unsigned char row_kind_value = 0
        cdef size_t i
        cdef FieldCoderImpl field_coder
        cdef list field_values
        mask = self._mask_utils.read_mask(in_stream)

        # Reassemble the row kind from its bits (mask[0] is the least significant one) with
        # a plain shift/OR instead of `mask[i] * 2 ** i`, which went through Cython's
        # power-operator semantics for what is a simple bit operation.
        for i in range(ROW_KIND_BIT_SIZE):
            if mask[i]:
                row_kind_value |= 1 << i

        # the per-field null flags follow the row kind bits
        field_values = []
        for i in range(self._field_count):
            if mask[i + ROW_KIND_BIT_SIZE]:
                field_values.append(None)
            else:
                field_coder = <FieldCoderImpl> self._field_coders[i]
                field_values.append(field_coder.decode_from_stream(in_stream, 0))

        row = Row()
        row._values = field_values
        row.set_field_names(self._field_names)
        row.set_row_kind(RowKind(row_kind_value))
        return row

    def __repr__(self):
        return 'RowCoderImpl[%s, %s]' % \
               (', '.join(str(c) for c in self._field_coders), self._field_names)
| |
cdef class ArrowCoderImpl(FieldCoderImpl):
    """
    A coder for arrow format data.

    Encoding converts the given columns (via `pandas_to_arrow`) into one Arrow record batch
    written with a `pyarrow.RecordBatchStreamWriter`; decoding reads one record batch and
    converts it back with `arrow_to_pandas`.
    """

    def __init__(self, schema, row_type, timezone):
        self._schema = schema
        self._field_types = row_type.field_types()
        self._timezone = timezone
        # I/O adapter re-pointed at a new output stream / input bytes for every call
        self._resettable_io = ResettableIO()
        # generator yielding one record batch per `next()`; see _load_from_stream
        self._batch_reader = self._load_from_stream(self._resettable_io)

    cpdef encode_to_stream(self, cols, OutputStream out_stream):
        self._resettable_io.set_output_stream(out_stream)
        # NOTE(review): the writer is never closed, so no end-of-stream marker is written;
        # presumably the reading side expects exactly one batch per message — confirm
        # before changing this, as it affects the wire format.
        batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
        batch_writer.write_batch(
            pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return self.decode_one_batch_from_stream(in_stream, size)

    cdef list decode_one_batch_from_stream(self, InputStream in_stream, size_t size):
        # feed the next `size` bytes of the input stream to the batch reader
        self._resettable_io.set_input_bytes(in_stream.read(size))
        # there is only one arrow batch in the underlying input stream
        return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])

    def _load_from_stream(self, stream):
        # Generator: every `next()` opens a fresh IPC stream reader over whatever bytes the
        # resettable stream currently holds and yields its first record batch.
        while stream.readable():
            reader = pa.ipc.open_stream(stream)
            yield reader.read_next_batch()

    def __repr__(self):
        return 'ArrowCoderImpl[%s]' % self._schema
| |
cdef class OverWindowArrowCoderImpl(FieldCoderImpl):
    """
    A coder for over window with arrow format data.
    The data structure: [window data][arrow format data].

    The window data is an int32 window count followed, per window, by an int32 length and
    that many int32 values. Encoding only writes the arrow part.
    """
    def __init__(self, arrow_coder_impl: ArrowCoderImpl):
        self._arrow_coder = arrow_coder_impl
        self._int_coder = IntCoderImpl()

    cpdef encode_to_stream(self, cols, OutputStream out_stream):
        # only the arrow data is written; no window data is produced on this side
        self._arrow_coder.encode_to_stream(cols, out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # Returns a list with one list of int32 values per window, followed by the decoded
        # arrow batch. `size` is reduced by the 4 bytes of every int32 consumed so that the
        # remainder is the size of the arrow data.
        # NOTE(review): `size` is unsigned; this assumes it covers at least the window data.
        cdef int32_t window_num, window_size
        cdef list window_boundaries_and_arrow_data
        window_num = self._int_coder.decode_from_stream(in_stream, 0)
        size -= 4
        window_boundaries_and_arrow_data = []
        for _ in range(window_num):
            window_size = self._int_coder.decode_from_stream(in_stream, 0)
            size -= 4
            window_boundaries_and_arrow_data.append(
                [self._int_coder.decode_from_stream(in_stream, 0)
                 for _ in range(window_size)])
            size -= 4 * window_size
        window_boundaries_and_arrow_data.append(
            self._arrow_coder.decode_one_batch_from_stream(in_stream, size))
        return window_boundaries_and_arrow_data

    def __repr__(self):
        return 'OverWindowArrowCoderImpl[%s]' % self._arrow_coder
| |
| |
cdef class TinyIntCoderImpl(FieldCoderImpl):
    """
    A coder for tiny int value (from -128 to 127), transferred as one int8.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_int8(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # `size` is not needed: the value has a fixed width
        return in_stream.read_int8()
| |
cdef class SmallIntCoderImpl(FieldCoderImpl):
    """
    A coder for small int value (from -32,768 to 32,767), transferred as one int16.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_int16(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # `size` is not needed: the value has a fixed width
        return in_stream.read_int16()
| |
cdef class IntCoderImpl(FieldCoderImpl):
    """
    A coder for int value (from -2,147,483,648 to 2,147,483,647), transferred as one int32.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_int32(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # `size` is not needed: the value has a fixed width
        return in_stream.read_int32()
| |
cdef class BigIntCoderImpl(FieldCoderImpl):
    """
    A coder for big int value (from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807),
    transferred as one int64.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_int64(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # `size` is not needed: the value has a fixed width
        return in_stream.read_int64()
| |
cdef class BooleanCoderImpl(FieldCoderImpl):
    """
    A coder for a boolean value, transferred as a single byte; any non-zero byte decodes
    to True.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_byte(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef unsigned char flag = in_stream.read_byte()
        return flag != 0
| |
cdef class FloatCoderImpl(FieldCoderImpl):
    """
    A coder for a float value (4-byte single precision floating point number).

    Python floats are double precision, so encoding may round the value.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_float(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return in_stream.read_float()
| |
cdef class DoubleCoderImpl(FieldCoderImpl):
    """
    A coder for a double value (8-byte double precision floating point number).
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_double(value)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # `size` is not needed: the value has a fixed width
        return in_stream.read_double()
| |
cdef class BinaryCoderImpl(FieldCoderImpl):
    """
    A coder for a bytes value.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        # the length is passed to write_bytes; read_bytes takes no size, so the stream
        # presumably records it alongside the data — confirm in the stream implementation
        out_stream.write_bytes(value, len(value))

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return in_stream.read_bytes()
| |
cdef class CharCoderImpl(FieldCoderImpl):
    """
    A coder for a str value, transferred as its UTF-8 encoded bytes.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef bytes value_bytes = value.encode('utf-8')
        out_stream.write_bytes(value_bytes, len(value_bytes))

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return in_stream.read_bytes().decode("utf-8")
| |
cdef class BigDecimalCoderImpl(FieldCoderImpl):
    """
    A coder for a big decimal value (without fixed precision and scale).

    The value travels as its string representation, written with a CharCoderImpl.
    """

    def __init__(self):
        self._value_coder = CharCoderImpl()

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        text = str(value)
        self._value_coder.encode_to_stream(text, out_stream)

    cpdef decode_from_stream(self, InputStream input_stream, size_t size):
        text = self._value_coder.decode_from_stream(input_stream, 0)
        return decimal.Decimal(text)
| |
cdef class DecimalCoderImpl(FieldCoderImpl):
    """
    A coder for a decimal value (with fixed precision and scale).

    Values are quantized to `scale` fractional digits under a context with `precision`
    significant digits and transferred as their string representation.
    """

    def __init__(self, precision, scale):
        self._context = decimal.Context(prec=precision)
        # quantize target, e.g. Decimal('0.01') for scale 2
        self._scale_format = decimal.Decimal(10) ** -scale
        self._value_coder = CharCoderImpl()

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        # `localcontext` restores the caller's decimal context even when quantize raises
        # (e.g. InvalidOperation when the value does not fit into the precision); the former
        # getcontext/setcontext pair left the coder's context installed in that case.
        with decimal.localcontext(self._context):
            self._value_coder.encode_to_stream(
                str(value.quantize(self._scale_format)), out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        with decimal.localcontext(self._context):
            return decimal.Decimal(self._value_coder.decode_from_stream(in_stream, 0)) \
                .quantize(self._scale_format)
| |
cdef class DateCoderImpl(FieldCoderImpl):
    """
    A coder for a datetime.date value, transferred as the int32 number of days since
    1970-01-01.
    """

    def __init__(self):
        # proleptic Gregorian ordinal of the epoch day
        self._EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        days_since_epoch = value.toordinal() - self._EPOCH_ORDINAL
        out_stream.write_int32(days_since_epoch)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        ordinal = self._EPOCH_ORDINAL + in_stream.read_int32()
        return datetime.date.fromordinal(ordinal)
| |
cdef class TimeCoderImpl(FieldCoderImpl):
    """
    A coder for a datetime.time value, transferred as the int32 number of milliseconds
    since midnight. Sub-millisecond digits are dropped when encoding.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef int32_t h = value.hour, m = value.minute, s = value.second
        cdef int32_t us = value.microsecond
        out_stream.write_int32(((h * 60 + m) * 60 + s) * 1000 + us // 1000)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef int32_t millis_of_day = in_stream.read_int32()
        cdef int32_t hours = millis_of_day // 3600000
        cdef int32_t minutes = millis_of_day // 60000 % 60
        cdef int32_t seconds = millis_of_day // 1000 % 60
        cdef int32_t millis = millis_of_day % 1000
        return datetime.time(hours, minutes, seconds, millis * 1000)
| |
cdef class TimestampCoderImpl(FieldCoderImpl):
    """
    A coder for a datetime.datetime value.

    The wall-clock fields of the value are interpreted as UTC (any tzinfo is ignored) and
    transferred as an int64 number of milliseconds since 1970-01-01. When precision > 3,
    an int32 holding the nanoseconds within that millisecond follows.
    """

    def __init__(self, precision):
        # precision <= 3 (milliseconds) needs no extra nanosecond field
        self._is_compact = precision <= 3

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef int64_t epoch_ordinal = 719163  # datetime.date(1970, 1, 1).toordinal()
        cdef int64_t timestamp_seconds, timestamp_milliseconds
        cdef int32_t microseconds_of_second, nanoseconds
        # Whole seconds since the epoch are computed with exact integer arithmetic rather
        # than by truncating the float returned by `timestamp()`: truncation rounds towards
        # zero, which mis-encoded pre-epoch values with a fractional second (1969-12-31
        # 23:59:59.5 became 1970-01-01 00:00:00.5), and the float loses precision for
        # distant dates. This floors, so the millisecond part below stays non-negative.
        timestamp_seconds = (value.toordinal() - epoch_ordinal) * 86400 \
            + value.hour * 3600 + value.minute * 60 + value.second
        microseconds_of_second = value.microsecond
        timestamp_milliseconds = timestamp_seconds * 1000 + microseconds_of_second // 1000
        nanoseconds = microseconds_of_second % 1000 * 1000
        out_stream.write_int64(timestamp_milliseconds)
        if not self._is_compact:
            out_stream.write_int32(nanoseconds)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return self._decode_timestamp_data_from_stream(in_stream)

    cdef _decode_timestamp_data_from_stream(self, InputStream in_stream):
        # Reads the encoded timestamp and returns it as a naive datetime in UTC.
        cdef int64_t milliseconds, seconds
        cdef int32_t nanoseconds, millis_of_second, microseconds
        milliseconds = in_stream.read_int64()
        if self._is_compact:
            nanoseconds = 0
        else:
            nanoseconds = in_stream.read_int32()
        # `seconds` must be 64-bit: an int32 silently overflows after 2038-01-19.
        seconds = milliseconds // 1000
        millis_of_second = milliseconds % 1000
        # With cdivision=True these operators truncate towards zero, so the remainder is
        # negative for pre-epoch instants (which made `replace(microsecond=...)` raise);
        # normalize to floor-division semantics.
        if millis_of_second < 0:
            millis_of_second += 1000
            seconds -= 1
        microseconds = millis_of_second * 1000 + nanoseconds // 1000
        # Same naive UTC result as `utcfromtimestamp(seconds).replace(microsecond=...)`,
        # without its platform limits on negative timestamps or its 3.12 deprecation.
        return datetime.datetime(1970, 1, 1) + \
            datetime.timedelta(seconds=seconds, microseconds=microseconds)
| |
cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
    """
    A coder for a datetime.datetime with time zone value.

    Encoding is inherited from TimestampCoderImpl. Decoding passes the decoded naive
    datetime to `timezone.localize`.
    """

    def __init__(self, precision, timezone):
        super(LocalZonedTimestampCoderImpl, self).__init__(precision)
        # NOTE(review): expected to provide a pytz-style `localize` method — confirm
        self._timezone = timezone

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return self._timezone.localize(self._decode_timestamp_data_from_stream(in_stream))
| |
cdef class InstantCoderImpl(FieldCoderImpl):
    """
    A coder for Instant, transferred as int64 seconds followed by int32 nanos.

    None is represented by the sentinel pair (minimum int64, minimum int32).
    """

    def __init__(self):
        # sentinel (seconds, nanos) pair that stands for a None instant
        self._null_seconds = -9223372036854775808
        self._null_nanos = -2147483648

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        if value is None:
            out_stream.write_int64(self._null_seconds)
            out_stream.write_int32(self._null_nanos)
            return
        out_stream.write_int64(value.seconds)
        out_stream.write_int32(value.nanos)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef int64_t seconds = in_stream.read_int64()
        cdef int32_t nanos = in_stream.read_int32()
        is_null = seconds == self._null_seconds and nanos == self._null_nanos
        return None if is_null else Instant(seconds, nanos)
| |
| |
cdef class CloudPickleCoderImpl(FieldCoderImpl):
    """
    A coder used with cloudpickle for all kinds of python object.

    NOTE(review): decoding unpickles the stream contents, which can execute arbitrary code;
    the data is assumed to come from a trusted producer.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef bytes payload = cloudpickle.dumps(value)
        out_stream.write_bytes(payload, len(payload))

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef bytes payload = in_stream.read_bytes()
        return cloudpickle.loads(payload)
| |
| |
cdef class PickleCoderImpl(FieldCoderImpl):
    """
    A coder used with pickle for all kinds of python object.

    NOTE(review): decoding unpickles the stream contents, which can execute arbitrary code;
    the data is assumed to come from a trusted producer.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef bytes payload = pickle.dumps(value)
        out_stream.write_bytes(payload, len(payload))

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef bytes payload = in_stream.read_bytes()
        return pickle.loads(payload)
| |
cdef class GenericArrayCoderImpl(FieldCoderImpl):
    """
    A coder for basic array value (the element of array could be null).

    Layout: int32 element count, then per element a presence byte (True when the element
    is not None) followed by the encoded element if present.
    """

    def __init__(self, elem_coder: FieldCoderImpl):
        self._elem_coder = elem_coder

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef list elements = value
        out_stream.write_int32(len(elements))
        for element in elements:
            if element is not None:
                out_stream.write_byte(True)
                self._elem_coder.encode_to_stream(element, out_stream)
            else:
                out_stream.write_byte(False)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef int32_t count = in_stream.read_int32()
        cdef list elements = []
        for _ in range(count):
            if in_stream.read_byte():
                elements.append(self._elem_coder.decode_from_stream(in_stream, 0))
            else:
                elements.append(None)
        return elements
| |
cdef class PrimitiveArrayCoderImpl(FieldCoderImpl):
    """
    A coder for primitive array value (the element of array won't be null).

    Layout: int32 element count followed by the encoded elements, without null flags.
    """

    def __init__(self, elem_coder: FieldCoderImpl):
        self._elem_coder = elem_coder

    cpdef encode_to_stream(self, value, OutputStream output_stream):
        cdef list elements = value
        output_stream.write_int32(len(elements))
        for element in elements:
            self._elem_coder.encode_to_stream(element, output_stream)

    cpdef decode_from_stream(self, InputStream input_stream, size_t size):
        cdef int32_t count = input_stream.read_int32()
        cdef list elements = []
        for _ in range(count):
            elements.append(self._elem_coder.decode_from_stream(input_stream, 0))
        return elements
| |
cdef class MapCoderImpl(FieldCoderImpl):
    """
    A coder for map value (dict with same type key and same type value).

    Layout: int32 entry count, then per entry the encoded key, a null byte (True when the
    value is None) and the encoded value if it is not None.
    """

    def __init__(self, key_coder: FieldCoderImpl, value_coder: FieldCoderImpl):
        self._key_coder = key_coder
        self._value_coder = value_coder

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef int32_t entry_count = len(value)
        out_stream.write_int32(entry_count)
        for key, item in value.items():
            self._key_coder.encode_to_stream(key, out_stream)
            is_null = item is None
            out_stream.write_byte(is_null)
            if not is_null:
                self._value_coder.encode_to_stream(item, out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef int32_t remaining = in_stream.read_int32()
        cdef dict result = {}
        while remaining > 0:
            key = self._key_coder.decode_from_stream(in_stream, 0)
            result[key] = None if in_stream.read_byte() \
                else self._value_coder.decode_from_stream(in_stream, 0)
            remaining -= 1
        return result
| |
cdef class TupleCoderImpl(FieldCoderImpl):
    """
    A coder for a tuple value; element i is encoded with the i-th field coder.
    """

    def __init__(self, field_coders):
        self._field_coders = field_coders
        self._field_count = len(field_coders)

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        cdef tuple items = value
        for index, coder in enumerate(self._field_coders):
            coder.encode_to_stream(items[index], out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef list decoded = []
        for coder in self._field_coders:
            decoded.append(coder.decode_from_stream(in_stream, 0))
        return tuple(decoded)
| |
cdef class TimeWindowCoderImpl(FieldCoderImpl):
    """
    A coder for TimeWindow, transferred as its start followed by its end (both int64).
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_int64(value.start)
        out_stream.write_int64(value.end)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        cdef int64_t window_start = in_stream.read_int64()
        cdef int64_t window_end = in_stream.read_int64()
        return TimeWindow(window_start, window_end)
| |
cdef class CountWindowCoderImpl(FieldCoderImpl):
    """
    A coder for CountWindow, transferred as its id (int64).
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        out_stream.write_int64(value.id)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return CountWindow(in_stream.read_int64())
| |
cdef class DataViewFilterCoderImpl(FieldCoderImpl):
    """
    A coder which pickles a value after clearing the data view fields inside it.

    `udf_data_view_specs` holds one collection of specs per entry of the encoded value: for
    the i-th collection, ``value[i][spec.field_index]`` is set to None for every spec before
    the value is pickled. Decoding unpickles the data unchanged.
    """

    def __init__(self, udf_data_view_specs):
        self._udf_data_view_specs = udf_data_view_specs
        self._pickle_coder = PickleCoderImpl()

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        # NOTE: `value` itself is modified in place by _filter_data_views before pickling
        self._pickle_coder.encode_to_stream(self._filter_data_views(value), out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        return self._pickle_coder.decode_from_stream(in_stream, size)

    def _filter_data_views(self, row):
        # Sets every data view field to None in place and returns the same object.
        for i, specs in enumerate(self._udf_data_view_specs):
            for spec in specs:
                row[i][spec.field_index] = None
        return row
| |
| |