blob: 3b5df6cf97f453fc7e9d7122d3b61cb04a8cd072 [file] [log] [blame]
"""license: Apache License 2.0, see LICENSE for more details."""
"""Zookeeper Serializers, Deserializers, and NamedTuple objects"""
from collections import namedtuple
import struct
from kazoo.exceptions import EXCEPTIONS
from kazoo.protocol.states import ZnodeStat
from kazoo.security import ACL
from kazoo.security import Id
# Struct objects with formats compiled
bool_struct = struct.Struct('B')
int_struct = struct.Struct('!i')
int_int_struct = struct.Struct('!ii')
int_int_long_struct = struct.Struct('!iiq')
int_long_int_long_struct = struct.Struct('!iqiq')
multiheader_struct = struct.Struct('!iBi')
reply_header_struct = struct.Struct('!iqi')
stat_struct = struct.Struct('!qqqqiiiqiiq')
try: # pragma: nocover
basestring
except NameError:
basestring = str
def read_string(buffer, offset):
"""Reads an int specified buffer into a string and returns the
string and the new offset in the buffer"""
length = int_struct.unpack_from(buffer, offset)[0]
offset += int_struct.size
if length < 0:
return None, offset
else:
index = offset
offset += length
return buffer[index:index + length].decode('utf-8'), offset
def read_acl(bytes, offset):
perms = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
scheme, offset = read_string(bytes, offset)
id, offset = read_string(bytes, offset)
return ACL(perms, Id(scheme, id)), offset
def write_string(bytes):
if not bytes:
return int_struct.pack(-1)
else:
utf8_str = bytes.encode('utf-8')
return int_struct.pack(len(utf8_str)) + utf8_str
def write_buffer(bytes):
if bytes is None:
return int_struct.pack(-1)
else:
return int_struct.pack(len(bytes)) + bytes
def read_buffer(bytes, offset):
length = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
if length < 0:
return None, offset
else:
index = offset
offset += length
return bytes[index:index + length], offset
class Close(namedtuple('Close', '')):
type = -11
@classmethod
def serialize(cls):
return b''
CloseInstance = Close()
class Ping(namedtuple('Ping', '')):
type = 11
@classmethod
def serialize(cls):
return b''
PingInstance = Ping()
class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
' time_out session_id passwd read_only')):
type = None
def serialize(self):
b = bytearray()
b.extend(int_long_int_long_struct.pack(
self.protocol_version, self.last_zxid_seen, self.time_out,
self.session_id))
b.extend(write_buffer(self.passwd))
b.extend([1 if self.read_only else 0])
return b
@classmethod
def deserialize(cls, bytes, offset):
proto_version, timeout, session_id = int_int_long_struct.unpack_from(
bytes, offset)
offset += int_int_long_struct.size
password, offset = read_buffer(bytes, offset)
try:
read_only = bool_struct.unpack_from(bytes, offset)[0] is 1
offset += bool_struct.size
except struct.error:
read_only = False
return cls(proto_version, 0, timeout, session_id, password,
read_only), offset
class Create(namedtuple('Create', 'path data acl flags')):
type = 1
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
b.extend(int_struct.pack(acl.perms) +
write_string(acl.id.scheme) + write_string(acl.id.id))
b.extend(int_struct.pack(self.flags))
return b
@classmethod
def deserialize(cls, bytes, offset):
return read_string(bytes, offset)[0]
class Delete(namedtuple('Delete', 'path version')):
type = 2
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.version))
return b
@classmethod
def deserialize(self, bytes, offset):
return True
class Exists(namedtuple('Exists', 'path watcher')):
type = 3
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend([1 if self.watcher else 0])
return b
@classmethod
def deserialize(cls, bytes, offset):
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return stat if stat.czxid != -1 else None
class GetData(namedtuple('GetData', 'path watcher')):
type = 4
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend([1 if self.watcher else 0])
return b
@classmethod
def deserialize(cls, bytes, offset):
data, offset = read_buffer(bytes, offset)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return data, stat
class SetData(namedtuple('SetData', 'path data version')):
type = 5
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(self.version))
return b
@classmethod
def deserialize(cls, bytes, offset):
return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
class GetACL(namedtuple('GetACL', 'path')):
type = 6
def serialize(self):
return bytearray(write_string(self.path))
@classmethod
def deserialize(cls, bytes, offset):
count = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
if count == -1: # pragma: nocover
return []
acls = []
for c in range(count):
acl, offset = read_acl(bytes, offset)
acls.append(acl)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return acls, stat
class SetACL(namedtuple('SetACL', 'path acls version')):
type = 7
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(len(self.acls)))
for acl in self.acls:
b.extend(int_struct.pack(acl.perms) +
write_string(acl.id.scheme) + write_string(acl.id.id))
b.extend(int_struct.pack(self.version))
return b
@classmethod
def deserialize(cls, bytes, offset):
return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
class GetChildren(namedtuple('GetChildren', 'path watcher')):
type = 8
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend([1 if self.watcher else 0])
return b
@classmethod
def deserialize(cls, bytes, offset):
count = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
if count == -1: # pragma: nocover
return []
children = []
for c in range(count):
child, offset = read_string(bytes, offset)
children.append(child)
return children
class Sync(namedtuple('Sync', 'path')):
type = 9
def serialize(self):
return write_string(self.path)
@classmethod
def deserialize(cls, buffer, offset):
return read_string(buffer, offset)[0]
class GetChildren2(namedtuple('GetChildren2', 'path watcher')):
type = 12
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend([1 if self.watcher else 0])
return b
@classmethod
def deserialize(cls, bytes, offset):
count = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
if count == -1: # pragma: nocover
return []
children = []
for c in range(count):
child, offset = read_string(bytes, offset)
children.append(child)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return children, stat
class CheckVersion(namedtuple('CheckVersion', 'path version')):
type = 13
def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.version))
return b
class Transaction(namedtuple('Transaction', 'operations')):
type = 14
def serialize(self):
b = bytearray()
for op in self.operations:
b.extend(MultiHeader(op.type, False, -1).serialize() +
op.serialize())
return b + multiheader_struct.pack(-1, True, -1)
@classmethod
def deserialize(cls, bytes, offset):
header = MultiHeader(None, False, None)
results = []
response = None
while not header.done:
if header.type == Create.type:
response, offset = read_string(bytes, offset)
elif header.type == Delete.type:
response = True
elif header.type == SetData.type:
response = ZnodeStat._make(
stat_struct.unpack_from(bytes, offset))
offset += stat_struct.size
elif header.type == CheckVersion.type:
response = True
elif header.type == -1:
err = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
response = EXCEPTIONS[err]()
if response:
results.append(response)
header, offset = MultiHeader.deserialize(bytes, offset)
return results
@staticmethod
def unchroot(client, response):
resp = []
for result in response:
if isinstance(result, basestring):
resp.append(client.unchroot(result))
else:
resp.append(result)
return resp
class Auth(namedtuple('Auth', 'auth_type scheme auth')):
type = 100
def serialize(self):
return (int_struct.pack(self.auth_type) + write_string(self.scheme) +
write_string(self.auth))
class Watch(namedtuple('Watch', 'type state path')):
@classmethod
def deserialize(cls, bytes, offset):
"""Given bytes and the current bytes offset, return the
type, state, path, and new offset"""
type, state = int_int_struct.unpack_from(bytes, offset)
offset += int_int_struct.size
path, offset = read_string(bytes, offset)
return cls(type, state, path), offset
class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):
@classmethod
def deserialize(cls, bytes, offset):
"""Given bytes and the current bytes offset, return a
:class:`ReplyHeader` instance and the new offset"""
new_offset = offset + reply_header_struct.size
return cls._make(
reply_header_struct.unpack_from(bytes, offset)), new_offset
class MultiHeader(namedtuple('MultiHeader', 'type done err')):
def serialize(self):
b = bytearray()
b.extend(int_struct.pack(self.type))
b.extend([1 if self.done else 0])
b.extend(int_struct.pack(self.err))
return b
@classmethod
def deserialize(cls, bytes, offset):
t, done, err = multiheader_struct.unpack_from(bytes, offset)
offset += multiheader_struct.size
return cls(t, done is 1, err), offset