blob: 512fa62189e74306bf60e0a1f85a8291da5c5dda [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from threading import *
from unittest import TestCase
from qpid.util import connect, listen
from qpid.connection import *
from qpid.datatypes import Message
from qpid.testlib import testrunner
from qpid.delegates import Server
from qpid.queue import Queue
from qpid.spec010 import load
from qpid.session import Delegate
PORT = 1234
class TestServer:
def __init__(self, queue):
self.queue = queue
def connection(self, connection):
return Server(connection, delegate=self.session)
def session(self, session):
session.auto_sync = False
return TestSession(session, self.queue)
class TestSession(Delegate):
def __init__(self, session, queue):
self.session = session
self.queue = queue
def execution_sync(self, es):
pass
def queue_query(self, qq):
return qq._type.result.type.new((qq.queue,), {})
def message_transfer(self, cmd, headers, body):
if cmd.destination == "echo":
m = Message(body)
m.headers = headers
self.session.message_transfer(cmd.destination, cmd.accept_mode,
cmd.acquire_mode, m)
elif cmd.destination == "abort":
self.session.channel.connection.sock.close()
else:
self.queue.put((cmd, headers, body))
class ConnectionTest(TestCase):
def setUp(self):
self.spec = load(testrunner.get_spec_file("amqp.0-10.xml"))
self.queue = Queue()
self.running = True
started = Event()
def run():
ts = TestServer(self.queue)
for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
conn = Connection(s, self.spec, ts.connection)
try:
conn.start(5)
except Closed:
pass
self.server = Thread(target=run)
self.server.setDaemon(True)
self.server.start()
started.wait(3)
assert started.isSet()
def tearDown(self):
self.running = False
connect("0.0.0.0", PORT).close()
self.server.join(3)
def connect(self):
return Connection(connect("0.0.0.0", PORT), self.spec)
def test(self):
c = self.connect()
c.start(10)
ssn1 = c.session("test1", timeout=10)
ssn2 = c.session("test2", timeout=10)
assert ssn1 == c.sessions["test1"]
assert ssn2 == c.sessions["test2"]
assert ssn1.channel != None
assert ssn2.channel != None
assert ssn1 in c.attached.values()
assert ssn2 in c.attached.values()
ssn1.close(5)
assert ssn1.channel == None
assert ssn1 not in c.attached.values()
assert ssn2 in c.sessions.values()
ssn2.close(5)
assert ssn2.channel == None
assert ssn2 not in c.attached.values()
assert ssn2 not in c.sessions.values()
ssn = c.session("session", timeout=10)
assert ssn.channel != None
assert ssn in c.sessions.values()
destinations = ("one", "two", "three")
for d in destinations:
ssn.message_transfer(d)
for d in destinations:
cmd, header, body = self.queue.get(10)
assert cmd.destination == d
assert header == None
assert body == None
msg = Message("this is a test")
ssn.message_transfer("four", message=msg)
cmd, header, body = self.queue.get(10)
assert cmd.destination == "four"
assert header == None
assert body == msg.body
qq = ssn.queue_query("asdf")
assert qq.queue == "asdf"
c.close(5)
def testCloseGet(self):
c = self.connect()
c.start(10)
ssn = c.session("test", timeout=10)
echos = ssn.incoming("echo")
for i in range(10):
ssn.message_transfer("echo", message=Message("test%d" % i))
ssn.auto_sync=False
ssn.message_transfer("abort")
for i in range(10):
m = echos.get(timeout=10)
assert m.body == "test%d" % i
try:
m = echos.get(timeout=10)
assert False
except Closed, e:
pass
def testCloseListen(self):
c = self.connect()
c.start(10)
ssn = c.session("test", timeout=10)
echos = ssn.incoming("echo")
messages = []
exceptions = []
condition = Condition()
def listener(m): messages.append(m)
def exc_listener(e):
exceptions.append(e)
condition.acquire()
condition.notify()
condition.release()
echos.listen(listener, exc_listener)
for i in range(10):
ssn.message_transfer("echo", message=Message("test%d" % i))
ssn.auto_sync=False
ssn.message_transfer("abort")
condition.acquire()
condition.wait(10)
condition.release()
for i in range(10):
m = messages.pop(0)
assert m.body == "test%d" % i
assert len(exceptions) == 1
def testSync(self):
c = self.connect()
c.start(10)
s = c.session("test")
s.auto_sync = False
s.message_transfer("echo", message=Message("test"))
s.sync(10)