blob: 8da068ebe097090fc81759bf17f99dff8d873463 [file] [log] [blame]
from __future__ import absolute_import
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, sys, traceback
from . import common
from proton import *
from threading import Thread, Event
from time import sleep, time
from .common import Skipped
class Test(common.Test):
def setUp(self):
self.server_credit = 10
self.server_received = 0
self.server_finite_credit = False
self.server = Messenger("server")
self.server.timeout = self.timeout
self.server.start()
self.port = common.free_tcp_port()
self.server.subscribe("amqp://~127.0.0.1:%d" % self.port)
self.server_thread = Thread(name="server-thread", target=self.run_server)
self.server_thread.daemon = True
self.server_is_running_event = Event()
self.running = True
self.server_thread_started = False
self.client = Messenger("client")
self.client.timeout = self.timeout
def start(self):
self.server_thread_started = True
self.server_thread.start()
self.server_is_running_event.wait(self.timeout)
self.client.start()
def _safelyStopClient(self):
self.server.interrupt()
self.client.stop()
self.client = None
def tearDown(self):
try:
if self.running:
if not self.server_thread_started: self.start()
# send a message to cause the server to promptly exit
self.running = False
self._safelyStopClient()
finally:
self.server_thread.join(self.timeout)
self.server = None
REJECT_ME = "*REJECT-ME*"
class MessengerTest(Test):
def run_server(self):
if self.server_finite_credit:
self._run_server_finite_credit()
else:
self._run_server_recv()
def _run_server_recv(self):
""" Use recv() to replenish credit each time the server waits
"""
msg = Message()
try:
while self.running:
self.server_is_running_event.set()
try:
self.server.recv(self.server_credit)
self.process_incoming(msg)
except Interrupt:
pass
finally:
self.server.stop()
self.running = False
def _run_server_finite_credit(self):
""" Grant credit once, process until credit runs out
"""
msg = Message()
self.server_is_running_event.set()
try:
self.server.recv(self.server_credit)
while self.running:
try:
# do not grant additional credit (eg. call recv())
self.process_incoming(msg)
self.server.work()
except Interrupt:
break
finally:
self.server.stop()
self.running = False
def process_incoming(self, msg):
while self.server.incoming:
self.server.get(msg)
self.server_received += 1
if msg.body == REJECT_ME:
self.server.reject()
else:
self.server.accept()
self.dispatch(msg)
def dispatch(self, msg):
if msg.reply_to:
msg.address = msg.reply_to
self.server.put(msg)
self.server.settle()
def testSendReceive(self, size=None, address_size=None):
self.start()
msg = Message()
if address_size:
msg.address="amqp://127.0.0.1:%d/%s" % (self.port, "x"*address_size)
else:
msg.address="amqp://127.0.0.1:%d" % self.port
msg.reply_to = "~"
msg.subject="Hello World!"
body = "First the world, then the galaxy!"
if size is not None:
while len(body) < size:
body = 2*body
body = body[:size]
msg.body = body
self.client.put(msg)
self.client.send()
reply = Message()
self.client.recv(1)
assert self.client.incoming == 1, self.client.incoming
self.client.get(reply)
assert reply.subject == "Hello World!"
rbod = reply.body
assert rbod == body, (rbod, body)
def testSendReceive1K(self):
self.testSendReceive(1024)
def testSendReceive2K(self):
self.testSendReceive(2*1024)
def testSendReceive4K(self):
self.testSendReceive(4*1024)
def testSendReceive10K(self):
self.testSendReceive(10*1024)
def testSendReceive100K(self):
self.testSendReceive(100*1024)
def testSendReceive1M(self):
self.testSendReceive(1024*1024)
def testSendReceiveLargeAddress(self):
self.testSendReceive(address_size=2048)
# PROTON-285 - prevent continually failing test
def xtestSendBogus(self):
self.start()
msg = Message()
msg.address="totally-bogus-address"
try:
self.client.put(msg)
assert False, "Expecting MessengerException"
except MessengerException:
exc = sys.exc_info()[1]
err = str(exc)
assert "unable to send to address: totally-bogus-address" in err, err
def testOutgoingWindow(self):
self.server.incoming_window = 10
self.start()
msg = Message()
msg.address="amqp://127.0.0.1:%d" % self.port
msg.subject="Hello World!"
trackers = []
for i in range(10):
trackers.append(self.client.put(msg))
self.client.send()
for t in trackers:
assert self.client.status(t) is None
# reduce outgoing_window to 5 and then try to send 10 messages
self.client.outgoing_window = 5
trackers = []
for i in range(10):
trackers.append(self.client.put(msg))
for i in range(5):
t = trackers[i]
assert self.client.status(t) is None, (t, self.client.status(t))
for i in range(5, 10):
t = trackers[i]
assert self.client.status(t) is PENDING, (t, self.client.status(t))
self.client.send()
for i in range(5):
t = trackers[i]
assert self.client.status(t) is None
for i in range(5, 10):
t = trackers[i]
assert self.client.status(t) is ACCEPTED
def testReject(self, process_incoming=None):
if process_incoming:
self.process_incoming = process_incoming
self.server.incoming_window = 10
self.start()
msg = Message()
msg.address="amqp://127.0.0.1:%d" % self.port
msg.subject="Hello World!"
self.client.outgoing_window = 10
trackers = []
rejected = []
for i in range(10):
if i == 5:
msg.body = REJECT_ME
else:
msg.body = "Yay!"
trackers.append(self.client.put(msg))
if msg.body == REJECT_ME:
rejected.append(trackers[-1])
self.client.send()
for t in trackers:
if t in rejected:
assert self.client.status(t) is REJECTED, (t, self.client.status(t))
else:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
def testRejectIndividual(self):
self.testReject(self.reject_individual)
def reject_individual(self, msg):
if self.server.incoming < 10:
self.server.work(0)
return
while self.server.incoming:
t = self.server.get(msg)
if msg.body == REJECT_ME:
self.server.reject(t)
self.dispatch(msg)
self.server.accept()
def testIncomingWindow(self):
self.server.incoming_window = 10
self.server.outgoing_window = 10
self.start()
msg = Message()
msg.address="amqp://127.0.0.1:%d" % self.port
msg.reply_to = "~"
msg.subject="Hello World!"
self.client.outgoing_window = 10
trackers = []
for i in range(10):
trackers.append(self.client.put(msg))
self.client.send()
for t in trackers:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
self.client.incoming_window = 10
remaining = 10
trackers = []
while remaining:
self.client.recv(remaining)
while self.client.incoming:
t = self.client.get()
trackers.append(t)
self.client.accept(t)
remaining -= 1
for t in trackers:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
def testIncomingQueueBiggerThanWindow(self, size=10):
self.server.outgoing_window = size
self.client.incoming_window = size
self.start()
msg = Message()
msg.address = "amqp://127.0.0.1:%d" % self.port
msg.reply_to = "~"
msg.subject = "Hello World!"
for i in range(2*size):
self.client.put(msg)
trackers = []
while len(trackers) < 2*size:
self.client.recv(2*size - len(trackers))
while self.client.incoming:
t = self.client.get(msg)
assert self.client.status(t) is SETTLED, (t, self.client.status(t))
trackers.append(t)
for t in trackers[:size]:
assert self.client.status(t) is None, (t, self.client.status(t))
for t in trackers[size:]:
assert self.client.status(t) is SETTLED, (t, self.client.status(t))
self.client.accept()
for t in trackers[:size]:
assert self.client.status(t) is None, (t, self.client.status(t))
for t in trackers[size:]:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
def testIncomingQueueBiggerThanSessionWindow(self):
self.testIncomingQueueBiggerThanWindow(2048)
def testBuffered(self):
self.client.outgoing_window = 1000
self.client.incoming_window = 1000
self.start();
assert self.server_received == 0
buffering = 0
count = 100
for i in range(count):
msg = Message()
msg.address="amqp://127.0.0.1:%d" % self.port
msg.subject="Hello World!"
msg.body = "First the world, then the galaxy!"
t = self.client.put(msg)
buffered = self.client.buffered(t)
# allow transition from False to True, but not back
if buffered:
buffering += 1
else:
assert not buffering, ("saw %s buffered deliveries before?" % buffering)
while self.client.outgoing:
last = self.client.outgoing
self.client.send()
#print "sent ", last - self.client.outgoing
assert self.server_received == count
def test_proton222(self):
self.start()
msg = Message()
msg.address="amqp://127.0.0.1:%d" % self.port
msg.subject="Hello World!"
msg.body = "First the world, then the galaxy!"
assert self.server_received == 0
self.client.put(msg)
self.client.send()
# ensure the server got the message without requiring client to stop first
deadline = time() + 10
while self.server_received == 0:
assert time() < deadline, "Server did not receive message!"
sleep(.1)
assert self.server_received == 1
def testUnlimitedCredit(self):
""" Bring up two links. Verify credit is granted to each link by
transferring a message over each.
"""
self.server_credit = -1
self.start()
msg = Message()
msg.address="amqp://127.0.0.1:%d/XXX" % self.port
msg.reply_to = "~"
msg.subject="Hello World!"
body = "First the world, then the galaxy!"
msg.body = body
self.client.put(msg)
self.client.send()
reply = Message()
self.client.recv(1)
assert self.client.incoming == 1
self.client.get(reply)
assert reply.subject == "Hello World!"
rbod = reply.body
assert rbod == body, (rbod, body)
msg = Message()
msg.address="amqp://127.0.0.1:%d/YYY" % self.port
msg.reply_to = "~"
msg.subject="Hello World!"
body = "First the world, then the galaxy!"
msg.body = body
self.client.put(msg)
self.client.send()
reply = Message()
self.client.recv(1)
assert self.client.incoming == 1
self.client.get(reply)
assert reply.subject == "Hello World!"
rbod = reply.body
assert rbod == body, (rbod, body)
def _DISABLE_test_proton268(self):
""" Reproducer for JIRA Proton-268 """
""" DISABLED: Causes failure on Jenkins, appears to be unrelated to fix """
self.server_credit = 2048
self.start()
msg = Message()
msg.address="amqp://127.0.0.1:%d" % self.port
msg.body = "X" * 1024
for x in range( 100 ):
self.client.put( msg )
self.client.send()
try:
self.client.stop()
except Timeout:
assert False, "Timeout waiting for client stop()"
# need to restart client, as tearDown() uses it to stop server
self.client.start()
def testRoute(self):
# anonymous cipher not supported on Windows
if os.name == "nt" or not common.isSSLPresent():
domain = "amqp"
else:
domain = "amqps"
port = common.free_tcp_port()
self.server.subscribe(domain + "://~0.0.0.0:%d" % port)
self.start()
self.client.route("route1", "amqp://127.0.0.1:%d" % self.port)
self.client.route("route2", domain + "://127.0.0.1:%d" % port)
msg = Message()
msg.address = "route1"
msg.reply_to = "~"
msg.body = "test"
self.client.put(msg)
self.client.recv(1)
reply = Message()
self.client.get(reply)
msg = Message()
msg.address = "route2"
msg.reply_to = "~"
msg.body = "test"
self.client.put(msg)
self.client.recv(1)
self.client.get(reply)
assert reply.body == "test"
def testDefaultRoute(self):
self.start()
self.client.route("*", "amqp://127.0.0.1:%d" % self.port)
msg = Message()
msg.address = "asdf"
msg.body = "test"
msg.reply_to = "~"
self.client.put(msg)
self.client.recv(1)
reply = Message()
self.client.get(reply)
assert reply.body == "test"
def testDefaultRouteSubstitution(self):
self.start()
self.client.route("*", "amqp://127.0.0.1:%d/$1" % self.port)
msg = Message()
msg.address = "asdf"
msg.body = "test"
msg.reply_to = "~"
self.client.put(msg)
self.client.recv(1)
reply = Message()
self.client.get(reply)
assert reply.body == "test"
def testIncomingRoute(self):
self.start()
port = common.free_tcp_port()
self.client.route("in", "amqp://~0.0.0.0:%d" % port)
self.client.subscribe("in")
msg = Message()
msg.address = "amqp://127.0.0.1:%d" %self.port
msg.reply_to = "amqp://127.0.0.1:%d" % port
msg.body = "test"
self.client.put(msg)
self.client.recv(1)
reply = Message()
self.client.get(reply)
assert reply.body == "test"
def echo_address(self, msg):
while self.server.incoming:
self.server.get(msg)
msg.body = msg.address
self.dispatch(msg)
def _testRewrite(self, original, rewritten):
self.start()
self.process_incoming = self.echo_address
self.client.route("*", "amqp://127.0.0.1:%d" % self.port)
msg = Message()
msg.address = original
msg.body = "test"
msg.reply_to = "~"
self.client.put(msg)
assert msg.address == original
self.client.recv(1)
assert self.client.incoming == 1
echo = Message()
self.client.get(echo)
assert echo.body == rewritten, (echo.body, rewritten)
assert msg.address == original
def testDefaultRewriteH(self):
self._testRewrite("original", "original")
def testDefaultRewriteUH(self):
self._testRewrite("user@original", "original")
def testDefaultRewriteUPH(self):
self._testRewrite("user:pass@original", "original")
def testDefaultRewriteHP(self):
self._testRewrite("original:123", "original:123")
def testDefaultRewriteUHP(self):
self._testRewrite("user@original:123", "original:123")
def testDefaultRewriteUPHP(self):
self._testRewrite("user:pass@original:123", "original:123")
def testDefaultRewriteHN(self):
self._testRewrite("original/name", "original/name")
def testDefaultRewriteUHN(self):
self._testRewrite("user@original/name", "original/name")
def testDefaultRewriteUPHN(self):
self._testRewrite("user:pass@original/name", "original/name")
def testDefaultRewriteHPN(self):
self._testRewrite("original:123/name", "original:123/name")
def testDefaultRewriteUHPN(self):
self._testRewrite("user@original:123/name", "original:123/name")
def testDefaultRewriteUPHPN(self):
self._testRewrite("user:pass@original:123/name", "original:123/name")
def testDefaultRewriteSH(self):
self._testRewrite("amqp://original", "amqp://original")
def testDefaultRewriteSUH(self):
self._testRewrite("amqp://user@original", "amqp://original")
def testDefaultRewriteSUPH(self):
self._testRewrite("amqp://user:pass@original", "amqp://original")
def testDefaultRewriteSHP(self):
self._testRewrite("amqp://original:123", "amqp://original:123")
def testDefaultRewriteSUHP(self):
self._testRewrite("amqp://user@original:123", "amqp://original:123")
def testDefaultRewriteSUPHP(self):
self._testRewrite("amqp://user:pass@original:123", "amqp://original:123")
def testDefaultRewriteSHN(self):
self._testRewrite("amqp://original/name", "amqp://original/name")
def testDefaultRewriteSUHN(self):
self._testRewrite("amqp://user@original/name", "amqp://original/name")
def testDefaultRewriteSUPHN(self):
self._testRewrite("amqp://user:pass@original/name", "amqp://original/name")
def testDefaultRewriteSHPN(self):
self._testRewrite("amqp://original:123/name", "amqp://original:123/name")
def testDefaultRewriteSUHPN(self):
self._testRewrite("amqp://user@original:123/name", "amqp://original:123/name")
def testDefaultRewriteSUPHPN(self):
self._testRewrite("amqp://user:pass@original:123/name", "amqp://original:123/name")
def testRewriteSupress(self):
self.client.rewrite("*", None)
self._testRewrite("asdf", None)
def testRewrite(self):
self.client.rewrite("a", "b")
self._testRewrite("a", "b")
def testRewritePattern(self):
self.client.rewrite("amqp://%@*", "amqp://$2")
self._testRewrite("amqp://foo@bar", "amqp://bar")
def testRewriteToAt(self):
self.client.rewrite("amqp://%/*", "$2@$1")
self._testRewrite("amqp://domain/name", "name@domain")
def testRewriteOverrideDefault(self):
self.client.rewrite("*", "$1")
self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
def testCreditBlockingRebalance(self):
""" The server is given a fixed amount of credit, and runs until that
credit is exhausted.
"""
self.server_finite_credit = True
self.server_credit = 11
self.start()
# put one message out on "Link1" - since there are no other links, it
# should get all the credit (10 after sending)
msg = Message()
msg.address="amqp://127.0.0.1:%d/Link1" % self.port
msg.subject="Hello World!"
body = "First the world, then the galaxy!"
msg.body = body
msg.reply_to = "~"
self.client.put(msg)
self.client.send()
self.client.recv(1)
assert self.client.incoming == 1
# Now attempt to exhaust credit using a different link
for i in range(10):
msg.address="amqp://127.0.0.1:%d/Link2" % self.port
self.client.put(msg)
self.client.send()
deadline = time() + self.timeout
count = 0
while count < 11 and time() < deadline:
self.client.recv(-1)
while self.client.incoming:
self.client.get(msg)
count += 1
assert count == 11, count
# now attempt to send one more. There isn't enough credit, so it should
# not be sent
self.client.timeout = 1
msg.address="amqp://127.0.0.1:%d/Link2" % self.port
self.client.put(msg)
try:
self.client.send()
assert False, "expected client to time out in send()"
except Timeout:
pass
assert self.client.outgoing == 1
class NBMessengerTest(common.Test):
def setUp(self):
self.client = Messenger("client")
self.client2 = Messenger("client2")
self.server = Messenger("server")
self.messengers = [self.client, self.client2, self.server]
self.client.blocking = False
self.client2.blocking = False
self.server.blocking = False
self.server.start()
self.client.start()
self.client2.start()
port = common.free_tcp_port()
self.address = "amqp://127.0.0.1:%d" % port
self.server.subscribe("amqp://~0.0.0.0:%d" % port)
def _pump(self, timeout, work_triggers_exit):
for msgr in self.messengers:
if msgr.work(timeout) and work_triggers_exit:
return True
return False
def pump(self, timeout=0):
while self._pump(0, True): pass
self._pump(timeout, False)
while self._pump(0, True): pass
def tearDown(self):
self.server.stop()
self.client.stop()
self.client2.stop()
self.pump()
assert self.server.stopped
assert self.client.stopped
assert self.client2.stopped
def testSmoke(self, count=1):
self.server.recv()
msg = Message()
msg.address = self.address
for i in range(count):
msg.body = "Hello %s" % i
self.client.put(msg)
msg2 = Message()
for i in range(count):
if self.server.incoming == 0:
self.pump()
assert self.server.incoming > 0, self.server.incoming
self.server.get(msg2)
assert msg2.body == "Hello %s" % i, (msg2.body, i)
assert self.client.outgoing == 0, self.client.outgoing
assert self.server.incoming == 0, self.client.incoming
def testSmoke1024(self):
self.testSmoke(1024)
def testSmoke4096(self):
self.testSmoke(4096)
def testPushback(self):
self.server.recv()
msg = Message()
msg.address = self.address
for i in range(16):
for i in range(1024):
self.client.put(msg)
self.pump()
if self.client.outgoing > 0:
break
assert self.client.outgoing > 0
def testRecvBeforeSubscribe(self):
self.client.recv()
self.client.subscribe(self.address + "/foo")
self.pump()
msg = Message()
msg.address = "amqp://client/foo"
msg.body = "Hello World!"
self.server.put(msg)
assert self.client.incoming == 0
self.pump(self.delay)
assert self.client.incoming == 1
msg2 = Message()
self.client.get(msg2)
assert msg2.address == msg.address
assert msg2.body == msg.body
def testCreditAutoBackpressure(self):
""" Verify that use of automatic credit (pn_messenger_recv(-1)) does not
fill the incoming queue indefinitely. If the receiver does not 'get' the
message, eventually the sender will block. See PROTON-350 """
self.server.recv()
msg = Message()
msg.address = self.address
deadline = time() + self.timeout
while time() < deadline:
old = self.server.incoming
for j in range(1001):
self.client.put(msg)
self.pump()
if old == self.server.incoming:
break;
assert old == self.server.incoming, "Backpressure not active!"
def testCreditRedistribution(self):
""" Verify that a fixed amount of credit will redistribute to new
links.
"""
self.server.recv( 5 )
# first link will get all credit
msg1 = Message()
msg1.address = self.address + "/msg1"
self.client.put(msg1)
self.pump()
assert self.server.incoming == 1, self.server.incoming
assert self.server.receiving == 4, self.server.receiving
# no credit left over for this link
msg2 = Message()
msg2.address = self.address + "/msg2"
self.client.put(msg2)
self.pump()
assert self.server.incoming == 1, self.server.incoming
assert self.server.receiving == 4, self.server.receiving
# eventually, credit will rebalance and the new link will send
deadline = time() + self.timeout
while time() < deadline:
sleep(.1)
self.pump()
if self.server.incoming == 2:
break;
assert self.server.incoming == 2, self.server.incoming
assert self.server.receiving == 3, self.server.receiving
def testCreditReclaim(self):
""" Verify that credit is reclaimed when a link with outstanding credit is
torn down.
"""
self.server.recv( 9 )
# first link will get all credit
msg1 = Message()
msg1.address = self.address + "/msg1"
self.client.put(msg1)
self.pump()
assert self.server.incoming == 1, self.server.incoming
assert self.server.receiving == 8, self.server.receiving
# no credit left over for this link
msg2 = Message()
msg2.address = self.address + "/msg2"
self.client.put(msg2)
self.pump()
assert self.server.incoming == 1, self.server.incoming
assert self.server.receiving == 8, self.server.receiving
# and none for this new client
msg3 = Message()
msg3.address = self.address + "/msg3"
self.client2.put(msg3)
self.pump()
# eventually, credit will rebalance and all links will
# send a message
deadline = time() + self.timeout
while time() < deadline:
sleep(.1)
self.pump()
if self.server.incoming == 3:
break;
assert self.server.incoming == 3, self.server.incoming
assert self.server.receiving == 6, self.server.receiving
# now tear down client two, this should cause its outstanding credit to be
# made available to the other links
self.client2.stop()
self.pump()
for i in range(4):
self.client.put(msg1)
self.client.put(msg2)
# should exhaust all credit
deadline = time() + self.timeout
while time() < deadline:
sleep(.1)
self.pump()
if self.server.incoming == 9:
break;
assert self.server.incoming == 9, self.server.incoming
assert self.server.receiving == 0, self.server.receiving
def testCreditReplenish(self):
""" When extra credit is available it should be granted to the first
link that can use it.
"""
# create three links
msg = Message()
for i in range(3):
msg.address = self.address + "/%d" % i
self.client.put(msg)
self.server.recv( 50 ) # 50/3 = 16 per link + 2 extra
self.pump()
assert self.server.incoming == 3, self.server.incoming
assert self.server.receiving == 47, self.server.receiving
# 47/3 = 15 per link, + 2 extra
# verify one link can send 15 + the two extra (17)
for i in range(17):
msg.address = self.address + "/0"
self.client.put(msg)
self.pump()
assert self.server.incoming == 20, self.server.incoming
assert self.server.receiving == 30, self.server.receiving
# now verify that the remaining credit (30) will eventually rebalance
# across all links (10 per link)
for j in range(10):
for i in range(3):
msg.address = self.address + "/%d" % i
self.client.put(msg)
deadline = time() + self.timeout
while time() < deadline:
sleep(.1)
self.pump()
if self.server.incoming == 50:
break
assert self.server.incoming == 50, self.server.incoming
assert self.server.receiving == 0, self.server.receiving
from select import select
class Pump:
def __init__(self, *messengers):
self.messengers = messengers
self.selectables = []
def pump_once(self):
for m in self.messengers:
while True:
sel = m.selectable()
if sel:
self.selectables.append(sel)
else:
break
reading = []
writing = []
for sel in self.selectables[:]:
if sel.is_terminal:
sel.release()
self.selectables.remove(sel)
else:
if sel.reading:
reading.append(sel)
if sel.writing:
writing.append(sel)
readable, writable, _ = select(reading, writing, [], 0.1)
count = 0
for s in readable:
s.readable()
count += 1
for s in writable:
s.writable()
count += 1
return count
def pump(self):
while self.pump_once(): pass
class SelectableMessengerTest(common.Test):
def testSelectable(self, count = 1):
if os.name=="nt":
# Conflict between native OS select() in Pump and IOCP based pn_selector_t
# makes this fail on Windows (see PROTON-668).
raise Skipped("Invalid test on Windows with IOCP.")
mrcv = Messenger()
mrcv.passive = True
port = common.free_tcp_port()
mrcv.subscribe("amqp://~0.0.0.0:%d" % port)
msnd = Messenger()
msnd.passive = True
m = Message()
m.address = "amqp://127.0.0.1:%d" % port
for i in range(count):
m.body = u"Hello World! %s" % i
msnd.put(m)
p = Pump(msnd, mrcv)
p.pump()
assert msnd.outgoing == count
assert mrcv.incoming == 0
mrcv.recv()
mc = Message()
try:
for i in range(count):
while mrcv.incoming == 0:
p.pump()
assert mrcv.incoming > 0, (count, msnd.outgoing, mrcv.incoming)
mrcv.get(mc)
assert mc.body == u"Hello World! %s" % i, (i, mc.body)
finally:
mrcv.stop()
msnd.stop()
assert not mrcv.stopped
assert not msnd.stopped
p.pump()
assert mrcv.stopped
assert msnd.stopped
def testSelectable16(self):
self.testSelectable(count=16)
def testSelectable1024(self):
self.testSelectable(count=1024)
def testSelectable4096(self):
self.testSelectable(count=4096)
class IdleTimeoutTest(common.Test):
def testIdleTimeout(self):
"""
Verify that a Messenger connection is kept alive using empty idle frames
when a idle_timeout is advertised by the remote peer.
"""
if "java" in sys.platform:
raise Skipped()
idle_timeout_secs = self.delay
try:
idle_server = common.TestServer(idle_timeout=idle_timeout_secs)
idle_server.timeout = self.timeout
idle_server.start()
idle_client = Messenger("idle_client")
idle_client.timeout = self.timeout
idle_client.start()
idle_client.subscribe("amqp://%s:%s/foo" %
(idle_server.host, idle_server.port))
idle_client.work(idle_timeout_secs/10)
# wait up to 3x the idle timeout and hence verify that everything stays
# connected during that time by virtue of no Exception being raised
duration = 3 * idle_timeout_secs
deadline = time() + duration
while time() <= deadline:
idle_client.work(idle_timeout_secs/10)
continue
# confirm link is still active
assert not idle_server.conditions, idle_server.conditions
finally:
try:
idle_client.stop()
except:
pass
try:
idle_server.stop()
except:
pass