| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os, connection, session |
| from util import notify, get_client_properties_with_defaults |
| from datatypes import RangedSet |
| from exceptions import VersionError, Closed |
| from logging import getLogger |
| from ops import Control |
| import sys |
| from qpid import sasl |
| |
| log = getLogger("qpid.io.ctl") |
| |
| class Delegate: |
| |
| def __init__(self, connection, delegate=session.client): |
| self.connection = connection |
| self.delegate = delegate |
| |
| def received(self, op): |
| ssn = self.connection.attached.get(op.channel) |
| if ssn is None: |
| ch = connection.Channel(self.connection, op.channel) |
| else: |
| ch = ssn.channel |
| |
| if isinstance(op, Control): |
| log.debug("RECV %s", op) |
| getattr(self, op.NAME)(ch, op) |
| elif ssn is None: |
| ch.session_detached() |
| else: |
| ssn.received(op) |
| |
| def connection_close(self, ch, close): |
| self.connection.close_code = (close.reply_code, close.reply_text) |
| ch.connection_close_ok() |
| raise Closed(close.reply_text) |
| |
| def connection_close_ok(self, ch, close_ok): |
| self.connection.opened = False |
| self.connection.closed = True |
| notify(self.connection.condition) |
| |
| def connection_heartbeat(self, ch, hrt): |
| pass |
| |
| def session_attach(self, ch, a): |
| try: |
| self.connection.attach(a.name, ch, self.delegate, a.force) |
| ch.session_attached(a.name) |
| except connection.ChannelBusy: |
| ch.session_detached(a.name) |
| except connection.SessionBusy: |
| ch.session_detached(a.name) |
| |
| def session_attached(self, ch, a): |
| notify(ch.session.condition) |
| |
| def session_detach(self, ch, d): |
| #send back the confirmation of detachment before removing the |
| #channel from the attached set; this avoids needing to hold the |
| #connection lock during the sending of this control and ensures |
| #that if the channel is immediately reused for a new session the |
| #attach request will follow the detached notification. |
| ch.session_detached(d.name) |
| ssn = self.connection.detach(d.name, ch) |
| |
| def session_detached(self, ch, d): |
| self.connection.detach(d.name, ch) |
| |
| def session_request_timeout(self, ch, rt): |
| ch.session_timeout(rt.timeout); |
| |
| def session_command_point(self, ch, cp): |
| ssn = ch.session |
| ssn.receiver.next_id = cp.command_id |
| ssn.receiver.next_offset = cp.command_offset |
| |
| def session_completed(self, ch, cmp): |
| ch.session.sender.completed(cmp.commands) |
| if cmp.timely_reply: |
| ch.session_known_completed(cmp.commands) |
| notify(ch.session.condition) |
| |
| def session_known_completed(self, ch, kn_cmp): |
| ch.session.receiver.known_completed(kn_cmp.commands) |
| |
| def session_flush(self, ch, f): |
| rcv = ch.session.receiver |
| if f.expected: |
| if rcv.next_id == None: |
| exp = None |
| else: |
| exp = RangedSet(rcv.next_id) |
| ch.session_expected(exp) |
| if f.confirmed: |
| ch.session_confirmed(rcv._completed) |
| if f.completed: |
| ch.session_completed(rcv._completed) |
| |
| class Server(Delegate): |
| |
| def start(self): |
| self.connection.read_header() |
| # XXX |
| self.connection.write_header(0, 10) |
| connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"]) |
| |
| def connection_start_ok(self, ch, start_ok): |
| ch.connection_tune(channel_max=65535) |
| |
| def connection_tune_ok(self, ch, tune_ok): |
| pass |
| |
| def connection_open(self, ch, open): |
| self.connection.opened = True |
| ch.connection_open_ok() |
| notify(self.connection.condition) |
| |
| class Client(Delegate): |
| |
| def __init__(self, connection, username=None, password=None, |
| mechanism=None, heartbeat=None, **kwargs): |
| Delegate.__init__(self, connection) |
| provided_client_properties = kwargs.get("client_properties") |
| self.client_properties=get_client_properties_with_defaults(provided_client_properties) |
| |
| ## |
| ## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to |
| ## use. If it's None, then any mechanism is acceptable. |
| ## |
| self.acceptableMechanisms = None |
| if mechanism: |
| self.acceptableMechanisms = mechanism.split(" ") |
| self.heartbeat = heartbeat |
| self.username = username |
| self.password = password |
| |
| self.sasl = sasl.Client() |
| if username and len(username) > 0: |
| self.sasl.setAttr("username", str(username)) |
| if password and len(password) > 0: |
| self.sasl.setAttr("password", str(password)) |
| self.sasl.setAttr("service", str(kwargs.get("service", "qpidd"))) |
| if "host" in kwargs: |
| self.sasl.setAttr("host", str(kwargs["host"])) |
| if "min_ssf" in kwargs: |
| self.sasl.setAttr("minssf", kwargs["min_ssf"]) |
| if "max_ssf" in kwargs: |
| self.sasl.setAttr("maxssf", kwargs["max_ssf"]) |
| self.sasl.init() |
| |
| def start(self): |
| # XXX |
| cli_major = 0 |
| cli_minor = 10 |
| self.connection.write_header(cli_major, cli_minor) |
| magic, _, _, major, minor = self.connection.read_header() |
| if not (magic == "AMQP" and major == cli_major and minor == cli_minor): |
| raise VersionError("client: %s-%s, server: %s-%s" % |
| (cli_major, cli_minor, major, minor)) |
| |
| def connection_start(self, ch, start): |
| mech_list = "" |
| for mech in start.mechanisms: |
| if (not self.acceptableMechanisms) or mech in self.acceptableMechanisms: |
| mech_list += str(mech) + " " |
| mech = None |
| initial = None |
| try: |
| mech, initial = self.sasl.start(mech_list) |
| except Exception, e: |
| raise Closed(str(e)) |
| ch.connection_start_ok(client_properties=self.client_properties, |
| mechanism=mech, response=initial) |
| |
| def connection_secure(self, ch, secure): |
| resp = None |
| try: |
| resp = self.sasl.step(secure.challenge) |
| except Exception, e: |
| raise Closed(str(e)) |
| ch.connection_secure_ok(response=resp) |
| |
| def connection_tune(self, ch, tune): |
| ch.connection_tune_ok(heartbeat=self.heartbeat) |
| ch.connection_open() |
| self.connection.user_id = self.sasl.auth_username() |
| self.connection.security_layer_tx = self.sasl |
| |
| def connection_open_ok(self, ch, open_ok): |
| self.connection.security_layer_rx = self.sasl |
| self.connection.opened = True |
| notify(self.connection.condition) |
| |
| def connection_heartbeat(self, ch, hrt): |
| ch.connection_heartbeat() |