blob: 9652cdba9f0bbf5e5bd4028017725700296fc57f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Optimized implementations of various schema columner types."""
# pytype: skip-file
import numpy as np
cimport numpy as np
from apache_beam.coders import coder_impl
from apache_beam.coders.coder_impl cimport RowColumnEncoder, OutputStream, InputStream
from apache_beam.portability.api import schema_pb2
cdef class AtomicTypeRowColumnEncoder(RowColumnEncoder):
cdef original
cdef contiguous
def __init__(self, column):
self.original = column
self.contiguous = np.ascontiguousarray(column)
def null_flags(self):
return None
def finalize_write(self):
if self.original is not self.contiguous:
self.original[:] = self.contiguous
cdef class FloatFloat32RowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.float32_t* data
def __init__(self, unused_coder, column):
super(FloatFloat32RowColumnEncoder, self).__init__(column)
cdef np.float32_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_bigendian_float(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_bigendian_float()
FloatFloat32RowColumnEncoder.register(schema_pb2.FLOAT, np.float32().dtype)
cdef class FloatFloat64RowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.float64_t* data
def __init__(self, unused_coder, column):
super(FloatFloat64RowColumnEncoder, self).__init__(column)
cdef np.float64_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_bigendian_float(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_bigendian_float()
FloatFloat64RowColumnEncoder.register(schema_pb2.FLOAT, np.float64().dtype)
cdef class DoubleFloat32RowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.float32_t* data
def __init__(self, unused_coder, column):
super(DoubleFloat32RowColumnEncoder, self).__init__(column)
cdef np.float32_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_bigendian_double(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_bigendian_double()
DoubleFloat32RowColumnEncoder.register(schema_pb2.DOUBLE, np.float32().dtype)
cdef class DoubleFloat64RowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.float64_t* data
def __init__(self, unused_coder, column):
super(DoubleFloat64RowColumnEncoder, self).__init__(column)
cdef np.float64_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_bigendian_double(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_bigendian_double()
DoubleFloat64RowColumnEncoder.register(schema_pb2.DOUBLE, np.float64().dtype)
cdef class Int32Int32RowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.int32_t* data
def __init__(self, unused_coder, column):
super(Int32Int32RowColumnEncoder, self).__init__(column)
cdef np.int32_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_var_int64(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_var_int64()
Int32Int32RowColumnEncoder.register(schema_pb2.INT32, np.int32().dtype)
Int32Int32RowColumnEncoder.register(schema_pb2.INT32, np.int64().dtype)
cdef class Int64Int64RowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.int64_t* data
def __init__(self, unused_coder, column):
super(Int64Int64RowColumnEncoder, self).__init__(column)
cdef np.int64_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_var_int64(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_var_int64()
Int64Int64RowColumnEncoder.register(schema_pb2.INT64, np.int64().dtype)
cdef class BoolRowColumnEncoder(AtomicTypeRowColumnEncoder):
cdef np.uint8_t* data
def __init__(self, unused_coder, column):
super(BoolRowColumnEncoder, self).__init__(column)
self.contiguous = self.contiguous.astype(np.uint8)
cdef np.uint8_t[::1] view = self.contiguous
self.data = &view[0]
cdef bint encode_to_stream(self, size_t index, OutputStream stream) except -1:
stream.write_byte(self.data[index])
cdef bint decode_from_stream(self, size_t index, InputStream stream) except -1:
self.data[index] = stream.read_byte()
BoolRowColumnEncoder.register(schema_pb2.BOOLEAN, np.int8().dtype)
BoolRowColumnEncoder.register(schema_pb2.BOOLEAN, np.uint8().dtype)
BoolRowColumnEncoder.register(schema_pb2.BOOLEAN, np.bool_().dtype)