blob: 29859c8c945e81c31fc3b46bed51727eeb957108 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, common, xproton
from xproton import *
# future test areas
# + different permutations of setup
# - creating deliveries and calling input/output before opening the session/link
# + shrinking output_size down to something small? should the enginge buffer?
# + resuming
# - locally and remotely created deliveries with the same tag
OUTPUT_SIZE = 10*1024
def pump(t1, t2):
while True:
cd, out1 = pn_output(t1, OUTPUT_SIZE)
assert cd >= 0 or cd == PN_EOS, (cd, out1, len(out1))
cd, out2 = pn_output(t2, OUTPUT_SIZE)
assert cd >= 0 or cd == PN_EOS, (cd, out2, len(out2))
if out1 or out2:
if out1:
cd = pn_input(t2, out1)
assert cd == PN_EOS or cd == len(out1), (cd, out1, len(out1))
if out2:
cd = pn_input(t1, out2)
assert cd == PN_EOS or cd == len(out2), (cd, out2, len(out2))
else:
return
class Test(common.Test):
def __init__(self, *args):
common.Test.__init__(self, *args)
self._wires = []
def connection(self):
c1 = pn_connection()
c2 = pn_connection()
t1 = pn_transport(c1)
t2 = pn_transport(c2)
self._wires.append((c1, t1, c2, t2))
trc = os.environ.get("PN_TRACE_FRM")
if trc and trc.lower() in ("1", "2", "yes", "true"):
pn_trace(t1, PN_TRACE_FRM)
if trc == "2":
pn_trace(t2, PN_TRACE_FRM)
return c1, c2
def link(self, name):
c1, c2 = self.connection()
pn_connection_open(c1)
pn_connection_open(c2)
ssn1 = pn_session(c1)
pn_session_open(ssn1)
self.pump()
ssn2 = pn_session_head(c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
pn_session_open(ssn2)
self.pump()
snd = pn_sender(ssn1, name)
rcv = pn_receiver(ssn2, name)
return snd, rcv
def cleanup(self):
for c1, t1, c2, t2 in self._wires:
pn_connection_free(c1)
pn_connection_free(c2)
def pump(self):
for c1, t1, c2, t2 in self._wires:
pump(t1, t2)
class ConnectionTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
def teardown(self):
self.cleanup()
def test_open_close(self):
assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
pn_connection_open(self.c1)
self.pump()
assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
pn_connection_open(self.c2)
self.pump()
assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
pn_connection_close(self.c1)
self.pump()
assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
pn_connection_close(self.c2)
self.pump()
assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
def test_simultaneous_open_close(self):
assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
pn_connection_open(self.c1)
pn_connection_open(self.c2)
self.pump()
assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
pn_connection_close(self.c1)
pn_connection_close(self.c2)
self.pump()
assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
class SessionTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
self.ssn = pn_session(self.c1)
pn_connection_open(self.c1)
pn_connection_open(self.c2)
def teardown(self):
self.cleanup()
def test_open_close(self):
assert pn_session_state(self.ssn) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
pn_session_open(self.ssn)
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
self.pump()
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
assert ssn != None
assert pn_session_state(ssn) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
pn_session_open(ssn)
assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
self.pump()
assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
pn_session_close(ssn)
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
self.pump()
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
pn_session_close(self.ssn)
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
self.pump()
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
def test_simultaneous_close(self):
pn_session_open(self.ssn)
self.pump()
ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
assert ssn != None
pn_session_open(ssn)
self.pump()
assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
pn_session_close(self.ssn)
pn_session_close(ssn)
assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
self.pump()
assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
def test_closing_connection(self):
pn_session_open(self.ssn)
self.pump()
pn_connection_close(self.c1)
self.pump()
pn_session_close(self.ssn)
self.pump()
class LinkTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
def teardown(self):
self.cleanup()
def test_open_close(self):
assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
pn_link_open(self.snd)
assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
self.pump()
assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
pn_link_open(self.rcv)
assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
self.pump()
assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
pn_link_close(self.snd)
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
self.pump()
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
pn_link_close(self.rcv)
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
self.pump()
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
def test_simultaneous_open_close(self):
assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
pn_link_open(self.snd)
pn_link_open(self.rcv)
assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
self.pump()
assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
pn_link_close(self.snd)
pn_link_close(self.rcv)
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
self.pump()
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
def test_multiple(self):
rcv = pn_receiver(pn_get_session(self.snd), "second-rcv")
pn_link_open(self.snd)
pn_link_open(rcv)
self.pump()
c2 = pn_get_connection(pn_get_session(self.rcv))
l = pn_link_head(c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
while l:
pn_link_open(l)
l = pn_link_next(l, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
if l==l:
break
self.pump()
assert self.snd
assert rcv
pn_link_close(self.snd)
pn_link_close(rcv)
ssn = pn_get_session(rcv)
conn = pn_get_connection(ssn)
pn_session_close(ssn)
pn_connection_close(conn)
self.pump()
def test_closing_session(self):
pn_link_open(self.snd)
pn_link_open(self.rcv)
ssn1 = pn_get_session(self.snd)
self.pump()
pn_session_close(ssn1)
self.pump()
pn_link_close(self.snd)
self.pump()
def test_closing_connection(self):
pn_link_open(self.snd)
pn_link_open(self.rcv)
ssn1 = pn_get_session(self.snd)
c1 = pn_get_connection(ssn1)
self.pump()
pn_connection_close(c1)
self.pump()
pn_link_close(self.snd)
self.pump()
class TransferTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
self.c1 = pn_get_connection(pn_get_session(self.snd))
self.c2 = pn_get_connection(pn_get_session(self.rcv))
pn_link_open(self.snd)
pn_link_open(self.rcv)
self.pump()
def teardown(self):
self.cleanup()
def test_work_queue(self):
assert pn_work_head(self.c1) is None
pn_delivery(self.snd, "tag")
assert pn_work_head(self.c1) is None
pn_flow(self.rcv, 1)
self.pump()
d = pn_work_head(self.c1)
assert d is not None
tag = pn_delivery_tag(d)
assert tag == "tag", tag
assert pn_writable(d)
n = pn_send(self.snd, "this is a test")
assert pn_advance(self.snd)
assert pn_work_head(self.c1) is None
self.pump()
d = pn_work_head(self.c2)
assert pn_delivery_tag(d) == "tag"
assert pn_readable(d)
def test_multiframe(self):
pn_flow(self.rcv, 1)
pn_delivery(self.snd, "tag")
msg = "this is a test"
n = pn_send(self.snd, msg)
assert n == len(msg)
self.pump()
d = pn_current(self.rcv)
assert d
assert pn_delivery_tag(d) == "tag", repr(pn_delivery_tag(d))
assert pn_readable(d)
cd, bytes = pn_recv(self.rcv, 1024)
assert bytes == msg
assert cd == len(bytes)
cd, bytes = pn_recv(self.rcv, 1024)
assert cd == 0
assert bytes == ""
msg = "this is more"
n = pn_send(self.snd, msg)
assert n == len(msg)
assert pn_advance(self.snd)
self.pump()
cd, bytes = pn_recv(self.rcv, 1024)
assert cd == len(bytes)
assert bytes == msg
cd, bytes = pn_recv(self.rcv, 1024)
assert cd == PN_EOS
assert bytes == ""
def test_disposition(self):
pn_flow(self.rcv, 1)
self.pump()
sd = pn_delivery(self.snd, "tag")
msg = "this is a test"
n = pn_send(self.snd, msg)
assert n == len(msg)
assert pn_advance(self.snd)
self.pump()
rd = pn_current(self.rcv)
assert rd is not None
assert pn_delivery_tag(rd) == pn_delivery_tag(sd)
cd, rmsg = pn_recv(self.rcv, 1024)
assert cd == len(rmsg)
assert rmsg == msg
pn_disposition(rd, PN_ACCEPTED)
self.pump()
rdisp = pn_remote_disposition(sd)
ldisp = pn_local_disposition(rd)
assert rdisp == ldisp == PN_ACCEPTED, (rdisp, ldisp)
assert pn_updated(sd)
pn_disposition(sd, PN_ACCEPTED)
pn_settle(sd)
self.pump()
assert pn_local_disposition(sd) == pn_remote_disposition(rd) == PN_ACCEPTED
class CreditTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
self.c1 = pn_get_connection(pn_get_session(self.snd))
self.c2 = pn_get_connection(pn_get_session(self.rcv))
pn_link_open(self.snd)
pn_link_open(self.rcv)
self.pump()
def teardown(self):
self.cleanup()
def testCreditSender(self):
credit = pn_credit(self.snd)
assert credit == 0, credit
pn_flow(self.rcv, 10)
self.pump()
credit = pn_credit(self.snd)
assert credit == 10, credit
pn_flow(self.rcv, PN_SESSION_WINDOW)
self.pump()
credit = pn_credit(self.snd)
assert credit == 10 + PN_SESSION_WINDOW, credit
def testCreditReceiver(self):
pn_flow(self.rcv, 10)
self.pump()
assert pn_credit(self.rcv) == 10, pn_credit(self.rcv)
d = pn_delivery(self.snd, "tag")
assert d
assert pn_advance(self.snd)
self.pump()
assert pn_credit(self.rcv) == 10, pn_credit(self.rcv)
assert pn_queued(self.rcv) == 1, pn_queued(self.rcv)
c = pn_current(self.rcv)
assert pn_delivery_tag(c) == "tag", pn_delivery_tag(c)
assert pn_advance(self.rcv)
assert pn_credit(self.rcv) == 9, pn_credit(self.rcv)
assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
def settle(self):
result = []
d = pn_work_head(self.c1)
while d:
if pn_updated(d):
result.append(pn_delivery_tag(d))
pn_settle(d)
d = pn_work_next(d)
return result
def testBuffering(self):
print "PN_SESSION_WINDOW ",PN_SESSION_WINDOW
pn_flow(self.rcv, PN_SESSION_WINDOW + 10)
self.pump()
assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
idx = 0
while pn_credit(self.snd):
d = pn_delivery(self.snd, "tag%s" % idx)
assert d
assert pn_advance(self.snd)
self.pump()
idx += 1
assert idx == PN_SESSION_WINDOW + 10, idx
assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
extra = pn_delivery(self.snd, "extra")
assert extra
assert pn_advance(self.snd)
self.pump()
assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
for i in range(10):
d = pn_current(self.rcv)
print "d =",d
assert pn_delivery_tag(d) == "tag%s" % i, pn_delivery_tag(d)
assert pn_advance(self.rcv)
pn_settle(d)
self.pump()
print "queued : ", pn_queued(self.rcv)
assert pn_queued(self.rcv) == PN_SESSION_WINDOW - (i+1), pn_queued(self.rcv)
tags = self.settle()
print "tags=", tags
assert tags == ["tag%s" % i for i in range(10)], tags
self.pump()
print "queued : ", pn_queued(self.rcv)
print "PN_SESSION_WINDOW",PN_SESSION_WINDOW
assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
for i in range(PN_SESSION_WINDOW):
d = pn_current(self.rcv)
assert d, i
assert pn_delivery_tag(d) == "tag%s" % (i+10), pn_delivery_tag(d)
assert pn_advance(self.rcv)
pn_settle(d)
self.pump()
assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
tags = self.settle()
assert tags == ["tag%s" % (i+10) for i in range(PN_SESSION_WINDOW)]
assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
def _testBufferingOnClose(self, a, b):
for i in range(10):
d = pn_delivery(self.snd, "tag-%s" % i)
assert d
pn_settle(d)
self.pump()
assert pn_queued(self.snd) == 10
endpoints = {"connection": (self.c1, self.c2),
"session": (pn_get_session(self.snd), pn_get_session(self.rcv)),
"link": (self.snd, self.rcv)}
local_a, remote_a = endpoints[a]
local_b, remote_b = endpoints[b]
a_close = getattr(xproton, "pn_%s_close" % a)
a_state = getattr(xproton, "pn_%s_state" % a)
b_close = getattr(xproton, "pn_%s_close" % b)
b_state = getattr(xproton, "pn_%s_state" % b)
b_close(remote_b)
self.pump()
assert b_state(local_b) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
a_close(local_a)
self.pump()
assert a_state(remote_a) & PN_REMOTE_CLOSED
assert pn_queued(self.snd) == 10
def testBufferingOnCloseLinkLink(self):
self._testBufferingOnClose("link", "link")
def testBufferingOnCloseLinkSession(self):
self._testBufferingOnClose("link", "session")
def testBufferingOnCloseLinkConnection(self):
self._testBufferingOnClose("link", "connection")
def testBufferingOnCloseSessionLink(self):
self._testBufferingOnClose("session", "link")
def testBufferingOnCloseSessionSession(self):
self._testBufferingOnClose("session", "session")
def testBufferingOnCloseSessionConnection(self):
self._testBufferingOnClose("session", "connection")
def testBufferingOnCloseConnectionLink(self):
self._testBufferingOnClose("connection", "link")
def testBufferingOnCloseConnectionSession(self):
self._testBufferingOnClose("connection", "session")
def testBufferingOnCloseConnectionConnection(self):
self._testBufferingOnClose("connection", "connection")
def testCreditWithBuffering(self):
pn_flow(self.rcv, PN_SESSION_WINDOW + 10)
self.pump()
assert pn_credit(self.snd) == PN_SESSION_WINDOW + 10, pn_credit(self.snd)
assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
idx = 0
while pn_credit(self.snd):
d = pn_delivery(self.snd, "tag%s" % idx)
assert d
assert pn_advance(self.snd)
self.pump()
idx += 1
assert idx == PN_SESSION_WINDOW + 10, idx
assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
pn_flow(self.rcv, 1)
self.pump()
assert pn_credit(self.snd) == 1, pn_credit(self.snd)
def testFullDrain(self):
assert pn_credit(self.rcv) == 0
assert pn_credit(self.snd) == 0
pn_drain(self.rcv, 10)
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 0
self.pump()
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
pn_drained(self.snd)
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
self.pump()
assert pn_credit(self.rcv) == 0
assert pn_credit(self.snd) == 0
def testPartialDrain(self):
pn_drain(self.rcv, 2)
self.pump()
d = pn_delivery(self.snd, "tag")
assert d
assert pn_advance(self.snd)
pn_drained(self.snd)
self.pump()
c = pn_current(self.rcv)
assert pn_queued(self.rcv) == 1, pn_queued(self.rcv)
assert pn_delivery_tag(c) == pn_delivery_tag(d), pn_delivery_tag(c)
assert pn_advance(self.rcv)
assert not pn_current(self.rcv)
assert pn_credit(self.rcv) == 0, pn_credit(self.rcv)
def testDrainFlow(self):
assert pn_credit(self.rcv) == 0
assert pn_credit(self.snd) == 0
pn_drain(self.rcv, 10)
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 0
self.pump()
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
pn_drained(self.snd)
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
self.pump()
assert pn_credit(self.rcv) == 0
assert pn_credit(self.snd) == 0
pn_flow(self.rcv, 10)
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 0
self.pump()
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
pn_drained(self.snd)
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
self.pump()
assert pn_credit(self.rcv) == 10
assert pn_credit(self.snd) == 10
def testNegative(self):
assert pn_credit(self.snd) == 0
d = pn_delivery(self.snd, "tag")
assert d
assert pn_advance(self.snd)
self.pump()
assert pn_credit(self.rcv) == 0
assert pn_queued(self.rcv) == 0
pn_flow(self.rcv, 1)
assert pn_credit(self.rcv) == 1
assert pn_queued(self.rcv) == 0
self.pump()
assert pn_credit(self.rcv) == 1
assert pn_queued(self.rcv) == 1
c = pn_current(self.rcv)
assert c
assert pn_delivery_tag(c) == "tag"
assert pn_advance(self.rcv)
assert pn_credit(self.rcv) == 0
assert pn_queued(self.rcv) == 0
class SettlementTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
self.c1 = pn_get_connection(pn_get_session(self.snd))
self.c2 = pn_get_connection(pn_get_session(self.rcv))
pn_link_open(self.snd)
pn_link_open(self.rcv)
self.pump()
def teardown(self):
self.cleanup()
def testSettleCurrent(self):
pn_flow(self.rcv, 10)
self.pump()
assert pn_credit(self.snd) == 10, pn_credit(self.snd)
d = pn_delivery(self.snd, "tag")
e = pn_delivery(self.snd, "tag2")
assert d
assert e
c = pn_current(self.snd)
assert pn_delivery_tag(c) == "tag", pn_delivery_tag(c)
pn_settle(c)
c = pn_current(self.snd)
assert pn_delivery_tag(c) == "tag2", pn_delivery_tag(c)
pn_settle(c)
c = pn_current(self.snd)
assert not c
self.pump()
c = pn_current(self.rcv)
assert c
assert pn_delivery_tag(c) == "tag", pn_delivery_tag(c)
assert pn_remote_settled(c)
pn_settle(c)
c = pn_current(self.rcv)
assert c
assert pn_delivery_tag(c) == "tag2", pn_delivery_tag(c)
assert pn_remote_settled(c)
pn_settle(c)
c = pn_current(self.rcv)
assert not c
def testUnsettled(self):
pn_flow(self.rcv, 10)
self.pump()
assert pn_unsettled(self.snd) == 0, pn_unsettled(self.snd)
assert pn_unsettled(self.rcv) == 0, pn_unsettled(self.rcv)
d = pn_delivery(self.snd, "tag")
assert d
assert pn_unsettled(self.snd) == 1, pn_unsettled(self.snd)
assert pn_unsettled(self.rcv) == 0, pn_unsettled(self.rcv)
assert pn_advance(self.snd)
self.pump()
assert pn_unsettled(self.snd) == 1, pn_unsettled(self.snd)
assert pn_unsettled(self.rcv) == 1, pn_unsettled(self.rcv)
c = pn_current(self.rcv)
assert c
pn_settle(c)
assert pn_unsettled(self.snd) == 1, pn_unsettled(self.snd)
assert pn_unsettled(self.rcv) == 0, pn_unsettled(self.rcv)
class PipelineTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
def teardown(self):
self.cleanup()
def test(self):
ssn = pn_session(self.c1)
snd = pn_sender(ssn, "sender")
pn_connection_open(self.c1)
pn_session_open(ssn)
pn_link_open(snd)
for i in range(10):
d = pn_delivery(snd, "delivery-%s" % i)
pn_send(snd, "delivery-%s" % i)
pn_settle(d)
pn_link_close(snd)
pn_session_close(ssn)
#pn_connection_close(self.c1)
self.pump()
assert pn_connection_state(self.c2) == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
ssn2 = pn_session_head(self.c2, PN_LOCAL_UNINIT)
assert ssn2
assert pn_session_state(ssn2) == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
rcv = pn_link_head(self.c2, PN_LOCAL_UNINIT)
assert rcv
assert pn_link_state(rcv) == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
pn_connection_open(self.c2)
pn_session_open(ssn2)
pn_link_open(rcv)
pn_flow(rcv, 10)
assert pn_queued(rcv) == 0
self.pump()
assert pn_queued(rcv) == 10
assert pn_link_state(rcv) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
assert pn_session_state(ssn2) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
assert pn_connection_state(self.c2) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
for i in range(pn_queued(rcv)):
d = pn_current(rcv)
assert d
assert pn_delivery_tag(d) == "delivery-%s" % i
pn_settle(d)
assert pn_queued(rcv) == 0