DISPATCH-646 - Refactored the core->IO communication around link-specific work lists to preserve the order of flows and deliveries.
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 3d0e270..0d66d1c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -553,7 +553,7 @@
typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count);
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
-typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link);
+typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit);
typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 8c95f9f..abc6a4c 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -201,22 +201,6 @@
int event_count = 0;
- do {
- sys_mutex_lock(conn->work_lock);
- ref = DEQ_HEAD(conn->links_with_deliveries);
- if (ref) {
- link = ref->link;
- qdr_del_link_ref(&conn->links_with_deliveries, ref->link, QDR_LINK_LIST_CLASS_DELIVERY);
- } else
- link = 0;
- sys_mutex_unlock(conn->work_lock);
-
- if (link) {
- core->push_handler(core->user_context, link);
- event_count++;
- }
- } while (link);
-
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(conn->work_list, work_list);
sys_mutex_unlock(conn->work_lock);
@@ -234,55 +218,93 @@
case QDR_CONNECTION_WORK_SECOND_ATTACH :
core->second_attach_handler(core->user_context, work->link, work->source, work->target);
break;
-
- case QDR_CONNECTION_WORK_FIRST_DETACH :
- core->detach_handler(core->user_context, work->link, work->error, true, work->close_link);
- break;
-
- case QDR_CONNECTION_WORK_SECOND_DETACH :
- core->detach_handler(core->user_context, work->link, work->error, false, work->close_link);
- free_qdr_link_t(work->link);
- break;
}
qdr_terminus_free(work->source);
qdr_terminus_free(work->target);
- qdr_error_free(work->error);
free_qdr_connection_work_t(work);
work = DEQ_HEAD(work_list);
}
do {
+ qdr_link_work_t *link_work = 0;
+ bool free_link = false; // Set by SECOND_DETACH; the link is freed only after its last use in this iteration
sys_mutex_lock(conn->work_lock);
- ref = DEQ_HEAD(conn->links_with_credit);
+ ref = DEQ_HEAD(conn->links_with_work);
if (ref) {
link = ref->link;
- qdr_del_link_ref(&conn->links_with_credit, ref->link, QDR_LINK_LIST_CLASS_FLOW);
+ qdr_del_link_ref(&conn->links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK);
+ link_work = DEQ_HEAD(link->work_list);
+ if (link_work)
+ DEQ_REMOVE_HEAD(link->work_list);
} else
link = 0;
sys_mutex_unlock(conn->work_lock);
if (link) {
- if (link->incremental_credit > 0) {
- core->flow_handler(core->user_context, link, link->incremental_credit);
- link->incremental_credit = 0;
+ while (link_work) {
+ switch (link_work->work_type) {
+ case QDR_LINK_WORK_DELIVERY :
+ {
+ int count = core->push_handler(core->user_context, link, link_work->value);
+ assert(count <= link_work->value);
+ link_work->value -= count;
+ break;
+ }
- //
- // Note: This unprotected read of a CT-only value is safe in this case.
- // If there is pending credit on the link that needs to be pushed down to
- // Proton, we need to give the core a kick to make sure it is sent. It is
- // possible that no more credit will be issued to cause the movement of CT
- // credit to Proton credit (see DISPATCH-458).
- //
- if (link->incremental_credit_CT > 0)
- qdr_link_check_credit(core, link);
+ case QDR_LINK_WORK_FLOW :
+ if (link_work->value > 0)
+ core->flow_handler(core->user_context, link, link_work->value);
+ if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
+ core->drain_handler(core->user_context, link, true);
+ else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
+ core->drain_handler(core->user_context, link, false);
+ else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
+ core->drained_handler(core->user_context, link);
+ break;
+
+ case QDR_LINK_WORK_FIRST_DETACH :
+ core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link);
+ break;
+
+ case QDR_LINK_WORK_SECOND_DETACH :
+ core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link);
+ free_link = true; // Deferred: link->work_list and link->updated_deliveries are still accessed below
+ break;
+ }
+
+ sys_mutex_lock(conn->work_lock);
+ if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0) { // Not every delivery was pushed
+ DEQ_INSERT_HEAD(link->work_list, link_work); // Put the item back at the head so it is retried first
+ link_work = 0; // Halt work processing
+ } else {
+ qdr_error_free(link_work->error);
+ free_qdr_link_work_t(link_work);
+ link_work = DEQ_HEAD(link->work_list);
+ if (link_work)
+ DEQ_REMOVE_HEAD(link->work_list);
+ }
+ sys_mutex_unlock(conn->work_lock);
+ event_count++;
}
- if (link->drain_mode_changed) {
- core->drain_handler(core->user_context, link, link->drain_mode);
- link->drain_mode_changed = false;
+ //
+ // Handle disposition/settlement updates
+ //
+ qdr_delivery_ref_list_t updated_deliveries;
+ sys_mutex_lock(conn->work_lock);
+ DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+ sys_mutex_unlock(conn->work_lock);
+
+ qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
+ while (dref) {
+ core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
+ qdr_delivery_decref(core, dref->dlv);
+ qdr_del_delivery_ref(&updated_deliveries, dref);
+ dref = DEQ_HEAD(updated_deliveries);
+ event_count++;
}
- event_count++;
+ if (free_link) free_qdr_link_t(link); // Second detach was issued; nothing dereferences the link past this point
}
} while (link);
@@ -470,6 +492,21 @@
}
+void qdr_link_enqueue_work_CT(qdr_core_t *core, // Queue one work item on a link and wake the link's connection
+ qdr_link_t *link,
+ qdr_link_work_t *work)
+{
+ qdr_connection_t *conn = link->conn; // Cached up front; the activate call below uses conn and never re-reads link
+
+ sys_mutex_lock(conn->work_lock); // Both list updates below happen under the connection's work_lock
+ DEQ_INSERT_TAIL(link->work_list, work); // Tail insert keeps items in the order they were enqueued
+ qdr_add_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ sys_mutex_unlock(conn->work_lock);
+
+ qdr_connection_activate_CT(core, conn);
+}
+
+
#define QDR_DISCRIMINATOR_SIZE 16
static void qdr_generate_discriminator(char *string)
{
@@ -554,8 +591,10 @@
qdr_delivery_ref_list_t updated_deliveries;
qdr_delivery_list_t undelivered;
qdr_delivery_list_t unsettled;
+ qdr_link_work_list_t work_list;
sys_mutex_lock(conn->work_lock);
+ DEQ_MOVE(link->work_list, work_list);
DEQ_MOVE(link->updated_deliveries, updated_deliveries);
DEQ_MOVE(link->undelivered, undelivered);
qdr_delivery_t *d = DEQ_HEAD(undelivered);
@@ -575,6 +614,17 @@
sys_mutex_unlock(conn->work_lock);
//
+ // Free the work list
+ //
+ qdr_link_work_t *link_work = DEQ_HEAD(work_list);
+ while (link_work) {
+ DEQ_REMOVE_HEAD(work_list);
+ qdr_error_free(link_work->error);
+ free_qdr_link_work_t(link_work);
+ link_work = DEQ_HEAD(work_list);
+ }
+
+ //
// Free all the 'updated' references
//
qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
@@ -675,8 +725,7 @@
//
qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
sys_mutex_lock(conn->work_lock);
- qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
- qdr_del_link_ref(&conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
+ qdr_del_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
sys_mutex_unlock(conn->work_lock);
//
@@ -724,7 +773,6 @@
work->link = link;
work->source = source;
work->target = target;
- work->error = 0;
qdr_connection_enqueue_work_CT(core, conn, work);
return link;
@@ -733,12 +781,10 @@
void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close)
{
- qdr_connection_work_t *work = new_qdr_connection_work_t();
+ qdr_link_work_t *work = new_qdr_link_work_t();
ZERO(work);
- work->work_type = ++link->detach_count == 1 ? QDR_CONNECTION_WORK_FIRST_DETACH : QDR_CONNECTION_WORK_SECOND_DETACH;
- work->link = link;
+ work->work_type = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH;
work->close_link = close;
- work->error = 0;
if (error)
work->error = error;
@@ -769,7 +815,7 @@
if (link->detach_count == 2)
qdr_link_cleanup_CT(core, link->conn, link);
- qdr_connection_enqueue_work_CT(core, link->conn, work);
+ qdr_link_enqueue_work_CT(core, link, work);
}
@@ -781,7 +827,6 @@
work->link = link;
work->source = source;
work->target = target;
- work->error = 0;
link->oper_status = QDR_LINK_OPER_UP;
@@ -1163,7 +1208,6 @@
DEQ_REMOVE_HEAD(conn->work_list);
qdr_terminus_free(work->source);
qdr_terminus_free(work->target);
- qdr_error_free(work->error);
free_qdr_connection_work_t(work);
work = DEQ_HEAD(conn->work_list);
}
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index debce01..b37b4b8 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -145,6 +145,19 @@
if (dlv->settled) {
DEQ_REMOVE(link->undelivered, dlv);
dlv->where = QDR_DELIVERY_NOWHERE;
+
+ //
+ // The link-work item representing this pending delivery must be
+ // updated to reflect the removal of the delivery. If the item
+ // has no other deliveries associated with it, it can be removed
+ // from the work list.
+ //
+ assert(dlv->link_work);
+ if (dlv->link_work && (--dlv->link_work->value == 0)) {
+ DEQ_REMOVE(link->work_list, dlv->link_work);
+ free_qdr_link_work_t(dlv->link_work);
+ dlv->link_work = 0;
+ }
qdr_delivery_decref_CT(core, dlv);
}
dlv = next;
@@ -169,9 +182,22 @@
qdr_delivery_incref(dlv);
//
- // If the link isn't already on the links_with_deliveries list, put it there.
+ // We must put a work item on the link's work list to represent this pending delivery.
+ // If there's already a delivery item on the tail of the work list, simply join that item
+ // by incrementing the value.
//
- qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+ qdr_link_work_t *work = DEQ_TAIL(link->work_list);
+ if (work && work->work_type == QDR_LINK_WORK_DELIVERY) {
+ work->value++;
+ } else {
+ work = new_qdr_link_work_t();
+ ZERO(work);
+ work->work_type = QDR_LINK_WORK_DELIVERY;
+ work->value = 1;
+ DEQ_INSERT_TAIL(link->work_list, work);
+ qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ }
+ dlv->link_work = work;
sys_mutex_unlock(link->conn->work_lock);
//
@@ -695,7 +721,6 @@
work->link = out_link;
work->source = source;
work->target = target;
- work->error = 0;
qdr_connection_enqueue_work_CT(core, conn, work);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index ef73553..3257980 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -30,6 +30,7 @@
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
ALLOC_DEFINE(qdr_general_work_t);
+ALLOC_DEFINE(qdr_link_work_t);
ALLOC_DEFINE(qdr_connection_ref_t);
ALLOC_DEFINE(qdr_connection_info_t);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c657391..414647b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -152,6 +152,104 @@
ALLOC_DECLARE(qdr_action_t);
DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
+//
+// General Work
+//
+// The following types are used to post work to the IO threads for
+// non-connection-specific action. These actions are serialized through
+// a zero-delay timer and are processed by one thread at a time. General
+// actions occur in-order and are not run concurrently.
+//
+typedef struct qdr_general_work_t qdr_general_work_t;
+typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work);
+
+struct qdr_general_work_t {
+ DEQ_LINKS(qdr_general_work_t);
+ qdr_general_work_handler_t handler;
+ qdr_field_t *field;
+ int maskbit;
+ int inter_router_cost;
+ qdr_receive_t on_message;
+ void *on_message_context;
+ qd_message_t *msg;
+};
+
+ALLOC_DECLARE(qdr_general_work_t);
+DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t);
+
+qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler);
+
+
+//
+// Connection Work
+//
+// The following types are used to post work to the IO threads for
+// connection-specific action. The actions for a particular connection
+// are run in-order and are not concurrent. Actions for different connections
+// will run concurrently.
+//
+typedef enum {
+ QDR_CONNECTION_WORK_FIRST_ATTACH,
+ QDR_CONNECTION_WORK_SECOND_ATTACH
+} qdr_connection_work_type_t;
+
+typedef struct qdr_connection_work_t {
+ DEQ_LINKS(struct qdr_connection_work_t);
+ qdr_connection_work_type_t work_type;
+ qdr_link_t *link;
+ qdr_terminus_t *source;
+ qdr_terminus_t *target;
+} qdr_connection_work_t;
+
+ALLOC_DECLARE(qdr_connection_work_t);
+DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
+
+
+//
+// Link Work
+//
+// The following type is used to post link-specific work to the IO threads.
+// This ensures that work related to a particular link (deliveries, disposition
+// updates, flow updates, and detaches) is processed in order.
+//
+// DELIVERY - Push up to _value_ deliveries from the undelivered list to the
+// link (outgoing links only). Don't push more than there is
+// available credit for. If the full number of deliveries (_value_)
+// cannot be pushed, don't consume this work item from the list.
+// This link will be blocked until further credit is received.
+// FLOW - Push a flow update using _drain_action_ and _value_ for the
+// number of incremental credits.
+// FIRST_DETACH - Issue a first detach on this link, using _error_ if there is an
+// error condition.
+// SECOND_DETACH - Issue a second detach on this link.
+//
+typedef enum {
+ QDR_LINK_WORK_DELIVERY,
+ QDR_LINK_WORK_FLOW,
+ QDR_LINK_WORK_FIRST_DETACH,
+ QDR_LINK_WORK_SECOND_DETACH
+} qdr_link_work_type_t;
+
+typedef enum {
+ QDR_LINK_WORK_DRAIN_ACTION_NONE = 0,
+ QDR_LINK_WORK_DRAIN_ACTION_SET,
+ QDR_LINK_WORK_DRAIN_ACTION_CLEAR,
+ QDR_LINK_WORK_DRAIN_ACTION_DRAINED
+} qdr_link_work_drain_action_t;
+
+typedef struct qdr_link_work_t {
+ DEQ_LINKS(struct qdr_link_work_t);
+ qdr_link_work_type_t work_type;
+ qdr_error_t *error; ///< Passed to the detach handler for FIRST/SECOND_DETACH items
+ int value; ///< DELIVERY: deliveries still to push; FLOW: incremental credit
+ bool close_link; ///< Passed to the detach handler for FIRST/SECOND_DETACH items
+ qdr_link_work_drain_action_t drain_action; ///< Read for FLOW items: selects the drain/drained handler call, if any
+} qdr_link_work_t;
+
+ALLOC_DECLARE(qdr_link_work_t);
+DEQ_DECLARE(qdr_link_work_t, qdr_link_work_list_t);
+
+
#define QDR_AGENT_MAX_COLUMNS 64
#define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1)
@@ -225,6 +323,7 @@
qd_bitmask_t *link_exclusion;
qdr_address_t *tracking_addr;
int tracking_addr_bit;
+ qdr_link_work_t *link_work; ///< Delivery work item for this delivery
};
ALLOC_DECLARE(qdr_delivery_t);
@@ -242,10 +341,9 @@
void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref);
#define QDR_LINK_LIST_CLASS_ADDRESS 0
-#define QDR_LINK_LIST_CLASS_DELIVERY 1
-#define QDR_LINK_LIST_CLASS_FLOW 2
-#define QDR_LINK_LIST_CLASS_CONNECTION 3
-#define QDR_LINK_LIST_CLASSES 4
+#define QDR_LINK_LIST_CLASS_WORK 1
+#define QDR_LINK_LIST_CLASS_CONNECTION 2
+#define QDR_LINK_LIST_CLASSES 3
typedef enum {
QDR_LINK_OPER_UP,
@@ -262,6 +360,7 @@
qdr_connection_t *conn; ///< [ref] Connection that owns this link
qd_link_type_t link_type;
qd_direction_t link_direction;
+ qdr_link_work_list_t work_list;
char *name;
int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
@@ -276,11 +375,8 @@
bool strip_annotations_in;
bool strip_annotations_out;
int capacity;
- int incremental_credit_CT;
- int incremental_credit;
bool flow_started; ///< for incoming, true iff initial credit has been granted
bool drain_mode;
- bool drain_mode_changed;
int credit_to_core; ///< Number of the available credits incrementally given to the core
uint64_t total_deliveries;
@@ -389,59 +485,11 @@
//
-// General Work
+// Connection Information
//
-// The following types are used to post work to the IO threads for
-// non-connection-specific action. These actions are serialized through
-// a zero-delay timer and are processed by one thread at a time. General
-// actions occur in-order and are not run concurrently.
+// This record is used to give the core thread access to the details
+// of a connection's configuration.
//
-typedef struct qdr_general_work_t qdr_general_work_t;
-typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work);
-
-struct qdr_general_work_t {
- DEQ_LINKS(qdr_general_work_t);
- qdr_general_work_handler_t handler;
- qdr_field_t *field;
- int maskbit;
- int inter_router_cost;
- qdr_receive_t on_message;
- void *on_message_context;
- qd_message_t *msg;
-};
-
-ALLOC_DECLARE(qdr_general_work_t);
-DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t);
-
-qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler);
-
-//
-// Connection Work
-//
-// The following types are used to post work to the IO threads for
-// connection-specific action. The actions for a particular connection
-// are run in-order and are not concurrent. Actions for different connections
-// will run concurrently.
-//
-typedef enum {
- QDR_CONNECTION_WORK_FIRST_ATTACH,
- QDR_CONNECTION_WORK_SECOND_ATTACH,
- QDR_CONNECTION_WORK_FIRST_DETACH,
- QDR_CONNECTION_WORK_SECOND_DETACH
-} qdr_connection_work_type_t;
-
-typedef struct qdr_connection_work_t {
- DEQ_LINKS(struct qdr_connection_work_t);
- qdr_connection_work_type_t work_type;
- qdr_link_t *link;
- qdr_terminus_t *source;
- qdr_terminus_t *target;
- qdr_error_t *error;
- bool close_link;
-} qdr_connection_work_t;
-
-ALLOC_DECLARE(qdr_connection_work_t);
-DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
struct qdr_connection_info_t {
const char *container;
@@ -481,7 +529,7 @@
sys_mutex_t *work_lock;
qdr_link_ref_list_t links;
qdr_link_ref_list_t links_with_deliveries;
- qdr_link_ref_list_t links_with_credit;
+ qdr_link_ref_list_t links_with_work;
char *tenant_space;
int tenant_space_len;
qdr_connection_info_t *connection_info;
@@ -662,6 +710,9 @@
void qdr_connection_enqueue_work_CT(qdr_core_t *core,
qdr_connection_t *conn,
qdr_connection_work_t *work);
+void qdr_link_enqueue_work_CT(qdr_core_t *core,
+ qdr_link_t *link, ///< Link whose work_list the item is appended to
+ qdr_link_work_t *work);
qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
qdr_connection_t *conn,
@@ -677,10 +728,4 @@
qd_router_entity_type_t type,
qd_composed_field_t *body);
-//
-// Cause the core to check credit on an incoming link that might have CT credit but
-// no IO/Proton credit.
-//
-void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link);
-
#endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index a35e658..66ab3d8 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -23,7 +23,6 @@
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -129,6 +128,7 @@
dlv = DEQ_HEAD(link->undelivered);
if (dlv) {
DEQ_REMOVE_HEAD(link->undelivered);
+ dlv->link_work = 0;
settled = dlv->settled;
if (!settled) {
DEQ_INSERT_TAIL(link->unsettled, dlv);
@@ -156,22 +156,6 @@
else if (offer != -1)
core->offer_handler(core->user_context, link, offer);
}
-
- //
- // Handle disposition/settlement updates
- //
- qdr_delivery_ref_list_t updated_deliveries;
- sys_mutex_lock(conn->work_lock);
- DEQ_MOVE(link->updated_deliveries, updated_deliveries);
- sys_mutex_unlock(conn->work_lock);
-
- qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
- while (ref) {
- core->delivery_update_handler(core->user_context, ref->dlv, ref->dlv->disposition, ref->dlv->settled);
- qdr_delivery_decref(core, ref->dlv);
- qdr_del_delivery_ref(&updated_deliveries, ref);
- ref = DEQ_HEAD(updated_deliveries);
- }
}
@@ -197,14 +181,6 @@
}
-void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link)
-{
- qdr_action_t *action = qdr_action(qdr_link_check_credit_CT, "link_check_credit");
- action->args.connection.link = link;
- qdr_action_enqueue(core, action);
-}
-
-
void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_iterator_t *addr, bool exclude_inprocess, bool control)
{
qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -452,12 +428,13 @@
if (discard)
return;
- qdr_link_t *link = action->args.connection.link;
- int credit = action->args.connection.credit;
- bool drain = action->args.connection.drain;
- bool activate = false;
- bool drain_was_set = !link->drain_mode && drain;
-
+ qdr_link_t *link = action->args.connection.link;
+ int credit = action->args.connection.credit;
+ bool drain = action->args.connection.drain;
+ bool activate = false;
+ bool drain_was_set = !link->drain_mode && drain;
+ qdr_link_work_t *work = 0;
+
link->drain_mode = drain;
//
@@ -470,10 +447,13 @@
if (clink->link_direction == QD_INCOMING)
qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
else {
- sys_mutex_lock(clink->conn->work_lock);
- qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY);
- sys_mutex_unlock(clink->conn->work_lock);
- qdr_connection_activate_CT(core, clink->conn);
+ work = new_qdr_link_work_t();
+ ZERO(work);
+ work->work_type = QDR_LINK_WORK_FLOW;
+ work->value = credit;
+ if (drain)
+ work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
+ qdr_link_enqueue_work_CT(core, clink, work);
}
return;
@@ -483,9 +463,18 @@
// Handle the replenishing of credit outbound
//
if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
+ if (drain_was_set) {
+ work = new_qdr_link_work_t();
+ ZERO(work);
+ work->work_type = QDR_LINK_WORK_FLOW;
+ work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
+ }
+
sys_mutex_lock(link->conn->work_lock);
+ if (work)
+ DEQ_INSERT_TAIL(link->work_list, work);
if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
- qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+ qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}
sys_mutex_unlock(link->conn->work_lock);
@@ -499,16 +488,6 @@
}
-static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
- if (discard)
- return;
-
- qdr_link_t *link = action->args.connection.link;
- qdr_link_issue_credit_CT(core, link, 0, false);
-}
-
-
/**
* Return the number of outbound paths to destinations that this address has.
* Note that even if there are more than zero paths, the destination still may
@@ -771,42 +750,31 @@
/**
- * Check the link's accumulated credit. If the credit given to the connection thread
- * has been issued to Proton, provide the next batch of credit to the connection thread.
+ * Add link-work to provide credit to the link in an IO thread
*/
void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain)
{
+ assert(link->link_direction == QD_INCOMING); // This path is only meant for incoming links
+
bool drain_changed = link->drain_mode |= drain;
- bool activate = drain_changed;
+ link->drain_mode = drain; // Overwrites the OR-ed value stored by the statement above
- link->drain_mode = drain;
- link->drain_mode_changed = drain_changed;
+ if (!drain_changed && credit == 0) // No credit to issue and drain_changed is false: no work item needed
+ return;
- link->incremental_credit_CT += credit;
- link->flow_started = true;
+ if (credit > 0)
+ link->flow_started = true;
- if (link->incremental_credit_CT && link->incremental_credit == 0) {
- //
- // Move the credit from the core-thread value to the connection-thread value.
- //
- link->incremental_credit = link->incremental_credit_CT;
- link->incremental_credit_CT = 0;
- activate = true;
- }
+ qdr_link_work_t *work = new_qdr_link_work_t();
+ ZERO(work);
- if (activate) {
- //
- // Put this link on the connection's has-credit list.
- //
- sys_mutex_lock(link->conn->work_lock);
- qdr_add_link_ref(&link->conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
- sys_mutex_unlock(link->conn->work_lock);
+ work->work_type = QDR_LINK_WORK_FLOW;
+ work->value = credit;
- //
- // Activate the connection
- //
- qdr_connection_activate_CT(core, link->conn);
- }
+ if (drain_changed) // NOTE(review): true whenever drain is set now or was set before, so an unchanged drain==true re-sends SET - confirm intended
+ work->drain_action = drain ? QDR_LINK_WORK_DRAIN_ACTION_SET : QDR_LINK_WORK_DRAIN_ACTION_CLEAR;
+
+ qdr_link_enqueue_work_CT(core, link, work);
}
@@ -876,7 +844,7 @@
if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
qdr_delivery_incref(dlv);
qdr_add_delivery_ref(&link->updated_deliveries, dlv);
- qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+ qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}
sys_mutex_unlock(link->conn->work_lock);
diff --git a/src/router_node.c b/src/router_node.c
index ff666e6..5868570 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -991,19 +991,24 @@
}
-static void CORE_link_push(void *context, qdr_link_t *link)
+static int CORE_link_push(void *context, qdr_link_t *link, int limit) // Returns the credit offered to the core, capped at limit
{
qd_router_t *router = (qd_router_t*) context;
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
if (!qlink)
- return;
+ return 0; // No qd_link context is attached to this core link
pn_link_t *plink = qd_link_pn(qlink);
if (plink) {
int link_credit = pn_link_credit(plink);
+ if (link_credit > limit) // Never offer more than the caller-supplied limit
+ link_credit = limit;
qdr_link_process_deliveries(router->router_core, link, link_credit);
+ return link_credit; // NOTE(review): this is the credit offered, not a count reported by qdr_link_process_deliveries - confirm they always match
}
+
+ return 0; // qd_link_pn() returned no pn_link
}
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index e4bd2bc..e59f870 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -19,7 +19,7 @@
from proton.handlers import MessagingHandler
from proton.reactor import Container
-from proton import Message
+from proton import Message, Endpoint
class Timeout(object):
def __init__(self, parent):
@@ -54,6 +54,22 @@
self.sender = event.container.create_sender(self.conn, "org.apache.dev")
self.receiver.flow(1)
+ def on_link_flow(self, event):
+ if event.link.is_sender and event.link.credit \
+ and event.link.state & Endpoint.LOCAL_ACTIVE \
+ and event.link.state & Endpoint.REMOTE_ACTIVE :
+ self.on_sendable(event)
+
+ # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+ # messages. That along with 10 messages received indicates that the drain worked and we can
+ # declare that the test is successful
+ if self.received_count == 10 and event.link.credit == 0:
+ self.error = None
+ self.timer.cancel()
+ self.receiver.close()
+ self.sender.close()
+ self.conn.close()
+
def on_sendable(self, event):
if self.sent_count < 10:
msg = Message(body="Hello World", properties={'seq': self.sent_count})
@@ -75,16 +91,6 @@
# receiver
event.receiver.drain(20)
- # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
- # messages. That along with 10 messages received indicates that the drain worked and we can
- # declare that the test is successful
- if self.received_count == 10 and event.link.credit == 0:
- self.error = None
- self.timer.cancel()
- self.receiver.close()
- self.sender.close()
- self.conn.close()
-
def run(self):
Container(self).run()