blob: 7fa1854c57b18517f29eb67e3cfdb598294d6cb6 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cython: language_level=3
"""Compiled version of the Stream objects used by CoderImpl.
For internal use only; no backwards-compatibility guarantees.
"""
cimport libc.stdlib
cimport libc.string
cdef class OutputStream(object):
"""An output string stream implementation supporting write() and get()."""
#TODO(robertwb): Consider using raw C++ streams.
def __cinit__(self):
self.buffer_size = 1024
self.pos = 0
self.data = <char*>libc.stdlib.malloc(self.buffer_size)
assert self.data, "OutputStream malloc failed."
def __dealloc__(self):
if self.data:
libc.stdlib.free(self.data)
cpdef write(self, bytes b, bint nested=False):
cdef size_t blen = len(b)
if nested:
self.write_var_int64(blen)
if self.buffer_size < self.pos + blen:
self.extend(blen)
libc.string.memcpy(self.data + self.pos, <char*>b, blen)
self.pos += blen
cpdef write_byte(self, unsigned char val):
if self.buffer_size < self.pos + 1:
self.extend(1)
self.data[self.pos] = val
self.pos += 1
cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
"""Encode a long using variable-length encoding to a stream."""
cdef libc.stdint.uint64_t v = signed_v
cdef long bits
while True:
bits = v & 0x7F
v >>= 7
if v:
bits |= 0x80
self.write_byte(bits)
if not v:
break
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
self.write_bigendian_uint64(signed_v)
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t v):
if self.buffer_size < self.pos + 8:
self.extend(8)
self.data[self.pos ] = <unsigned char>(v >> 56)
self.data[self.pos + 1] = <unsigned char>(v >> 48)
self.data[self.pos + 2] = <unsigned char>(v >> 40)
self.data[self.pos + 3] = <unsigned char>(v >> 32)
self.data[self.pos + 4] = <unsigned char>(v >> 24)
self.data[self.pos + 5] = <unsigned char>(v >> 16)
self.data[self.pos + 6] = <unsigned char>(v >> 8)
self.data[self.pos + 7] = <unsigned char>(v )
self.pos += 8
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
cdef libc.stdint.uint32_t v = signed_v
if self.buffer_size < self.pos + 4:
self.extend(4)
self.data[self.pos ] = <unsigned char>(v >> 24)
self.data[self.pos + 1] = <unsigned char>(v >> 16)
self.data[self.pos + 2] = <unsigned char>(v >> 8)
self.data[self.pos + 3] = <unsigned char>(v )
self.pos += 4
cpdef write_bigendian_double(self, double d):
self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])
cpdef bytes get(self):
return self.data[:self.pos]
cpdef size_t size(self) except? -1:
return self.pos
cdef extend(self, size_t missing):
while missing > self.buffer_size - self.pos:
self.buffer_size *= 2
self.data = <char*>libc.stdlib.realloc(self.data, self.buffer_size)
assert self.data, "OutputStream realloc failed."
cpdef _clear(self):
self.pos = 0
cdef class ByteCountingOutputStream(OutputStream):
"""An output string stream implementation that only counts the bytes.
This implementation counts the number of bytes it "writes" but
doesn't actually write them anyway. Thus it has write() but not
get(). get_count() returns how many bytes were written.
This is useful for sizing an encoding.
"""
def __cinit__(self):
self.count = 0
cpdef write(self, bytes b, bint nested=False):
cdef size_t blen = len(b)
if nested:
self.write_var_int64(blen)
self.count += blen
cpdef write_byte(self, unsigned char _):
self.count += 1
cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
self.count += 8
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t _):
self.count += 8
cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
self.count += 4
cpdef size_t get_count(self):
return self.count
cpdef bytes get(self):
raise NotImplementedError
def __str__(self):
return '<%s %s>' % (self.__class__.__name__, self.count)
cdef class InputStream(object):
"""An input string stream implementation supporting read() and size()."""
def __init__(self, all):
self.allc = self.all = all
cpdef bytes read(self, size_t size):
self.pos += size
return self.allc[self.pos - size : self.pos]
cpdef long read_byte(self) except? -1:
self.pos += 1
# Note: Some C++ compilers treats the char array below as a signed char.
# This causes incorrect coder behavior unless explicitly cast to an
# unsigned char here.
return <long>(<unsigned char> self.allc[self.pos - 1])
cpdef ssize_t size(self) except? -1:
return len(self.all) - self.pos
cpdef bytes read_all(self, bint nested=False):
return self.read(<ssize_t>self.read_var_int64() if nested else self.size())
cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
"""Decode a variable-length encoded long from a stream."""
cdef long byte
cdef long bits
cdef long shift = 0
cdef libc.stdint.int64_t result = 0
while True:
byte = self.read_byte()
if byte < 0:
raise RuntimeError('VarInt not terminated.')
bits = byte & 0x7F
if (shift >= sizeof(long) * 8 or
(shift >= (sizeof(long) * 8 - 1) and bits > 1)):
raise RuntimeError('VarLong too long.')
result |= bits << shift
shift += 7
if not (byte & 0x80):
break
return result
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
return self.read_bigendian_uint64()
cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1:
self.pos += 8
return (<unsigned char>self.allc[self.pos - 1]
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] << 8
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
self.pos += 4
return (<unsigned char>self.allc[self.pos - 1]
| <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] << 8
| <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
| <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)
cpdef double read_bigendian_double(self) except? -1:
cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
return (<double*><char*>&as_long)[0]
cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
"""Returns the size of the given integer value when encode as a VarInt."""
cdef libc.stdint.int64_t varint_size = 0
cdef libc.stdint.uint64_t bits = value
while True:
varint_size += 1
bits >>= 7
if not bits:
break
return varint_size