DISPATCH-458 - Added flush for credits that can, in some cases, be stuck on an incoming link.
Added a test that causes the credit-hang.
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 336b90f..2b95b41 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -191,6 +191,16 @@
if (link->incremental_credit > 0) {
core->flow_handler(core->user_context, link, link->incremental_credit);
link->incremental_credit = 0;
+
+ //
+ // Note: This unprotected read of a CT-only value is safe in this case.
+ // If there is pending credit on the link that needs to be pushed down to
+ // Proton, we need to give the core a kick to make sure it is sent. It is
+ // possible that no more credit will be issued to cause the movement of CT
+ // credit to Proton credit (see DISPATCH-458).
+ //
+ if (link->incremental_credit_CT > 0)
+ qdr_link_check_credit(core, link);
}
if (link->drain_mode_changed) {
core->drain_handler(core->user_context, link, link->drain_mode);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 3113165..a578810 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -627,4 +627,11 @@
void *context,
qd_router_entity_type_t type,
qd_composed_field_t *body);
+
+//
+// Cause the core to check credit on an incoming link that might have CT credit but
+// no IO/Proton credit.
+//
+void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link);
+
#endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 40ba82e..b6b4f21 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -24,6 +24,7 @@
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -184,6 +185,14 @@
}
+void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link)
+{
+ qdr_action_t *action = qdr_action(qdr_link_check_credit_CT, "link_check_credit");
+ action->args.connection.link = link;
+ qdr_action_enqueue(core, action);
+}
+
+
void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control)
{
qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -430,6 +439,16 @@
}
+static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+
+ qdr_link_t *link = action->args.connection.link;
+ qdr_link_issue_credit_CT(core, link, 0, false);
+}
+
+
static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
{
int fanout = 0;
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index fbd5672..026e076 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1076,6 +1076,11 @@
test.run()
self.assertEqual(None, test.error)
+ def test_20_batched_settlement(self):
+ test = BatchedSettlementTest(self.address)
+ test.run()
+ self.assertEqual(None, test.error)
+
def test_connection_properties(self):
connection = BlockingConnection(self.router.addresses[0],
timeout=60,
@@ -1388,5 +1393,60 @@
Container(self).run()
+class BatchedSettlementTest(MessagingHandler):
+ def __init__(self, address):
+ super(BatchedSettlementTest, self).__init__(auto_accept=False)
+ self.address = address
+ self.dest = "balanced.BatchedSettlement"
+ self.error = None
+ self.count = 20000
+ self.batch_count = 200
+ self.n_sent = 0
+ self.n_received = 0
+ self.n_settled = 0
+ self.batch = []
+
+ def check_if_done(self):
+ if self.n_settled == self.count:
+ self.timer.cancel()
+ self.conn.close()
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d rcvd=%d settled=%d" % \
+ (self.n_sent, self.n_received, self.n_settled)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(20, Timeout(self))
+ self.conn = event.container.connect(self.address)
+ self.sender = event.container.create_sender(self.conn, self.dest)
+ self.receiver = event.container.create_receiver(self.conn, self.dest)
+
+ def send(self):
+ if self.n_sent < self.count:
+ while self.sender.credit > 0:
+ msg = Message(body="Batch-Test")
+ self.sender.send(msg)
+ self.n_sent += 1
+
+ def on_sendable(self, event):
+ if self.n_sent < self.count:
+ self.send()
+
+ def on_message(self, event):
+ self.n_received += 1
+ self.batch.insert(0, event.delivery)
+ if len(self.batch) == self.batch_count:
+ while len(self.batch) > 0:
+ self.accept(self.batch.pop())
+
+ def on_accepted(self, event):
+ self.n_settled += 1
+ self.check_if_done()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())