DISPATCH-458 - Added flush for credits that can, in some cases, be stuck on an incoming link.
Added a test that causes the credit-hang.
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 336b90f..2b95b41 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -191,6 +191,16 @@
             if (link->incremental_credit > 0) {
                 core->flow_handler(core->user_context, link, link->incremental_credit);
                 link->incremental_credit = 0;
+
+                //
+                // Note:  This unprotected read of a CT-only value is safe in this case.
+                // If there is pending credit on the link that needs to be pushed down to
+                // Proton, we need to give the core a kick to make sure it is sent.  It is
+                // possible that no more credit will be issued to cause the movement of CT
+                // credit to Proton credit (see DISPATCH-458).
+                //
+                if (link->incremental_credit_CT > 0)
+                    qdr_link_check_credit(core, link);
             }
             if (link->drain_mode_changed) {
                 core->drain_handler(core->user_context, link, link->drain_mode);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 3113165..a578810 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -627,4 +627,11 @@
                        void                    *context,
                        qd_router_entity_type_t  type,
                        qd_composed_field_t     *body);
+
+//
+// Cause the core to check credit on an incoming link that might have CT credit but
+// no IO/Proton credit.
+//
+void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link);
+
 #endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 40ba82e..b6b4f21 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -24,6 +24,7 @@
 
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 
@@ -184,6 +185,14 @@
 }
 
 
+void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_action_t *action = qdr_action(qdr_link_check_credit_CT, "link_check_credit");
+    action->args.connection.link = link;
+    qdr_action_enqueue(core, action);
+}
+
+
 void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control)
 {
     qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -430,6 +439,16 @@
 }
 
 
+static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_link_t *link = action->args.connection.link;
+    qdr_link_issue_credit_CT(core, link, 0, false);
+}
+
+
 static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
 {
     int fanout = 0;
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index fbd5672..026e076 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1076,6 +1076,11 @@
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_20_batched_settlement(self):
+        test = BatchedSettlementTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_connection_properties(self):
         connection = BlockingConnection(self.router.addresses[0],
                                         timeout=60,
@@ -1388,5 +1393,60 @@
         Container(self).run()
 
 
+class BatchedSettlementTest(MessagingHandler):
+    def __init__(self, address):
+        super(BatchedSettlementTest, self).__init__(auto_accept=False)
+        self.address = address
+        self.dest = "balanced.BatchedSettlement"
+        self.error = None
+        self.count       = 20000
+        self.batch_count = 200
+        self.n_sent      = 0
+        self.n_received  = 0
+        self.n_settled   = 0
+        self.batch       = []
+
+    def check_if_done(self):
+        if self.n_settled == self.count:
+            self.timer.cancel()
+            self.conn.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d settled=%d" % \
+                     (self.n_sent, self.n_received, self.n_settled)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer    = event.reactor.schedule(20, Timeout(self))
+        self.conn     = event.container.connect(self.address)
+        self.sender   = event.container.create_sender(self.conn, self.dest)
+        self.receiver = event.container.create_receiver(self.conn, self.dest)
+
+    def send(self):
+        if self.n_sent < self.count:
+            while self.sender.credit > 0:
+                msg = Message(body="Batch-Test")
+                self.sender.send(msg)
+                self.n_sent += 1
+
+    def on_sendable(self, event):
+        if self.n_sent < self.count:
+            self.send()
+
+    def on_message(self, event):
+        self.n_received += 1
+        self.batch.insert(0, event.delivery)
+        if len(self.batch) == self.batch_count:
+            while len(self.batch) > 0:
+                self.accept(self.batch.pop())
+
+    def on_accepted(self, event):
+        self.n_settled += 1
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())