blob: 23dc0ee24b01ede4a0fd7793fca3054a8915dc42 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A pure Python implementation of stream.pyx.
For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
import struct
from typing import List
class OutputStream(object):
"""For internal use only; no backwards-compatibility guarantees.
A pure Python implementation of stream.OutputStream."""
def __init__(self):
self.data = [] # type: List[bytes]
self.byte_count = 0
def write(self, b, nested=False):
# type: (bytes, bool) -> None
assert isinstance(b, bytes)
if nested:
self.write_var_int64(len(b))
self.data.append(b)
self.byte_count += len(b)
def write_byte(self, val):
self.data.append(chr(val).encode('latin-1'))
self.byte_count += 1
def write_var_int64(self, v):
# type: (int) -> None
if v < 0:
v += 1 << 64
if v <= 0:
raise ValueError('Value too large (negative).')
while True:
bits = v & 0x7F
v >>= 7
if v:
bits |= 0x80
self.write_byte(bits)
if not v:
break
def write_bigendian_int64(self, v):
self.write(struct.pack('>q', v))
def write_bigendian_uint64(self, v):
self.write(struct.pack('>Q', v))
def write_bigendian_int32(self, v):
self.write(struct.pack('>i', v))
def write_bigendian_double(self, v):
self.write(struct.pack('>d', v))
def get(self):
# type: () -> bytes
return b''.join(self.data)
def size(self):
# type: () -> int
return self.byte_count
def _clear(self):
# type: () -> None
self.data = []
self.byte_count = 0
class ByteCountingOutputStream(OutputStream):
"""For internal use only; no backwards-compatibility guarantees.
A pure Python implementation of stream.ByteCountingOutputStream."""
def __init__(self):
# Note that we don't actually use any of the data initialized by our super.
super(ByteCountingOutputStream, self).__init__()
self.count = 0
def write(self, byte_array, nested=False):
# type: (bytes, bool) -> None
blen = len(byte_array)
if nested:
self.write_var_int64(blen)
self.count += blen
def write_byte(self, _):
self.count += 1
def get_count(self):
return self.count
def get(self):
raise NotImplementedError
def __str__(self):
return '<%s %s>' % (self.__class__.__name__, self.count)
class InputStream(object):
"""For internal use only; no backwards-compatibility guarantees.
A pure Python implementation of stream.InputStream."""
def __init__(self, data):
# type: (bytes) -> None
self.data = data
self.pos = 0
def size(self):
return len(self.data) - self.pos
def read(self, size):
# type: (int) -> bytes
self.pos += size
return self.data[self.pos - size:self.pos]
def read_all(self, nested):
# type: (bool) -> bytes
return self.read(self.read_var_int64() if nested else self.size())
def read_byte(self):
# type: () -> int
self.pos += 1
return self.data[self.pos - 1]
def read_var_int64(self):
shift = 0
result = 0
while True:
byte = self.read_byte()
if byte < 0:
raise RuntimeError('VarLong not terminated.')
bits = byte & 0x7F
if shift >= 64 or (shift >= 63 and bits > 1):
raise RuntimeError('VarLong too long.')
result |= bits << shift
shift += 7
if not byte & 0x80:
break
if result >= 1 << 63:
result -= 1 << 64
return result
def read_bigendian_int64(self):
return struct.unpack('>q', self.read(8))[0]
def read_bigendian_uint64(self):
return struct.unpack('>Q', self.read(8))[0]
def read_bigendian_int32(self):
return struct.unpack('>i', self.read(4))[0]
def read_bigendian_double(self):
return struct.unpack('>d', self.read(8))[0]
def get_varint_size(v):
"""For internal use only; no backwards-compatibility guarantees.
Returns the size of the given integer value when encode as a VarInt."""
if v < 0:
v += 1 << 64
if v <= 0:
raise ValueError('Value too large (negative).')
varint_size = 0
while True:
varint_size += 1
v >>= 7
if not v:
break
return varint_size