blob: 5c687898dcdb11e6e190a1e12b1181b277f40140 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
An AMQP client implementation that uses a custom delegate for
interacting with the server.
"""
import os, threading
from peer import Peer, Channel, Closed
from delegate import Delegate
from util import get_client_properties_with_defaults
from connection08 import Connection, Frame, connect
from spec08 import load
from queue import Queue
from reference import ReferenceId, References
from saslmech.finder import get_sasl_mechanism
from saslmech.sasl import SaslException
class Client:
def __init__(self, host, port, spec = None, vhost = None):
self.host = host
self.port = port
if spec:
self.spec = spec
else:
from specs_config import amqp_spec_0_9
self.spec = load(amqp_spec_0_9)
self.structs = StructFactory(self.spec)
self.sessions = {}
self.mechanism = None
self.response = None
self.locale = None
self.sasl = None
self.vhost = vhost
if self.vhost == None:
self.vhost = "/"
self.queues = {}
self.lock = threading.Lock()
self.closed = False
self.reason = None
self.started = threading.Event()
self.peer = None
def wait(self):
self.started.wait()
if self.closed:
raise Closed(self.reason)
def queue(self, key):
self.lock.acquire()
try:
try:
q = self.queues[key]
except KeyError:
q = Queue(0)
self.queues[key] = q
finally:
self.lock.release()
return q
def start(self, response=None, mechanism=None, locale="en_US", tune_params=None,
username=None, password=None,
client_properties=None, connection_options=None, sasl_options = None,
channel_options=None):
if response is not None and (username is not None or password is not None):
raise RuntimeError("client must not specify both response and (username, password).")
if response is not None:
self.response = response
authzid, self.username, self.password = response.split("\0")
else:
self.username = username
self.password = password
self.mechanism = mechanism
self.locale = locale
self.tune_params = tune_params
self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties, version_property_key="version")
self.sasl_options = sasl_options
self.socket = connect(self.host, self.port, connection_options)
self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options)
self.conn.init()
self.peer.start()
self.wait()
self.channel(0).connection_open(self.vhost)
def channel(self, id):
self.lock.acquire()
try:
ssn = self.peer.channel(id)
ssn.client = self
self.sessions[id] = ssn
finally:
self.lock.release()
return ssn
def session(self):
self.lock.acquire()
try:
id = None
for i in xrange(1, 64*1024):
if not self.sessions.has_key(i):
id = i
break
finally:
self.lock.release()
if id == None:
raise RuntimeError("out of channels")
else:
return self.channel(id)
def close(self):
if self.peer:
try:
if not self.closed:
channel = self.channel(0);
if channel and not channel._closed:
try:
channel.connection_close(reply_code=200)
except:
pass
self.closed = True
finally:
self.peer.stop()
class ClientDelegate(Delegate):
def __init__(self, client):
Delegate.__init__(self)
self.client = client
def connection_start(self, ch, msg):
if self.client.mechanism is None and self.client.response is not None:
# Supports users passing the response argument
self.client.mechanism = "PLAIN"
serverSupportedMechs = msg.frame.args[3].split()
if self.client.mechanism is None:
self.client.sasl = get_sasl_mechanism(serverSupportedMechs, self.client.username, self.client.password,
sasl_options=self.client.sasl_options)
else:
if self.client.mechanism not in serverSupportedMechs:
raise SaslException("sasl negotiation failed: no mechanism agreed. Client requested: '%s' Server supports: %s"
% (self.client.mechanism, serverSupportedMechs))
self.client.sasl = get_sasl_mechanism([self.client.mechanism], self.client.username, self.client.password,
sasl_options=self.client.sasl_options)
if self.client.sasl is None:
raise SaslException("sasl negotiation failed: no mechanism agreed. Client requested: '%s' Server supports: %s"
% (self.client.mechanism, serverSupportedMechs))
self.client.mechanism = self.client.sasl.mechanismName()
if self.client.response is None:
self.client.response = self.client.sasl.initialResponse()
msg.start_ok(mechanism=self.client.mechanism,
response=self.client.response or "",
locale=self.client.locale,
client_properties=self.client.client_properties)
def connection_secure(self, ch, msg):
msg.secure_ok(response=self.client.sasl.response(msg.challenge))
def connection_tune(self, ch, msg):
if self.client.tune_params:
#todo: just override the params, i.e. don't require them
# all to be included in tune_params
msg.tune_ok(**self.client.tune_params)
else:
msg.tune_ok(*msg.frame.args)
self.client.started.set()
def message_transfer(self, ch, msg):
self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
ch.references.open(msg.reference)
def message_close(self, ch, msg):
ch.references.close(msg.reference)
def message_append(self, ch, msg):
ch.references.get(msg.reference).append(msg.bytes)
def message_acquired(self, ch, msg):
ch.control_queue.put(msg)
def basic_deliver(self, ch, msg):
self.client.queue(msg.consumer_tag).put(msg)
def channel_pong(self, ch, msg):
msg.ok()
def channel_close(self, ch, msg):
ch.closed(msg)
def channel_flow(self, ch, msg):
# On resuming we don't want to send a message before flow-ok has been sent.
# Therefore, we send flow-ok before we set the flow_control flag.
if msg.active:
msg.flow_ok()
ch.set_flow_control(not msg.active)
# On suspending we don't want to send a message after flow-ok has been sent.
# Therefore, we send flow-ok after we set the flow_control flag.
if not msg.active:
msg.flow_ok()
def session_ack(self, ch, msg):
pass
def session_closed(self, ch, msg):
ch.closed(msg)
def connection_close(self, ch, msg):
self.client.peer.closed(msg)
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
def execution_result(self, ch, msg):
future = ch.futures[msg.command_id]
future.put_response(ch, msg.data)
def closed(self, reason):
self.client.closed = True
self.client.reason = reason
self.client.started.set()
class StructFactory:
def __init__(self, spec):
self.spec = spec
self.factories = {}
def __getattr__(self, name):
if self.factories.has_key(name):
return self.factories[name]
elif self.spec.domains.byname.has_key(name):
f = lambda *args, **kwargs: self.struct(name, *args, **kwargs)
self.factories[name] = f
return f
else:
raise AttributeError(name)
def struct(self, name, *args, **kwargs):
return self.spec.struct(name, *args, **kwargs)
class Session(Channel):
def __init__(self, *args):
Channel.__init__(self, *args)
self.references = References()
self.client = None
def open(self):
self.session_open()
def close(self):
self.session_close()
self.client.lock.acquire()
try:
del self.client.sessions[self.id]
finally:
self.client.lock.release()