blob: 6847285f693e5ae656a802eae1edc06fb4236cfe [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import time
from threading import *
from unittest import TestCase
from qpid.util import connect, listen
from qpid.connection import *
from qpid.datatypes import Message
from qpid.delegates import Server
from qpid.queue import Queue
from qpid.session import Delegate
from qpid.ops import QueueQueryResult
# TCP port used both by the in-process test server (listen) and by the
# client side of every test (connect).
PORT = 1234
class TestServer:
    """Builds the server-side delegates for the in-process test server.

    The queue given at construction is handed to every TestSession this
    object creates.
    """

    def __init__(self, queue):
        """Remember *queue* so it can be passed on to new TestSessions."""
        self.queue = queue

    def connection(self, connection):
        """Return a Server delegate for *connection*.

        The Server is given self.session as its session-delegate factory.
        """
        server_delegate = Server(connection, delegate=self.session)
        return server_delegate

    def session(self, session):
        """Return a TestSession for *session*, with auto_sync switched off."""
        # auto_sync is cleared before the delegate is built, as in the
        # original ordering.
        session.auto_sync = False
        session_delegate = TestSession(session, self.queue)
        return session_delegate
class TestSession(Delegate):
    """Server-side session delegate whose behaviour is chosen by destination.

    Transfers are dispatched on ``cmd.destination``:

    * ``"echo"``      -- send the payload and headers back on the same session
    * ``"abort"``     -- close the connection's socket
    * ``"heartbeat"`` -- call connection_heartbeat() on the session's channel
    * anything else   -- put the command on the shared queue
    """

    def __init__(self, session, queue):
        """Store the session being served and the queue for captured commands."""
        self.queue = queue
        self.session = session

    def execution_sync(self, es):
        """Accept an execution-sync command and do nothing with it."""
        return None

    def queue_query(self, qq):
        """Reply to a queue query with a result carrying the queried name."""
        queried_name = qq.queue
        return QueueQueryResult(queried_name)

    def message_transfer(self, cmd):
        """Handle an incoming transfer according to its destination."""
        destination = cmd.destination

        if destination == "echo":
            # Build a fresh message from the incoming payload/headers and
            # transfer it back with the same destination and modes.
            reply = Message(cmd.payload)
            reply.headers = cmd.headers
            self.session.message_transfer(destination, cmd.accept_mode,
                                          cmd.acquire_mode, reply)
            return

        if destination == "abort":
            # Close the socket directly, without going through a
            # connection-level close() call.
            self.session.channel.connection.sock.close()
            return

        if destination == "heartbeat":
            self.session.channel.connection_heartbeat()
            return

        # Any other destination: expose the raw command via the queue.
        self.queue.put(cmd)
class ConnectionTest(TestCase):
    """Exercise a client Connection against an in-process test server.

    setUp starts a daemon thread that accepts sockets on PORT and serves
    each one with delegates built by TestServer; every test then opens its
    own client connection to that server.
    """

    def setUp(self):
        """Start the server thread and wait until it reports it has started."""
        self.queue = Queue()
        self.running = True
        started = Event()

        def run():
            ts = TestServer(self.queue)
            # listen() is given a predicate on self.running and a callback
            # that sets `started`; presumably the former ends the accept
            # loop and the latter fires once the socket is listening.
            for s in listen("0.0.0.0", PORT, lambda: self.running,
                            lambda: started.set()):
                conn = Connection(s, delegate=ts.connection)
                try:
                    conn.start(5)
                except Closed:
                    # Deliberately ignored so that a peer which goes away
                    # during start-up (e.g. the throwaway connection made
                    # by tearDown) does not kill the accept loop.
                    pass

        self.server = Thread(target=run)
        self.server.setDaemon(True)
        self.server.start()

        started.wait(3)
        assert started.isSet(), "test server did not start within 3 seconds"

    def tearDown(self):
        """Ask the server loop to stop and wait briefly for its thread."""
        self.running = False
        # Open and immediately close one connection; presumably this wakes
        # the accept loop so that it re-checks self.running.
        connect("127.0.0.1", PORT).close()
        self.server.join(3)

    def connect(self, **kwargs):
        """Return a client Connection to the test server.

        Keyword arguments are forwarded to the Connection constructor.
        The inner connect() call is the module-level function, not this
        method.
        """
        return Connection(connect("127.0.0.1", PORT), **kwargs)

    def test(self):
        """Check session bookkeeping, transfers and a queue query."""
        c = self.connect()
        c.start(10)

        ssn1 = c.session("test1", timeout=10)
        ssn2 = c.session("test2", timeout=10)

        assert ssn1 == c.sessions["test1"]
        assert ssn2 == c.sessions["test2"]
        assert ssn1.channel is not None
        assert ssn2.channel is not None
        assert ssn1 in c.attached.values()
        assert ssn2 in c.attached.values()

        ssn1.close(5)

        assert ssn1.channel is None
        assert ssn1 not in c.attached.values()
        assert ssn2 in c.sessions.values()

        ssn2.close(5)

        assert ssn2.channel is None
        assert ssn2 not in c.attached.values()
        assert ssn2 not in c.sessions.values()

        ssn = c.session("session", timeout=10)

        assert ssn.channel is not None
        assert ssn in c.sessions.values()

        destinations = ("one", "two", "three")

        for d in destinations:
            ssn.message_transfer(d)

        for d in destinations:
            # The timeout is passed by keyword: a positional 10 would bind
            # to the first parameter of get(), which on a stdlib-style
            # Queue.get(block, timeout) is `block`, leaving the wait
            # unbounded. NOTE(review): assumes qpid.queue.Queue.get accepts
            # `timeout` as the echos.get(timeout=10) calls in this class do.
            cmd = self.queue.get(timeout=10)
            assert cmd.destination == d
            assert cmd.headers is None
            assert cmd.payload is None

        msg = Message("this is a test")
        ssn.message_transfer("four", message=msg)
        cmd = self.queue.get(timeout=10)
        assert cmd.destination == "four"
        assert cmd.headers is None
        assert cmd.payload == msg.body

        qq = ssn.queue_query("asdf")
        assert qq.queue == "asdf"
        c.close(5)

    def testCloseGet(self):
        """Echoes are still readable via get() after an abort, then Closed."""
        c = self.connect()
        c.start(10)
        ssn = c.session("test", timeout=10)
        echos = ssn.incoming("echo")

        for i in range(10):
            ssn.message_transfer("echo", message=Message("test%d" % i))

        # Turn auto_sync off before the abort transfer, since the server
        # closes its socket in response to it.
        ssn.auto_sync = False
        ssn.message_transfer("abort")

        for i in range(10):
            m = echos.get(timeout=10)
            assert m.body == "test%d" % i

        try:
            echos.get(timeout=10)
        except Closed:
            pass
        else:
            self.fail("expected Closed once the echoed messages were drained")

    def testCloseListen(self):
        """Listeners get every echo and then exactly one exception."""
        c = self.connect()
        c.start(10)
        ssn = c.session("test", timeout=10)
        echos = ssn.incoming("echo")

        messages = []
        exceptions = []
        condition = Condition()

        def listener(m):
            messages.append(m)

        def exc_listener(e):
            condition.acquire()
            try:
                exceptions.append(e)
                condition.notify()
            finally:
                condition.release()

        echos.listen(listener, exc_listener)

        for i in range(10):
            ssn.message_transfer("echo", message=Message("test%d" % i))

        ssn.auto_sync = False
        ssn.message_transfer("abort")

        # Wait up to 10 seconds in total for the exception listener to
        # fire; re-check after every wake-up and shrink the remaining wait.
        condition.acquire()
        try:
            start = time.time()
            elapsed = 0
            while not exceptions and elapsed < 10:
                condition.wait(10 - elapsed)
                elapsed = time.time() - start
        finally:
            condition.release()

        # Fail with a clear message rather than an IndexError if fewer
        # messages than expected were delivered.
        assert len(messages) >= 10, \
            "expected 10 echoed messages, got %d" % len(messages)
        for i, m in enumerate(messages[:10]):
            assert m.body == "test%d" % i

        assert len(exceptions) == 1

    def testSync(self):
        """An explicit sync(10) returns after an echo transfer, auto_sync off."""
        c = self.connect()
        c.start(10)
        s = c.session("test")
        s.auto_sync = False
        s.message_transfer("echo", message=Message("test"))
        s.sync(10)

    def testHeartbeat(self):
        """Heartbeat calls in both directions complete without raising."""
        c = self.connect(heartbeat=10)
        c.start(10)
        s = c.session("test")
        # Client side: call connection_heartbeat() on the session's channel.
        s.channel.connection_heartbeat()
        # Server side: the "heartbeat" destination makes TestSession call
        # connection_heartbeat() on its own channel.
        s.message_transfer("heartbeat")