DISPATCH-646 - Refactored the core->IO communication around link-specific work lists to preserve the order of flows and deliveries.
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 3d0e270..0d66d1c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -553,7 +553,7 @@
 typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int delivery_count);
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
 typedef void (*qdr_link_drain_t)         (void *context, qdr_link_t *link, bool mode);
-typedef void (*qdr_link_push_t)          (void *context, qdr_link_t *link);
+typedef int  (*qdr_link_push_t)          (void *context, qdr_link_t *link, int limit);
 typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
 typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 8c95f9f..abc6a4c 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -201,22 +201,6 @@
 
     int event_count = 0;
 
-    do {
-        sys_mutex_lock(conn->work_lock);
-        ref = DEQ_HEAD(conn->links_with_deliveries);
-        if (ref) {
-            link = ref->link;
-            qdr_del_link_ref(&conn->links_with_deliveries, ref->link, QDR_LINK_LIST_CLASS_DELIVERY);
-        } else
-            link = 0;
-        sys_mutex_unlock(conn->work_lock);
-
-        if (link) {
-            core->push_handler(core->user_context, link);
-            event_count++;
-        }
-    } while (link);
-
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
     sys_mutex_unlock(conn->work_lock);
@@ -234,55 +218,93 @@
         case QDR_CONNECTION_WORK_SECOND_ATTACH :
             core->second_attach_handler(core->user_context, work->link, work->source, work->target);
             break;
-
-        case QDR_CONNECTION_WORK_FIRST_DETACH :
-            core->detach_handler(core->user_context, work->link, work->error, true, work->close_link);
-            break;
-
-        case QDR_CONNECTION_WORK_SECOND_DETACH :
-            core->detach_handler(core->user_context, work->link, work->error, false, work->close_link);
-            free_qdr_link_t(work->link);
-            break;
         }
 
         qdr_terminus_free(work->source);
         qdr_terminus_free(work->target);
-        qdr_error_free(work->error);
         free_qdr_connection_work_t(work);
 
         work = DEQ_HEAD(work_list);
     }
 
     do {
+        qdr_link_work_t *link_work = 0;
+        bool             free_link = false;
         sys_mutex_lock(conn->work_lock);
-        ref = DEQ_HEAD(conn->links_with_credit);
+        ref = DEQ_HEAD(conn->links_with_work);
         if (ref) {
             link = ref->link;
-            qdr_del_link_ref(&conn->links_with_credit, ref->link, QDR_LINK_LIST_CLASS_FLOW);
+            qdr_del_link_ref(&conn->links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK);
+            link_work = DEQ_HEAD(link->work_list);
+            if (link_work)
+                DEQ_REMOVE_HEAD(link->work_list);
         } else
             link = 0;
         sys_mutex_unlock(conn->work_lock);
 
         if (link) {
-            if (link->incremental_credit > 0) {
-                core->flow_handler(core->user_context, link, link->incremental_credit);
-                link->incremental_credit = 0;
+            while (link_work) {
+                switch (link_work->work_type) {
+                case QDR_LINK_WORK_DELIVERY :
+                    {
+                        int count = core->push_handler(core->user_context, link, link_work->value);
+                        assert(count <= link_work->value);
+                        link_work->value -= count;
+                        break;
+                    }
 
-                //
-                // Note:  This unprotected read of a CT-only value is safe in this case.
-                // If there is pending credit on the link that needs to be pushed down to
-                // Proton, we need to give the core a kick to make sure it is sent.  It is
-                // possible that no more credit will be issued to cause the movement of CT
-                // credit to Proton credit (see DISPATCH-458).
-                //
-                if (link->incremental_credit_CT > 0)
-                    qdr_link_check_credit(core, link);
+                case QDR_LINK_WORK_FLOW :
+                    if (link_work->value > 0)
+                        core->flow_handler(core->user_context, link, link_work->value);
+                    if      (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
+                        core->drain_handler(core->user_context, link, true);
+                    else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
+                        core->drain_handler(core->user_context, link, false);
+                    else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
+                        core->drained_handler(core->user_context, link);
+                    break;
+
+                case QDR_LINK_WORK_FIRST_DETACH :
+                    core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link);
+                    break;
+
+                case QDR_LINK_WORK_SECOND_DETACH :
+                    core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link);
+                    free_link = true; // Deferred: the link is still dereferenced below
+                    break;
+                }
+
+                sys_mutex_lock(conn->work_lock);
+                if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0) {
+                    DEQ_INSERT_HEAD(link->work_list, link_work);
+                    link_work = 0; // Halt work processing
+                } else {
+                    qdr_error_free(link_work->error);
+                    free_qdr_link_work_t(link_work);
+                    link_work = DEQ_HEAD(link->work_list);
+                    if (link_work)
+                        DEQ_REMOVE_HEAD(link->work_list);
+                }
+                sys_mutex_unlock(conn->work_lock);
+                event_count++;
             }
-            if (link->drain_mode_changed) {
-                core->drain_handler(core->user_context, link, link->drain_mode);
-                link->drain_mode_changed = false;
+
+            // Handle disposition/settlement updates (reads the link, so it must precede the free)
+            qdr_delivery_ref_list_t updated_deliveries;
+            sys_mutex_lock(conn->work_lock);
+            DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+            sys_mutex_unlock(conn->work_lock);
+            qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
+            while (dref) {
+                core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
+                qdr_delivery_decref(core, dref->dlv);
+                qdr_del_delivery_ref(&updated_deliveries, dref);
+                dref = DEQ_HEAD(updated_deliveries);
+                event_count++;
             }
-            event_count++;
+            // A second detach was issued above; free the link only after its last use here.
+            if (free_link)
+                free_qdr_link_t(link);
         }
     } while (link);
 
@@ -470,6 +492,21 @@
 }
 
 
+void qdr_link_enqueue_work_CT(qdr_core_t      *core,
+                              qdr_link_t      *link,
+                              qdr_link_work_t *work)
+{
+    // Append the item and mark the link as having work, both under the owning
+    // connection's work lock, then activate that connection.
+    qdr_connection_t *owner = link->conn;
+    sys_mutex_lock(owner->work_lock);
+    DEQ_INSERT_TAIL(link->work_list, work);
+    qdr_add_link_ref(&owner->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+    sys_mutex_unlock(owner->work_lock);
+    qdr_connection_activate_CT(core, owner);
+}
+
+
 #define QDR_DISCRIMINATOR_SIZE 16
 static void qdr_generate_discriminator(char *string)
 {
@@ -554,8 +591,10 @@
     qdr_delivery_ref_list_t updated_deliveries;
     qdr_delivery_list_t     undelivered;
     qdr_delivery_list_t     unsettled;
+    qdr_link_work_list_t    work_list;
 
     sys_mutex_lock(conn->work_lock);
+    DEQ_MOVE(link->work_list, work_list);
     DEQ_MOVE(link->updated_deliveries, updated_deliveries);
     DEQ_MOVE(link->undelivered, undelivered);
     qdr_delivery_t *d = DEQ_HEAD(undelivered);
@@ -575,6 +614,17 @@
     sys_mutex_unlock(conn->work_lock);
 
     //
+    // Free the work list
+    //
+    qdr_link_work_t *link_work = DEQ_HEAD(work_list);
+    while (link_work) {
+        DEQ_REMOVE_HEAD(work_list);
+        qdr_error_free(link_work->error);
+        free_qdr_link_work_t(link_work);
+        link_work = DEQ_HEAD(work_list);
+    }
+
+    //
     // Free all the 'updated' references
     //
     qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
@@ -675,8 +725,7 @@
     //
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     sys_mutex_lock(conn->work_lock);
-    qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
-    qdr_del_link_ref(&conn->links_with_credit,     link, QDR_LINK_LIST_CLASS_FLOW);
+    qdr_del_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 
     //
@@ -724,7 +773,6 @@
     work->link      = link;
     work->source    = source;
     work->target    = target;
-    work->error     = 0;
 
     qdr_connection_enqueue_work_CT(core, conn, work);
     return link;
@@ -733,12 +781,10 @@
 
 void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close)
 {
-    qdr_connection_work_t *work = new_qdr_connection_work_t();
+    qdr_link_work_t *work = new_qdr_link_work_t();
     ZERO(work);
-    work->work_type  = ++link->detach_count == 1 ? QDR_CONNECTION_WORK_FIRST_DETACH : QDR_CONNECTION_WORK_SECOND_DETACH;
-    work->link       = link;
+    work->work_type  = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH;
     work->close_link = close;
-    work->error      = 0;
 
     if (error)
         work->error = error;
@@ -769,7 +815,7 @@
     if (link->detach_count == 2)
         qdr_link_cleanup_CT(core, link->conn, link);
 
-    qdr_connection_enqueue_work_CT(core, link->conn, work);
+    qdr_link_enqueue_work_CT(core, link, work);
 }
 
 
@@ -781,7 +827,6 @@
     work->link      = link;
     work->source    = source;
     work->target    = target;
-    work->error     = 0;
 
     link->oper_status = QDR_LINK_OPER_UP;
 
@@ -1163,7 +1208,6 @@
         DEQ_REMOVE_HEAD(conn->work_list);
         qdr_terminus_free(work->source);
         qdr_terminus_free(work->target);
-        qdr_error_free(work->error);
         free_qdr_connection_work_t(work);
         work = DEQ_HEAD(conn->work_list);
     }
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index debce01..b37b4b8 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -145,6 +145,19 @@
         if (dlv->settled) {
             DEQ_REMOVE(link->undelivered, dlv);
             dlv->where = QDR_DELIVERY_NOWHERE;
+
+            //
+            // The link-work item representing this pending delivery must be
+            // updated to reflect the removal of the delivery.  If the item
+            // has no other deliveries associated with it, it can be removed
+            // from the work list.
+            //
+            assert(dlv->link_work);
+            if (dlv->link_work && (--dlv->link_work->value == 0)) {
+                DEQ_REMOVE(link->work_list, dlv->link_work);
+                free_qdr_link_work_t(dlv->link_work);
+                dlv->link_work = 0;
+            }
             qdr_delivery_decref_CT(core, dlv);
         }
         dlv = next;
@@ -169,9 +182,22 @@
     qdr_delivery_incref(dlv);
 
     //
-    // If the link isn't already on the links_with_deliveries list, put it there.
+    // We must put a work item on the link's work list to represent this pending delivery.
+    // If there's already a delivery item on the tail of the work list, simply join that item
+    // by incrementing the value.
     //
-    qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+    qdr_link_work_t *work = DEQ_TAIL(link->work_list);
+    if (work && work->work_type == QDR_LINK_WORK_DELIVERY) {
+        work->value++;
+    } else {
+        work = new_qdr_link_work_t();
+        ZERO(work);
+        work->work_type = QDR_LINK_WORK_DELIVERY;
+        work->value     = 1;
+        DEQ_INSERT_TAIL(link->work_list, work);
+        qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+    }
+    dlv->link_work = work;
     sys_mutex_unlock(link->conn->work_lock);
 
     //
@@ -695,7 +721,6 @@
         work->link      = out_link;
         work->source    = source;
         work->target    = target;
-        work->error     = 0;
 
         qdr_connection_enqueue_work_CT(core, conn, work);
 
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index ef73553..3257980 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -30,6 +30,7 @@
 ALLOC_DEFINE(qdr_router_ref_t);
 ALLOC_DEFINE(qdr_link_ref_t);
 ALLOC_DEFINE(qdr_general_work_t);
+ALLOC_DEFINE(qdr_link_work_t);
 ALLOC_DEFINE(qdr_connection_ref_t);
 ALLOC_DEFINE(qdr_connection_info_t);
 
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c657391..414647b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -152,6 +152,104 @@
 ALLOC_DECLARE(qdr_action_t);
 DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
 
+//
+// General Work
+//
+// The following types are used to post work to the IO threads for
+// non-connection-specific action.  These actions are serialized through
+// a zero-delay timer and are processed by one thread at a time.  General
+// actions occur in-order and are not run concurrently.
+//
+typedef struct qdr_general_work_t qdr_general_work_t;
+typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work);
+
+struct qdr_general_work_t {
+    DEQ_LINKS(qdr_general_work_t);
+    qdr_general_work_handler_t  handler;
+    qdr_field_t                *field;
+    int                         maskbit;
+    int                         inter_router_cost;
+    qdr_receive_t               on_message;
+    void                       *on_message_context;
+    qd_message_t               *msg;
+};
+
+ALLOC_DECLARE(qdr_general_work_t);
+DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t);
+
+qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler);
+
+
+//
+// Connection Work
+//
+// The following types are used to post work to the IO threads for
+// connection-specific action.  The actions for a particular connection
+// are run in-order and are not concurrent.  Actions for different connections
+// will run concurrently.
+//
+typedef enum {
+    QDR_CONNECTION_WORK_FIRST_ATTACH,
+    QDR_CONNECTION_WORK_SECOND_ATTACH
+} qdr_connection_work_type_t;
+
+typedef struct qdr_connection_work_t {
+    DEQ_LINKS(struct qdr_connection_work_t);
+    qdr_connection_work_type_t  work_type;
+    qdr_link_t                 *link;
+    qdr_terminus_t             *source;
+    qdr_terminus_t             *target;
+} qdr_connection_work_t;
+
+ALLOC_DECLARE(qdr_connection_work_t);
+DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
+
+
+//
+// Link Work
+//
+// The following type is used to post link-specific work to the IO threads.
+// This ensures that work related to a particular link (deliveries, disposition
+// updates, flow updates, and detaches) is processed in order.
+//
+// DELIVERY      - Push up to _value_ deliveries from the undelivered list to the
+//                 link (outgoing links only).  Don't push more than there is
+//                 available credit for.  If the full number of deliveries (_value_)
+//                 cannot be pushed, don't consume this work item from the list.
+//                 This link will be blocked until further credit is received.
+// FLOW          - Push a flow update using _drain_action_ and _value_ for the
+//                 number of incremental credits.
+// FIRST_DETACH  - Issue a first detach on this link, using _error_ if there is an
+//                 error condition.
+// SECOND_DETACH - Issue a second detach on this link.
+//
+typedef enum {
+    QDR_LINK_WORK_DELIVERY,
+    QDR_LINK_WORK_FLOW,
+    QDR_LINK_WORK_FIRST_DETACH,
+    QDR_LINK_WORK_SECOND_DETACH
+} qdr_link_work_type_t;
+
+typedef enum {
+    QDR_LINK_WORK_DRAIN_ACTION_NONE = 0,
+    QDR_LINK_WORK_DRAIN_ACTION_SET,
+    QDR_LINK_WORK_DRAIN_ACTION_CLEAR,
+    QDR_LINK_WORK_DRAIN_ACTION_DRAINED
+} qdr_link_work_drain_action_t;
+
+typedef struct qdr_link_work_t {
+    DEQ_LINKS(struct qdr_link_work_t);
+    qdr_link_work_type_t          work_type;
+    qdr_error_t                  *error;
+    int                           value;
+    bool                          close_link;
+    qdr_link_work_drain_action_t  drain_action;
+} qdr_link_work_t;
+
+ALLOC_DECLARE(qdr_link_work_t);
+DEQ_DECLARE(qdr_link_work_t, qdr_link_work_list_t);
+
+
 #define QDR_AGENT_MAX_COLUMNS 64
 #define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1)
 
@@ -225,6 +323,7 @@
     qd_bitmask_t        *link_exclusion;
     qdr_address_t       *tracking_addr;
     int                  tracking_addr_bit;
+    qdr_link_work_t     *link_work;         ///< Delivery work item for this delivery
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
@@ -242,10 +341,9 @@
 void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref);
 
 #define QDR_LINK_LIST_CLASS_ADDRESS    0
-#define QDR_LINK_LIST_CLASS_DELIVERY   1
-#define QDR_LINK_LIST_CLASS_FLOW       2
-#define QDR_LINK_LIST_CLASS_CONNECTION 3
-#define QDR_LINK_LIST_CLASSES          4
+#define QDR_LINK_LIST_CLASS_WORK       1
+#define QDR_LINK_LIST_CLASS_CONNECTION 2
+#define QDR_LINK_LIST_CLASSES          3
 
 typedef enum {
     QDR_LINK_OPER_UP,
@@ -262,6 +360,7 @@
     qdr_connection_t        *conn;               ///< [ref] Connection that owns this link
     qd_link_type_t           link_type;
     qd_direction_t           link_direction;
+    qdr_link_work_list_t     work_list;
     char                    *name;
     int                      detach_count;       ///< 0, 1, or 2 depending on the state of the lifecycle
     qdr_address_t           *owning_addr;        ///< [ref] Address record that owns this link
@@ -276,11 +375,8 @@
     bool                     strip_annotations_in;
     bool                     strip_annotations_out;
     int                      capacity;
-    int                      incremental_credit_CT;
-    int                      incremental_credit;
     bool                     flow_started;   ///< for incoming, true iff initial credit has been granted
     bool                     drain_mode;
-    bool                     drain_mode_changed;
     int                      credit_to_core; ///< Number of the available credits incrementally given to the core
 
     uint64_t total_deliveries;
@@ -389,59 +485,11 @@
 
 
 //
-// General Work
+// Connection Information
 //
-// The following types are used to post work to the IO threads for
-// non-connection-specific action.  These actions are serialized through
-// a zero-delay timer and are processed by one thread at a time.  General
-// actions occur in-order and are not run concurrently.
+// This record is used to give the core thread access to the details
+// of a connection's configuration.
 //
-typedef struct qdr_general_work_t qdr_general_work_t;
-typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work);
-
-struct qdr_general_work_t {
-    DEQ_LINKS(qdr_general_work_t);
-    qdr_general_work_handler_t  handler;
-    qdr_field_t                *field;
-    int                         maskbit;
-    int                         inter_router_cost;
-    qdr_receive_t               on_message;
-    void                       *on_message_context;
-    qd_message_t               *msg;
-};
-
-ALLOC_DECLARE(qdr_general_work_t);
-DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t);
-
-qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler);
-
-//
-// Connection Work
-//
-// The following types are used to post work to the IO threads for
-// connection-specific action.  The actions for a particular connection
-// are run in-order and are not concurrent.  Actions for different connections
-// will run concurrently.
-//
-typedef enum {
-    QDR_CONNECTION_WORK_FIRST_ATTACH,
-    QDR_CONNECTION_WORK_SECOND_ATTACH,
-    QDR_CONNECTION_WORK_FIRST_DETACH,
-    QDR_CONNECTION_WORK_SECOND_DETACH
-} qdr_connection_work_type_t;
-
-typedef struct qdr_connection_work_t {
-    DEQ_LINKS(struct qdr_connection_work_t);
-    qdr_connection_work_type_t  work_type;
-    qdr_link_t                 *link;
-    qdr_terminus_t             *source;
-    qdr_terminus_t             *target;
-    qdr_error_t                *error;
-    bool                        close_link;
-} qdr_connection_work_t;
-
-ALLOC_DECLARE(qdr_connection_work_t);
-DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
 
 struct qdr_connection_info_t {
     const  char                *container;
@@ -481,7 +529,7 @@
     sys_mutex_t                *work_lock;
     qdr_link_ref_list_t         links;
     qdr_link_ref_list_t         links_with_deliveries;
-    qdr_link_ref_list_t         links_with_credit;
+    qdr_link_ref_list_t         links_with_work;
     char                       *tenant_space;
     int                         tenant_space_len;
     qdr_connection_info_t      *connection_info;
@@ -662,6 +710,9 @@
 void qdr_connection_enqueue_work_CT(qdr_core_t            *core,
                                     qdr_connection_t      *conn,
                                     qdr_connection_work_t *work);
+void qdr_link_enqueue_work_CT(qdr_core_t      *core,
+                              qdr_link_t      *link,   // link whose work_list receives 'work'
+                              qdr_link_work_t *work);
 
 qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
                                qdr_connection_t *conn,
@@ -677,10 +728,4 @@
                        qd_router_entity_type_t  type,
                        qd_composed_field_t     *body);
 
-//
-// Cause the core to check credit on an incoming link that might have CT credit but
-// no IO/Proton credit.
-//
-void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link);
-
 #endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index a35e658..66ab3d8 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -23,7 +23,6 @@
 
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -129,6 +128,7 @@
             dlv = DEQ_HEAD(link->undelivered);
             if (dlv) {
                 DEQ_REMOVE_HEAD(link->undelivered);
+                dlv->link_work = 0;
                 settled = dlv->settled;
                 if (!settled) {
                     DEQ_INSERT_TAIL(link->unsettled, dlv);
@@ -156,22 +156,6 @@
         else if (offer != -1)
             core->offer_handler(core->user_context, link, offer);
     }
-
-    //
-    // Handle disposition/settlement updates
-    //
-    qdr_delivery_ref_list_t updated_deliveries;
-    sys_mutex_lock(conn->work_lock);
-    DEQ_MOVE(link->updated_deliveries, updated_deliveries);
-    sys_mutex_unlock(conn->work_lock);
-
-    qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
-    while (ref) {
-        core->delivery_update_handler(core->user_context, ref->dlv, ref->dlv->disposition, ref->dlv->settled);
-        qdr_delivery_decref(core, ref->dlv);
-        qdr_del_delivery_ref(&updated_deliveries, ref);
-        ref = DEQ_HEAD(updated_deliveries);
-    }
 }
 
 
@@ -197,14 +181,6 @@
 }
 
 
-void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link)
-{
-    qdr_action_t *action = qdr_action(qdr_link_check_credit_CT, "link_check_credit");
-    action->args.connection.link = link;
-    qdr_action_enqueue(core, action);
-}
-
-
 void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_iterator_t *addr, bool exclude_inprocess, bool control)
 {
     qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -452,12 +428,13 @@
     if (discard)
         return;
 
-    qdr_link_t *link   = action->args.connection.link;
-    int  credit        = action->args.connection.credit;
-    bool drain         = action->args.connection.drain;
-    bool activate      = false;
-    bool drain_was_set = !link->drain_mode && drain;
-
+    qdr_link_t *link      = action->args.connection.link;
+    int  credit           = action->args.connection.credit;
+    bool drain            = action->args.connection.drain;
+    bool activate         = false;
+    bool drain_was_set    = !link->drain_mode && drain;
+    qdr_link_work_t *work = 0;
+    
     link->drain_mode = drain;
 
     //
@@ -470,10 +447,13 @@
         if (clink->link_direction == QD_INCOMING)
             qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
         else {
-            sys_mutex_lock(clink->conn->work_lock);
-            qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY);
-            sys_mutex_unlock(clink->conn->work_lock);
-            qdr_connection_activate_CT(core, clink->conn);
+            work = new_qdr_link_work_t();
+            ZERO(work);
+            work->work_type = QDR_LINK_WORK_FLOW;
+            work->value     = credit;
+            if (drain)
+                work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
+            qdr_link_enqueue_work_CT(core, clink, work);
         }
 
         return;
@@ -483,9 +463,18 @@
     // Handle the replenishing of credit outbound
     //
     if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
+        if (drain_was_set) {
+            work = new_qdr_link_work_t();
+            ZERO(work);
+            work->work_type    = QDR_LINK_WORK_FLOW;
+            work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
+        }
+
         sys_mutex_lock(link->conn->work_lock);
+        if (work)
+            DEQ_INSERT_TAIL(link->work_list, work);
         if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
-            qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+            qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
             activate = true;
         }
         sys_mutex_unlock(link->conn->work_lock);
@@ -499,16 +488,6 @@
 }
 
 
-static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    if (discard)
-        return;
-
-    qdr_link_t *link = action->args.connection.link;
-    qdr_link_issue_credit_CT(core, link, 0, false);
-}
-
-
 /**
  * Return the number of outbound paths to destinations that this address has.
  * Note that even if there are more than zero paths, the destination still may
@@ -771,42 +750,31 @@
 
 
 /**
- * Check the link's accumulated credit.  If the credit given to the connection thread
- * has been issued to Proton, provide the next batch of credit to the connection thread.
+ * Add link-work to provide credit to the link in an IO thread
  */
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain)
 {
+    assert(link->link_direction == QD_INCOMING);
+
-    bool drain_changed = link->drain_mode |= drain;
-    bool activate      = drain_changed;
+    bool drain_changed = link->drain_mode != drain;   // compare, don't assign: true only when the mode flips
+    link->drain_mode   = drain;
 
-    link->drain_mode = drain;
-    link->drain_mode_changed = drain_changed;
+    if (!drain_changed && credit == 0)
+        return;
 
-    link->incremental_credit_CT += credit;
-    link->flow_started = true;
+    if (credit > 0)
+        link->flow_started = true;
 
-    if (link->incremental_credit_CT && link->incremental_credit == 0) {
-        //
-        // Move the credit from the core-thread value to the connection-thread value.
-        //
-        link->incremental_credit    = link->incremental_credit_CT;
-        link->incremental_credit_CT = 0;
-        activate = true;
-    }
+    qdr_link_work_t *work = new_qdr_link_work_t();
+    ZERO(work);
 
-    if (activate) {
-        //
-        // Put this link on the connection's has-credit list.
-        //
-        sys_mutex_lock(link->conn->work_lock);
-        qdr_add_link_ref(&link->conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
-        sys_mutex_unlock(link->conn->work_lock);
+    work->work_type = QDR_LINK_WORK_FLOW;
+    work->value     = credit;
 
-        //
-        // Activate the connection
-        //
-        qdr_connection_activate_CT(core, link->conn);
-    }
+    if (drain_changed)
+        work->drain_action = drain ? QDR_LINK_WORK_DRAIN_ACTION_SET : QDR_LINK_WORK_DRAIN_ACTION_CLEAR;
+
+    qdr_link_enqueue_work_CT(core, link, work);
 }
 
 
@@ -876,7 +844,7 @@
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
         qdr_delivery_incref(dlv);
         qdr_add_delivery_ref(&link->updated_deliveries, dlv);
-        qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+        qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
         activate = true;
     }
     sys_mutex_unlock(link->conn->work_lock);
diff --git a/src/router_node.c b/src/router_node.c
index ff666e6..5868570 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -991,19 +991,33 @@
 }
 
 
-static void CORE_link_push(void *context, qdr_link_t *link)
+/**
+ * Push handler: forward up to 'limit' deliveries on 'link', bounded by the
+ * credit currently reported for the Proton link.
+ *
+ * Returns the credit bound handed to qdr_link_process_deliveries (never
+ * negative), or 0 when the link has no qd_link/pn_link attached.
+ */
+static int CORE_link_push(void *context, qdr_link_t *link, int limit)
 {
     qd_router_t *router = (qd_router_t*) context;
     qd_link_t   *qlink  = (qd_link_t*) qdr_link_get_context(link);
     if (!qlink)
-        return;
+        return 0;
     
     pn_link_t *plink = qd_link_pn(qlink);
 
     if (plink) {
         int link_credit = pn_link_credit(plink);
+        if (link_credit > limit)
+            link_credit = limit;
         qdr_link_process_deliveries(router->router_core, link, link_credit);
+        // pn_link_credit can report a negative balance; never return a negative
+        // count, because the core subtracts it from the pending work value.
+        return link_credit > 0 ? link_credit : 0;
     }
+
+    return 0;
 }
 
 
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index e4bd2bc..e59f870 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -19,7 +19,7 @@
 
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
-from proton import Message
+from proton import Message, Endpoint
 
 class Timeout(object):
     def __init__(self, parent):
@@ -54,6 +54,22 @@
         self.sender = event.container.create_sender(self.conn, "org.apache.dev")
         self.receiver.flow(1)
 
+    def on_link_flow(self, event):
+        if event.link.is_sender and event.link.credit \
+           and event.link.state & Endpoint.LOCAL_ACTIVE \
+           and event.link.state & Endpoint.REMOTE_ACTIVE :
+            self.on_sendable(event)
+
+        # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+        # messages. That along with 10 messages received indicates that the drain worked and we can
+        # declare that the test is successful
+        if self.received_count == 10 and event.link.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.receiver.close()
+            self.sender.close()
+            self.conn.close()
+
     def on_sendable(self, event):
         if self.sent_count < 10:
             msg = Message(body="Hello World", properties={'seq': self.sent_count})
@@ -75,16 +91,6 @@
                 # receiver
                 event.receiver.drain(20)
 
-            # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
-            # messages. That along with 10 messages received indicates that the drain worked and we can
-            # declare that the test is successful
-            if self.received_count == 10 and event.link.credit == 0:
-                self.error = None
-                self.timer.cancel()
-                self.receiver.close()
-                self.sender.close()
-                self.conn.close()
-
     def run(self):
         Container(self).run()