| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from codec010 import StringCodec |
| from framer import * |
| from logging import getLogger |
| |
| log = getLogger("qpid.io.seg") |
| |
| class Segment: |
| |
| def __init__(self, first, last, type, track, channel, payload): |
| self.id = None |
| self.offset = None |
| self.first = first |
| self.last = last |
| self.type = type |
| self.track = track |
| self.channel = channel |
| self.payload = payload |
| |
| def decode(self, spec): |
| segs = spec["segment_type"] |
| choice = segs.choices[self.type] |
| return getattr(self, "decode_%s" % choice.name)(spec) |
| |
| def decode_control(self, spec): |
| sc = StringCodec(spec, self.payload) |
| return sc.read_control() |
| |
| def decode_command(self, spec): |
| sc = StringCodec(spec, self.payload) |
| hdr, cmd = sc.read_command() |
| cmd.id = self.id |
| return hdr, cmd |
| |
| def decode_header(self, spec): |
| sc = StringCodec(spec, self.payload) |
| values = [] |
| while len(sc.encoded) > 0: |
| values.append(sc.read_struct32()) |
| return values |
| |
| def decode_body(self, spec): |
| return self.payload |
| |
| def __str__(self): |
| return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, |
| self.track, self.channel, self.payload) |
| |
| def __repr__(self): |
| return str(self) |
| |
| class Assembler(Framer): |
| |
| def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD): |
| Framer.__init__(self, sock) |
| self.max_payload = max_payload |
| self.fragments = {} |
| |
| def read_segment(self): |
| while True: |
| frame = self.read_frame() |
| |
| key = (frame.channel, frame.track) |
| seg = self.fragments.get(key) |
| if seg == None: |
| seg = Segment(frame.isFirstSegment(), frame.isLastSegment(), |
| frame.type, frame.track, frame.channel, "") |
| self.fragments[key] = seg |
| |
| seg.payload += frame.payload |
| |
| if frame.isLastFrame(): |
| self.fragments.pop(key) |
| log.debug("RECV %s", seg) |
| return seg |
| |
| def write_segment(self, segment): |
| remaining = segment.payload |
| |
| first = True |
| while first or remaining: |
| payload = remaining[:self.max_payload] |
| remaining = remaining[self.max_payload:] |
| |
| flags = 0 |
| if first: |
| flags |= FIRST_FRM |
| first = False |
| if not remaining: |
| flags |= LAST_FRM |
| if segment.first: |
| flags |= FIRST_SEG |
| if segment.last: |
| flags |= LAST_SEG |
| |
| frame = Frame(flags, segment.type, segment.track, segment.channel, |
| payload) |
| self.write_frame(frame) |
| |
| log.debug("SENT %s", segment) |