| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| A Connection class containing socket code that uses the spec metadata |
| to read and write Frame objects. This could be used by a client, |
| server, or even a proxy implementation. |
| """ |
| |
| import socket, codec, errno, qpid |
| from cStringIO import StringIO |
| from codec import EOF |
| from compat import SHUT_RDWR |
| from exceptions import VersionError |
| from logging import getLogger, DEBUG |
| |
| log = getLogger("qpid.connection08") |
| |
| class SockIO: |
| |
| def __init__(self, sock): |
| self.sock = sock |
| |
| def write(self, buf): |
| if log.isEnabledFor(DEBUG): |
| log.debug("OUT: %r", buf) |
| self.sock.sendall(buf) |
| |
| def read(self, n): |
| data = "" |
| while len(data) < n: |
| try: |
| s = self.sock.recv(n - len(data)) |
| except socket.error: |
| break |
| if len(s) == 0: |
| break |
| data += s |
| if log.isEnabledFor(DEBUG): |
| log.debug("IN: %r", data) |
| return data |
| |
| def flush(self): |
| pass |
| |
| def close(self): |
| try: |
| try: |
| self.sock.shutdown(SHUT_RDWR) |
| except socket.error, e: |
| if (e.errno == errno.ENOTCONN): |
| pass |
| else: |
| raise |
| finally: |
| self.sock.close() |
| |
| class _OldSSLSock: |
| """This is a wrapper around old (<=2.5) Python SSLObjects""" |
| |
| def __init__(self, sock, keyFile, certFile): |
| self._sock = sock |
| self._sslObj = socket.ssl(self._sock, self._keyFile, self._certFile) |
| self._keyFile = keyFile |
| self._certFile = certFile |
| |
| def sendall(self, buf): |
| while buf: |
| bytesWritten = self._sslObj.write(buf) |
| buf = buf[bytesWritten:] |
| |
| def recv(self, n): |
| return self._sslObj.read(n) |
| |
| def shutdown(self, how): |
| self._sock.shutdown(how) |
| |
| def close(self): |
| self._sock.close() |
| self._sslObj = None |
| |
| def getpeercert(self): |
| raise socket.error("This version of Python does not support SSL hostname verification. Please upgrade.") |
| |
| |
| def connect(host, port, options = None): |
| sock = socket.socket() |
| sock.connect((host, port)) |
| sock.setblocking(1) |
| |
| if options and options.get("ssl", False): |
| log.debug("Wrapping socket for SSL") |
| ssl_certfile = options.get("ssl_certfile", None) |
| ssl_keyfile = options.get("ssl_keyfile", ssl_certfile) |
| ssl_trustfile = options.get("ssl_trustfile", None) |
| ssl_require_trust = options.get("ssl_require_trust", True) |
| ssl_verify_hostname = not options.get("ssl_skip_hostname_check", False) |
| |
| try: |
| # Python 2.6 and 2.7 |
| from ssl import wrap_socket, CERT_REQUIRED, CERT_NONE |
| try: |
| # Python 2.7.9 and newer |
| from ssl import match_hostname as verify_hostname |
| except ImportError: |
| # Before Python 2.7.9 we roll our own |
| from qpid.messaging.transports import verify_hostname |
| |
| if ssl_require_trust or ssl_verify_hostname: |
| validate = CERT_REQUIRED |
| else: |
| validate = CERT_NONE |
| sock = wrap_socket(sock, |
| keyfile=ssl_keyfile, |
| certfile=ssl_certfile, |
| ca_certs=ssl_trustfile, |
| cert_reqs=validate) |
| except ImportError, e: |
| # Python 2.5 and older |
| if ssl_verify_hostname: |
| log.error("Your version of Python does not support ssl hostname verification. Please upgrade your version of Python.") |
| raise e |
| sock = _OldSSLSock(sock, ssl_keyfile, ssl_certfile) |
| |
| if ssl_verify_hostname: |
| verify_hostname(sock.getpeercert(), host) |
| |
| return SockIO(sock) |
| |
| def listen(host, port, predicate = lambda: True): |
| sock = socket.socket() |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| sock.bind((host, port)) |
| sock.listen(5) |
| while predicate(): |
| s, a = sock.accept() |
| yield SockIO(s) |
| |
| class FramingError(Exception): |
| pass |
| |
| class Connection: |
| |
| def __init__(self, io, spec): |
| self.codec = codec.Codec(io, spec) |
| self.spec = spec |
| self.FRAME_END = self.spec.constants.byname["frame_end"].id |
| self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) |
| self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) |
| self.io = io |
| |
| def flush(self): |
| self.codec.flush() |
| |
| INIT="!4s4B" |
| |
| def init(self): |
| self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, |
| self.spec.minor) |
| |
| def tini(self): |
| self.codec.unpack(Connection.INIT) |
| |
| def write_8_0(self, frame): |
| c = self.codec |
| c.encode_octet(self.spec.constants.byname[frame.type].id) |
| c.encode_short(frame.channel) |
| body = StringIO() |
| enc = codec.Codec(body, self.spec) |
| frame.encode(enc) |
| enc.flush() |
| c.encode_longstr(body.getvalue()) |
| c.encode_octet(self.FRAME_END) |
| |
| def read_8_0(self): |
| c = self.codec |
| tid = c.decode_octet() |
| try: |
| type = self.spec.constants.byid[tid].name |
| except KeyError: |
| if tid == ord('A') and c.unpack("!3s") == "MQP": |
| _, _, major, minor = c.unpack("4B") |
| raise VersionError("client: %s-%s, server: %s-%s" % |
| (self.spec.major, self.spec.minor, major, minor)) |
| else: |
| raise FramingError("unknown frame type: %s" % tid) |
| try: |
| channel = c.decode_short() |
| body = c.decode_longstr() |
| dec = codec.Codec(StringIO(body), self.spec) |
| frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) |
| frame.channel = channel |
| end = c.decode_octet() |
| if end != self.FRAME_END: |
| garbage = "" |
| while end != self.FRAME_END: |
| garbage += chr(end) |
| end = c.decode_octet() |
| raise FramingError("frame error: expected %r, got %r" % (self.FRAME_END, garbage)) |
| return frame |
| except EOF: |
| # An EOF caught here can indicate an error decoding the frame, |
| # rather than that a disconnection occurred,so it's worth logging it. |
| log.exception("Error occurred when reading frame with tid %s" % tid) |
| raise |
| |
| def write_0_9(self, frame): |
| self.write_8_0(frame) |
| |
| def read_0_9(self): |
| return self.read_8_0() |
| |
| def write_0_91(self, frame): |
| self.write_8_0(frame) |
| |
| def read_0_91(self): |
| return self.read_8_0() |
| |
| def write_0_10(self, frame): |
| c = self.codec |
| flags = 0 |
| if frame.bof: flags |= 0x08 |
| if frame.eof: flags |= 0x04 |
| if frame.bos: flags |= 0x02 |
| if frame.eos: flags |= 0x01 |
| |
| c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1 |
| c.encode_octet(self.spec.constants.byname[frame.type].id) |
| body = StringIO() |
| enc = codec.Codec(body, self.spec) |
| frame.encode(enc) |
| enc.flush() |
| frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) |
| c.encode_short(frame_size) |
| c.encode_octet(0) # Reserved |
| c.encode_octet(frame.subchannel & 0x0f) |
| c.encode_short(frame.channel) |
| c.encode_long(0) # Reserved |
| c.write(body.getvalue()) |
| c.encode_octet(self.FRAME_END) |
| |
| def read_0_10(self): |
| c = self.codec |
| flags = c.decode_octet() # TODO: currently ignoring flags |
| framing_version = (flags & 0xc0) >> 6 |
| if framing_version != 0: |
| raise FramingError("frame error: unknown framing version") |
| type = self.spec.constants.byid[c.decode_octet()].name |
| frame_size = c.decode_short() |
| if frame_size < 12: # TODO: Magic number (frame header size) |
| raise FramingError("frame error: frame size too small") |
| reserved1 = c.decode_octet() |
| field = c.decode_octet() |
| subchannel = field & 0x0f |
| channel = c.decode_short() |
| reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 |
| if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: |
| raise FramingError("frame error: reserved bits not all zero") |
| body_size = frame_size - 12 # TODO: Magic number (frame header size) |
| body = c.read(body_size) |
| dec = codec.Codec(StringIO(body), self.spec) |
| try: |
| frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) |
| except EOF: |
| raise FramingError("truncated frame body: %r" % body) |
| frame.channel = channel |
| frame.subchannel = subchannel |
| end = c.decode_octet() |
| if end != self.FRAME_END: |
| garbage = "" |
| while end != self.FRAME_END: |
| garbage += chr(end) |
| end = c.decode_octet() |
| raise FramingError("frame error: expected %r, got %r" % (self.FRAME_END, garbage)) |
| return frame |
| |
| def write_99_0(self, frame): |
| self.write_0_10(frame) |
| |
| def read_99_0(self): |
| return self.read_0_10() |
| |
| def close(self): |
| self.io.close(); |
| |
| class Frame: |
| |
| DECODERS = {} |
| |
| class __metaclass__(type): |
| |
| def __new__(cls, name, bases, dict): |
| for attr in ("encode", "decode", "type"): |
| if not dict.has_key(attr): |
| raise TypeError("%s must define %s" % (name, attr)) |
| dict["decode"] = staticmethod(dict["decode"]) |
| if dict.has_key("__init__"): |
| __init__ = dict["__init__"] |
| def init(self, *args, **kwargs): |
| args = list(args) |
| self.init(args, kwargs) |
| __init__(self, *args, **kwargs) |
| dict["__init__"] = init |
| t = type.__new__(cls, name, bases, dict) |
| if t.type != None: |
| Frame.DECODERS[t.type] = t |
| return t |
| |
| type = None |
| |
| def init(self, args, kwargs): |
| self.channel = kwargs.pop("channel", 0) |
| self.subchannel = kwargs.pop("subchannel", 0) |
| self.bos = True |
| self.eos = True |
| self.bof = True |
| self.eof = True |
| |
| def encode(self, enc): abstract |
| |
| def decode(spec, dec, size): abstract |
| |
| class Method(Frame): |
| |
| type = "frame_method" |
| |
| def __init__(self, method, args): |
| if len(args) != len(method.fields): |
| argspec = ["%s: %s" % (f.name, f.type) |
| for f in method.fields] |
| raise TypeError("%s.%s expecting (%s), got %s" % |
| (method.klass.name, method.name, ", ".join(argspec), |
| args)) |
| self.method = method |
| self.method_type = method |
| self.args = args |
| self.eof = not method.content |
| |
| def encode(self, c): |
| version = (c.spec.major, c.spec.minor) |
| if version == (0, 10) or version == (99, 0): |
| c.encode_octet(self.method.klass.id) |
| c.encode_octet(self.method.id) |
| else: |
| c.encode_short(self.method.klass.id) |
| c.encode_short(self.method.id) |
| for field, arg in zip(self.method.fields, self.args): |
| c.encode(field.type, arg) |
| |
| def decode(spec, c, size): |
| version = (c.spec.major, c.spec.minor) |
| if version == (0, 10) or version == (99, 0): |
| klass = spec.classes.byid[c.decode_octet()] |
| meth = klass.methods.byid[c.decode_octet()] |
| else: |
| klass = spec.classes.byid[c.decode_short()] |
| meth = klass.methods.byid[c.decode_short()] |
| args = tuple([c.decode(f.type) for f in meth.fields]) |
| return Method(meth, args) |
| |
| def __str__(self): |
| return "[%s] %s %s" % (self.channel, self.method, |
| ", ".join([str(a) for a in self.args])) |
| |
| class Request(Frame): |
| |
| type = "frame_request" |
| |
| def __init__(self, id, response_mark, method): |
| self.id = id |
| self.response_mark = response_mark |
| self.method = method |
| self.method_type = method.method_type |
| self.args = method.args |
| |
| def encode(self, enc): |
| enc.encode_longlong(self.id) |
| enc.encode_longlong(self.response_mark) |
| # reserved |
| enc.encode_long(0) |
| self.method.encode(enc) |
| |
| def decode(spec, dec, size): |
| id = dec.decode_longlong() |
| mark = dec.decode_longlong() |
| # reserved |
| dec.decode_long() |
| method = Method.decode(spec, dec, size - 20) |
| return Request(id, mark, method) |
| |
| def __str__(self): |
| return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) |
| |
| class Response(Frame): |
| |
| type = "frame_response" |
| |
| def __init__(self, id, request_id, batch_offset, method): |
| self.id = id |
| self.request_id = request_id |
| self.batch_offset = batch_offset |
| self.method = method |
| self.method_type = method.method_type |
| self.args = method.args |
| |
| def encode(self, enc): |
| enc.encode_longlong(self.id) |
| enc.encode_longlong(self.request_id) |
| enc.encode_long(self.batch_offset) |
| self.method.encode(enc) |
| |
| def decode(spec, dec, size): |
| id = dec.decode_longlong() |
| request_id = dec.decode_longlong() |
| batch_offset = dec.decode_long() |
| method = Method.decode(spec, dec, size - 20) |
| return Response(id, request_id, batch_offset, method) |
| |
| def __str__(self): |
| return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) |
| |
| def uses_struct_encoding(spec): |
| return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) |
| |
| class Header(Frame): |
| |
| type = "frame_header" |
| |
| def __init__(self, klass, weight, size, properties): |
| self.klass = klass |
| self.weight = weight |
| self.size = size |
| self.properties = properties |
| self.eof = size == 0 |
| self.bof = False |
| |
| def __getitem__(self, name): |
| return self.properties[name] |
| |
| def __setitem__(self, name, value): |
| self.properties[name] = value |
| |
| def __delitem__(self, name): |
| del self.properties[name] |
| |
| def encode(self, c): |
| if uses_struct_encoding(c.spec): |
| self.encode_structs(c) |
| else: |
| self.encode_legacy(c) |
| |
| def encode_structs(self, c): |
| # XXX |
| structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type), |
| qpid.Struct(c.spec.domains.byname["message_properties"].type)] |
| |
| # XXX |
| props = self.properties.copy() |
| for k in self.properties: |
| for s in structs: |
| if s.exists(k): |
| s.set(k, props.pop(k)) |
| if props: |
| raise TypeError("no such property: %s" % (", ".join(props))) |
| |
| # message properties store the content-length now, and weight is |
| # deprecated |
| if self.size != None: |
| structs[1].content_length = self.size |
| |
| for s in structs: |
| c.encode_long_struct(s) |
| |
| def encode_legacy(self, c): |
| c.encode_short(self.klass.id) |
| c.encode_short(self.weight) |
| c.encode_longlong(self.size) |
| |
| # property flags |
| nprops = len(self.klass.fields) |
| flags = 0 |
| for i in range(nprops): |
| f = self.klass.fields.items[i] |
| flags <<= 1 |
| if self.properties.get(f.name) != None: |
| flags |= 1 |
| # the last bit indicates more flags |
| if i > 0 and (i % 15) == 0: |
| flags <<= 1 |
| if nprops > (i + 1): |
| flags |= 1 |
| c.encode_short(flags) |
| flags = 0 |
| flags <<= ((16 - (nprops % 15)) % 16) |
| c.encode_short(flags) |
| |
| # properties |
| for f in self.klass.fields: |
| v = self.properties.get(f.name) |
| if v != None: |
| c.encode(f.type, v) |
| |
| def decode(spec, c, size): |
| if uses_struct_encoding(spec): |
| return Header.decode_structs(spec, c, size) |
| else: |
| return Header.decode_legacy(spec, c, size) |
| |
| def decode_structs(spec, c, size): |
| structs = [] |
| start = c.nread |
| while c.nread - start < size: |
| structs.append(c.decode_long_struct()) |
| |
| # XXX |
| props = {} |
| length = None |
| for s in structs: |
| for f in s.type.fields: |
| if s.has(f.name): |
| props[f.name] = s.get(f.name) |
| if f.name == "content_length": |
| length = s.get(f.name) |
| return Header(None, 0, length, props) |
| |
| decode_structs = staticmethod(decode_structs) |
| |
| def decode_legacy(spec, c, size): |
| klass = spec.classes.byid[c.decode_short()] |
| weight = c.decode_short() |
| size = c.decode_longlong() |
| |
| # property flags |
| bits = [] |
| while True: |
| flags = c.decode_short() |
| for i in range(15, 0, -1): |
| if flags >> i & 0x1 != 0: |
| bits.append(True) |
| else: |
| bits.append(False) |
| if flags & 0x1 == 0: |
| break |
| |
| # properties |
| properties = {} |
| for b, f in zip(bits, klass.fields): |
| if b: |
| # Note: decode returns a unicode u'' string but only |
| # plain '' strings can be used as keywords so we need to |
| # stringify the names. |
| properties[str(f.name)] = c.decode(f.type) |
| return Header(klass, weight, size, properties) |
| |
| decode_legacy = staticmethod(decode_legacy) |
| |
| def __str__(self): |
| return "%s %s %s %s" % (self.klass, self.weight, self.size, |
| self.properties) |
| |
| class Body(Frame): |
| |
| type = "frame_body" |
| |
| def __init__(self, content): |
| self.content = content |
| self.eof = True |
| self.bof = False |
| |
| def encode(self, enc): |
| enc.write(self.content) |
| |
| def decode(spec, dec, size): |
| return Body(dec.read(size)) |
| |
| def __str__(self): |
| return "Body(%r)" % self.content |
| |
| # TODO: |
| # OOB_METHOD = "frame_oob_method" |
| # OOB_HEADER = "frame_oob_header" |
| # OOB_BODY = "frame_oob_body" |
| # TRACE = "frame_trace" |
| # HEARTBEAT = "frame_heartbeat" |