blob: cfbc72b5b4f6766ab192f7d3ec58faa7fd5663c0 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, common
from proton import *
# future test areas
# + different permutations of setup
# - creating deliveries and calling input/output before opening the session/link
# + shrinking output_size down to something small? should the enginge buffer?
# + resuming
# - locally and remotely created deliveries with the same tag
OUTPUT_SIZE = 10*1024
def pump(t1, t2):
while True:
out1 = t1.output(OUTPUT_SIZE)
out2 = t2.output(OUTPUT_SIZE)
if out1 or out2:
if out1:
n = t2.input(out1)
assert n is None or n == len(out1), (n, out1, len(out1))
if out2:
n = t1.input(out2)
assert n is None or n == len(out2), (n, out2, len(out2))
else:
return
class Test(common.Test):
def __init__(self, *args):
common.Test.__init__(self, *args)
self._wires = []
def connection(self):
c1 = Connection()
c2 = Connection()
t1 = Transport()
t1.bind(c1)
c1._transport = t1
t2 = Transport()
t2.bind(c2)
c2._transport = t2
self._wires.append((c1, t1, c2, t2))
trc = os.environ.get("PN_TRACE_FRM")
if trc and trc.lower() in ("1", "2", "yes", "true"):
t1.trace(Transport.TRACE_FRM)
if trc == "2":
t2.trace(Transport.TRACE_FRM)
return c1, c2
def link(self, name, max_frame=None):
c1, c2 = self.connection()
if max_frame:
c1._transport.max_frame_size = max_frame[0]
c2._transport.max_frame_size = max_frame[1]
c1.open()
c2.open()
ssn1 = c1.session()
ssn1.open()
self.pump()
ssn2 = c2.session_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
ssn2.open()
self.pump()
snd = ssn1.sender(name)
rcv = ssn2.receiver(name)
return snd, rcv
def cleanup(self):
pass
def pump(self):
for c1, t1, c2, t2 in self._wires:
pump(t1, t2)
class ConnectionTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
def teardown(self):
self.cleanup()
def test_open_close(self):
assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.c1.open()
self.pump()
assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
self.c2.open()
self.pump()
assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.c1.close()
self.pump()
assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
self.c2.close()
self.pump()
assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_simultaneous_open_close(self):
assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.c1.open()
self.c2.open()
self.pump()
assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.c1.close()
self.c2.close()
self.pump()
assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_capabilities(self):
self.c1.offered_capabilities.put_array(False, Data.SYMBOL)
self.c1.offered_capabilities.enter()
self.c1.offered_capabilities.put_symbol("O_one")
self.c1.offered_capabilities.put_symbol("O_two")
self.c1.offered_capabilities.put_symbol("O_three")
self.c1.offered_capabilities.exit()
self.c1.desired_capabilities.put_array(False, Data.SYMBOL)
self.c1.desired_capabilities.enter()
self.c1.desired_capabilities.put_symbol("D_one")
self.c1.desired_capabilities.put_symbol("D_two")
self.c1.desired_capabilities.put_symbol("D_three")
self.c1.desired_capabilities.exit()
self.c1.open()
assert self.c2.remote_offered_capabilities.format() == ""
assert self.c2.remote_desired_capabilities.format() == ""
self.pump()
assert self.c2.remote_offered_capabilities.format() == self.c1.offered_capabilities.format()
assert self.c2.remote_desired_capabilities.format() == self.c1.desired_capabilities.format()
class SessionTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
self.ssn = self.c1.session()
self.c1.open()
self.c2.open()
def teardown(self):
self.cleanup()
def test_open_close(self):
assert self.ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.ssn.open()
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
self.pump()
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
assert ssn != None
assert ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
ssn.open()
assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
self.pump()
assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
ssn.close()
assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.pump()
assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
self.ssn.close()
assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
self.pump()
assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_simultaneous_close(self):
self.ssn.open()
self.pump()
ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
assert ssn != None
ssn.open()
self.pump()
assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.ssn.close()
ssn.close()
assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
self.pump()
assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_closing_connection(self):
self.ssn.open()
self.pump()
self.c1.close()
self.pump()
self.ssn.close()
self.pump()
class LinkTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
def teardown(self):
self.cleanup()
def test_open_close(self):
assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.snd.open()
assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.pump()
assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
self.rcv.open()
assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.pump()
assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.snd.close()
assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.pump()
assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
self.rcv.close()
assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
self.pump()
assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_simultaneous_open_close(self):
assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.snd.open()
self.rcv.open()
assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
self.pump()
assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.snd.close()
self.rcv.close()
assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
self.pump()
assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_multiple(self):
rcv = self.snd.session.receiver("second-rcv")
self.snd.open()
rcv.open()
self.pump()
c2 = self.rcv.session.connection
l = c2.link_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
while l:
l.open()
l = l.next(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
self.pump()
assert self.snd
assert rcv
self.snd.close()
rcv.close()
ssn = rcv.session
conn = ssn.connection
ssn.close()
conn.close()
self.pump()
def test_closing_session(self):
self.snd.open()
self.rcv.open()
ssn1 = self.snd.session
self.pump()
ssn1.close()
self.pump()
self.snd.close()
self.pump()
def test_closing_connection(self):
self.snd.open()
self.rcv.open()
ssn1 = self.snd.session
c1 = ssn1.connection
self.pump()
c1.close()
self.pump()
self.snd.close()
self.pump()
def assertEqualTermini(self, t1, t2):
assert t1.type == t2.type, (t1.type, t2.type)
assert t1.address == t2.address, (t1.address, t2.address)
assert t1.durability == t2.durability, (t1.durability, t2.durability)
assert t1.expiry_policy == t2.expiry_policy, (t1.expiry_policy, t2.expiry_policy)
assert t1.timeout == t2.timeout, (t1.timeout, t2.timeout)
assert t1.dynamic == t2.dynamic, (t1.dynamic, t2.dynamic)
for attr in ["properties", "capabilities", "outcomes", "filter"]:
d1 = getattr(t1, attr)
d2 = getattr(t2, attr)
assert d1.format() == d2.format(), (attr, d1.format(), d2.format())
def _test_source_target(self, config_source, config_target):
if config_source is None:
self.snd.source.type = Terminus.UNSPECIFIED
else:
config_source(self.snd.source)
if config_target is None:
self.snd.target.type = Terminus.UNSPECIFIED
else:
config_target(self.snd.target)
self.snd.open()
self.pump()
self.assertEqualTermini(self.rcv.remote_source, self.snd.source)
self.assertEqualTermini(self.rcv.remote_target, self.snd.target)
self.rcv.target.copy(self.rcv.remote_target)
self.rcv.source.copy(self.rcv.remote_source)
self.rcv.open()
self.pump()
self.assertEqualTermini(self.snd.remote_target, self.snd.target)
self.assertEqualTermini(self.snd.remote_source, self.snd.source)
def test_source_target(self):
self._test_source_target(TerminusConfig(address="source"),
TerminusConfig(address="target"))
def test_source(self):
self._test_source_target(TerminusConfig(address="source"), None)
def test_target(self):
self._test_source_target(None, TerminusConfig(address="target"))
def test_source_target_full(self):
self._test_source_target(TerminusConfig(address="source",
timeout=3,
filter=[("int", 1), ("symbol", "two"), ("string", "three")],
capabilities=["one", "two", "three"]),
TerminusConfig(address="source",
timeout=7,
capabilities=[]))
class TerminusConfig:
def __init__(self, address=None, timeout=None, durability=None, filter=None,
capabilities=None):
self.address = address
self.timeout = timeout
self.durability = durability
self.filter = filter
self.capabilities = capabilities
def __call__(self, terminus):
if self.address is not None:
terminus.address = self.address
if self.timeout is not None:
terminus.timeout = self.timeout
if self.durability is not None:
terminus.durability = self.durability
if self.capabilities is not None:
terminus.capabilities.put_array(False, Data.SYMBOL)
terminus.capabilities.enter()
for c in self.capabilities:
terminus.capabilities.put_symbol(c)
if self.filter is not None:
terminus.filter.put_list()
terminus.filter.enter()
for (t, v) in self.filter:
setter = getattr(terminus.filter, "put_%s" % t)
setter(v)
class TransferTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
self.snd.open()
self.rcv.open()
self.pump()
def teardown(self):
self.cleanup()
def test_work_queue(self):
assert self.c1.work_head is None
self.snd.delivery("tag")
assert self.c1.work_head is None
self.rcv.flow(1)
self.pump()
d = self.c1.work_head
assert d is not None
tag = d.tag
assert tag == "tag", tag
assert d.writable
n = self.snd.send("this is a test")
assert self.snd.advance()
assert self.c1.work_head is None
self.pump()
d = self.c2.work_head
assert d.tag == "tag"
assert d.readable
def test_multiframe(self):
self.rcv.flow(1)
self.snd.delivery("tag")
msg = "this is a test"
n = self.snd.send(msg)
assert n == len(msg)
self.pump()
d = self.rcv.current
assert d
assert d.tag == "tag", repr(d.tag)
assert d.readable
bytes = self.rcv.recv(1024)
assert bytes == msg
bytes = self.rcv.recv(1024)
assert bytes == ""
msg = "this is more"
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
self.pump()
bytes = self.rcv.recv(1024)
assert bytes == msg
bytes = self.rcv.recv(1024)
assert bytes is None
def test_disposition(self):
self.rcv.flow(1)
self.pump()
sd = self.snd.delivery("tag")
msg = "this is a test"
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
self.pump()
rd = self.rcv.current
assert rd is not None
assert rd.tag == sd.tag
rmsg = self.rcv.recv(1024)
assert rmsg == msg
rd.update(Delivery.ACCEPTED)
self.pump()
rdisp = sd.remote_state
ldisp = rd.local_state
assert rdisp == ldisp == Delivery.ACCEPTED, (rdisp, ldisp)
assert sd.updated
sd.update(Delivery.ACCEPTED)
sd.settle()
self.pump()
assert sd.local_state == rd.remote_state == Delivery.ACCEPTED
class MaxFrameTransferTest(Test):
def setup(self):
pass
def teardown(self):
self.cleanup()
def message(self, size):
parts = []
for i in range(size):
parts.append(str(i))
return "/".join(parts)[:size]
def testMinFrame(self):
"""
Configure receiver to support minimum max-frame as defined by AMQP-1.0.
Verify transfer of messages larger than 512.
"""
self.snd, self.rcv = self.link("test-link", max_frame=[0,512])
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
self.snd.open()
self.rcv.open()
self.pump()
assert self.rcv.session.connection._transport.max_frame_size == 512
assert self.snd.session.connection._transport.remote_max_frame_size == 512
self.rcv.flow(1)
self.snd.delivery("tag")
msg = self.message(513)
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
self.pump()
bytes = self.rcv.recv(513)
assert bytes == msg
bytes = self.rcv.recv(1024)
assert bytes == None
def testOddFrame(self):
"""
Test an odd sized max limit with data that will require multiple frames to
be transfered.
"""
self.snd, self.rcv = self.link("test-link", max_frame=[0,521])
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
self.snd.open()
self.rcv.open()
self.pump()
assert self.rcv.session.connection._transport.max_frame_size == 521
assert self.snd.session.connection._transport.remote_max_frame_size == 521
self.rcv.flow(2)
self.snd.delivery("tag")
msg = "X" * 1699
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
self.pump()
bytes = self.rcv.recv(1699)
assert bytes == msg
bytes = self.rcv.recv(1024)
assert bytes == None
self.rcv.advance()
self.snd.delivery("gat")
msg = self.message(1426)
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
self.pump()
bytes = self.rcv.recv(1426)
assert bytes == msg
self.pump()
bytes = self.rcv.recv(1024)
assert bytes == None
class CreditTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
self.snd.open()
self.rcv.open()
self.pump()
def teardown(self):
self.cleanup()
def testCreditSender(self):
credit = self.snd.credit
assert credit == 0, credit
self.rcv.flow(10)
self.pump()
credit = self.snd.credit
assert credit == 10, credit
self.rcv.flow(PN_SESSION_WINDOW)
self.pump()
credit = self.snd.credit
assert credit == 10 + PN_SESSION_WINDOW, credit
def testCreditReceiver(self):
self.rcv.flow(10)
self.pump()
assert self.rcv.credit == 10, self.rcv.credit
d = self.snd.delivery("tag")
assert d
assert self.snd.advance()
self.pump()
assert self.rcv.credit == 10, self.rcv.credit
assert self.rcv.queued == 1, self.rcv.queued
c = self.rcv.current
assert c.tag == "tag", c.tag
assert self.rcv.advance()
assert self.rcv.credit == 9, self.rcv.credit
assert self.rcv.queued == 0, self.rcv.queued
def settle(self):
result = []
d = self.c1.work_head
while d:
if d.updated:
result.append(d.tag)
d.settle()
d = d.work_next
return result
def testBuffering(self):
self.rcv.flow(PN_SESSION_WINDOW + 10)
self.pump()
assert self.rcv.queued == 0, self.rcv.queued
idx = 0
while self.snd.credit:
d = self.snd.delivery("tag%s" % idx)
assert d
assert self.snd.advance()
self.pump()
idx += 1
assert idx == PN_SESSION_WINDOW + 10, idx
assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
extra = self.snd.delivery("extra")
assert extra
assert self.snd.advance()
self.pump()
assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
for i in range(10):
d = self.rcv.current
assert d.tag == "tag%s" % i, d.tag
assert self.rcv.advance()
d.settle()
self.pump()
assert self.rcv.queued == PN_SESSION_WINDOW - (i+1), self.rcv.queued
tags = self.settle()
assert tags == ["tag%s" % i for i in range(10)], tags
self.pump()
assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
for i in range(PN_SESSION_WINDOW):
d = self.rcv.current
assert d, i
assert d.tag == "tag%s" % (i+10), d.tag
assert self.rcv.advance()
d.settle()
self.pump()
assert self.rcv.queued == 0, self.rcv.queued
tags = self.settle()
assert tags == ["tag%s" % (i+10) for i in range(PN_SESSION_WINDOW)]
assert self.rcv.queued == 0, self.rcv.queued
def _testBufferingOnClose(self, a, b):
for i in range(10):
d = self.snd.delivery("tag-%s" % i)
assert d
d.settle()
self.pump()
assert self.snd.queued == 10
endpoints = {"connection": (self.c1, self.c2),
"session": (self.snd.session, self.rcv.session),
"link": (self.snd, self.rcv)}
local_a, remote_a = endpoints[a]
local_b, remote_b = endpoints[b]
remote_b.close()
self.pump()
assert local_b.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
local_a.close()
self.pump()
assert remote_a.state & Endpoint.REMOTE_CLOSED
assert self.snd.queued == 10
def testBufferingOnCloseLinkLink(self):
self._testBufferingOnClose("link", "link")
def testBufferingOnCloseLinkSession(self):
self._testBufferingOnClose("link", "session")
def testBufferingOnCloseLinkConnection(self):
self._testBufferingOnClose("link", "connection")
def testBufferingOnCloseSessionLink(self):
self._testBufferingOnClose("session", "link")
def testBufferingOnCloseSessionSession(self):
self._testBufferingOnClose("session", "session")
def testBufferingOnCloseSessionConnection(self):
self._testBufferingOnClose("session", "connection")
def testBufferingOnCloseConnectionLink(self):
self._testBufferingOnClose("connection", "link")
def testBufferingOnCloseConnectionSession(self):
self._testBufferingOnClose("connection", "session")
def testBufferingOnCloseConnectionConnection(self):
self._testBufferingOnClose("connection", "connection")
def testCreditWithBuffering(self):
self.rcv.flow(PN_SESSION_WINDOW + 10)
self.pump()
assert self.snd.credit == PN_SESSION_WINDOW + 10, self.snd.credit
assert self.rcv.queued == 0, self.rcv.queued
idx = 0
while self.snd.credit:
d = self.snd.delivery("tag%s" % idx)
assert d
assert self.snd.advance()
self.pump()
idx += 1
assert idx == PN_SESSION_WINDOW + 10, idx
assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
self.rcv.flow(1)
self.pump()
assert self.snd.credit == 1, self.snd.credit
def testFullDrain(self):
assert self.rcv.credit == 0
assert self.snd.credit == 0
self.rcv.drain(10)
assert self.rcv.credit == 10
assert self.snd.credit == 0
self.pump()
assert self.rcv.credit == 10
assert self.snd.credit == 10
self.snd.drained()
assert self.rcv.credit == 10
assert self.snd.credit == 0
self.pump()
assert self.rcv.credit == 0
assert self.snd.credit == 0
def testPartialDrain(self):
self.rcv.drain(2)
self.pump()
d = self.snd.delivery("tag")
assert d
assert self.snd.advance()
self.snd.drained()
self.pump()
c = self.rcv.current
assert self.rcv.queued == 1, self.rcv.queued
assert c.tag == d.tag, c.tag
assert self.rcv.advance()
assert not self.rcv.current
assert self.rcv.credit == 0, self.rcv.credit
def testDrainFlow(self):
assert self.rcv.credit == 0
assert self.snd.credit == 0
self.rcv.drain(10)
assert self.rcv.credit == 10
assert self.snd.credit == 0
self.pump()
assert self.rcv.credit == 10
assert self.snd.credit == 10
self.snd.drained()
assert self.rcv.credit == 10
assert self.snd.credit == 0
self.pump()
assert self.rcv.credit == 0
assert self.snd.credit == 0
self.rcv.flow(10)
assert self.rcv.credit == 10
assert self.snd.credit == 0
self.pump()
assert self.rcv.credit == 10
assert self.snd.credit == 10
self.snd.drained()
assert self.rcv.credit == 10
assert self.snd.credit == 10
self.pump()
assert self.rcv.credit == 10
assert self.snd.credit == 10
def testNegative(self):
assert self.snd.credit == 0
d = self.snd.delivery("tag")
assert d
assert self.snd.advance()
self.pump()
assert self.rcv.credit == 0
assert self.rcv.queued == 0
self.rcv.flow(1)
assert self.rcv.credit == 1
assert self.rcv.queued == 0
self.pump()
assert self.rcv.credit == 1
assert self.rcv.queued == 1
c = self.rcv.current
assert c
assert c.tag == "tag"
assert self.rcv.advance()
assert self.rcv.credit == 0
assert self.rcv.queued == 0
def testDrainZero(self):
assert self.snd.credit == 0
assert self.rcv.credit == 0
assert self.rcv.queued == 0
self.rcv.flow(10)
self.pump()
assert self.snd.credit == 10
assert self.rcv.credit == 10
assert self.rcv.queued == 0
self.snd.drained()
self.pump()
assert self.snd.credit == 10
assert self.rcv.credit == 10
assert self.rcv.queued == 0
self.rcv.drain(0)
assert self.snd.credit == 10
assert self.rcv.credit == 10
assert self.rcv.queued == 0
self.pump()
assert self.snd.credit == 10
assert self.rcv.credit == 10
assert self.rcv.queued == 0
self.snd.drained()
assert self.snd.credit == 0
assert self.rcv.credit == 10
assert self.rcv.queued == 0
self.pump()
assert self.snd.credit == 0
assert self.rcv.credit == 0
assert self.rcv.queued == 0
class SettlementTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
self.snd.open()
self.rcv.open()
self.pump()
def teardown(self):
self.cleanup()
def testSettleCurrent(self):
self.rcv.flow(10)
self.pump()
assert self.snd.credit == 10, self.snd.credit
d = self.snd.delivery("tag")
e = self.snd.delivery("tag2")
assert d
assert e
c = self.snd.current
assert c.tag == "tag", c.tag
c.settle()
c = self.snd.current
assert c.tag == "tag2", c.tag
c.settle()
c = self.snd.current
assert not c
self.pump()
c = self.rcv.current
assert c
assert c.tag == "tag", c.tag
assert c.settled
c.settle()
c = self.rcv.current
assert c
assert c.tag == "tag2", c.tag
assert c.settled
c.settle()
c = self.rcv.current
assert not c
def testUnsettled(self):
self.rcv.flow(10)
self.pump()
assert self.snd.unsettled == 0, self.snd.unsettled
assert self.rcv.unsettled == 0, self.rcv.unsettled
d = self.snd.delivery("tag")
assert d
assert self.snd.unsettled == 1, self.snd.unsettled
assert self.rcv.unsettled == 0, self.rcv.unsettled
assert self.snd.advance()
self.pump()
assert self.snd.unsettled == 1, self.snd.unsettled
assert self.rcv.unsettled == 1, self.rcv.unsettled
c = self.rcv.current
assert c
c.settle()
assert self.snd.unsettled == 1, self.snd.unsettled
assert self.rcv.unsettled == 0, self.rcv.unsettled
class PipelineTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
def teardown(self):
self.cleanup()
def test(self):
ssn = self.c1.session()
snd = ssn.sender("sender")
self.c1.open()
ssn.open()
snd.open()
for i in range(10):
d = snd.delivery("delivery-%s" % i)
snd.send("delivery-%s" % i)
d.settle()
snd.close()
ssn.close()
self.c1.close()
self.pump()
state = self.c2.state
assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
ssn2 = self.c2.session_head(Endpoint.LOCAL_UNINIT)
assert ssn2
state == ssn2.state
assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
rcv = self.c2.link_head(Endpoint.LOCAL_UNINIT)
assert rcv
state = rcv.state
assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
self.c2.open()
ssn2.open()
rcv.open()
rcv.flow(10)
assert rcv.queued == 0, rcv.queued
self.pump()
assert rcv.queued == 10, rcv.queued
state = rcv.state
assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
state = ssn2.state
assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
state = self.c2.state
assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
for i in range(rcv.queued):
d = rcv.current
assert d
assert d.tag == "delivery-%s" % i
d.settle()
assert rcv.queued == 0, rcv.queued