DISPATCH-1532 - Reimplemented mobile-address-synchronization as a core module.
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index bcf5671..5d77004 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -85,18 +85,18 @@
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
void qdr_core_set_cost(qdr_core_t *core, int router_maskbit, int cost);
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash, int treatment_hint);
-void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
+void qdr_core_flush_destinations(qdr_core_t *core, int router_maskbit);
+void qdr_core_mobile_seq_advanced(qdr_core_t *core, int router_maskbit);
-typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash, qd_address_treatment_t treatment);
-typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash);
-typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
+typedef void (*qdr_set_mobile_seq_t) (void *context, int router_maskbit, uint64_t mobile_seq);
+typedef void (*qdr_set_my_mobile_seq_t) (void *context, uint64_t mobile_seq);
+typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
-void qdr_core_route_table_handlers(qdr_core_t *core,
- void *context,
- qdr_mobile_added_t mobile_added,
- qdr_mobile_removed_t mobile_removed,
- qdr_link_lost_t link_lost);
+void qdr_core_route_table_handlers(qdr_core_t *core,
+ void *context,
+ qdr_set_mobile_seq_t set_mobile_seq,
+ qdr_set_my_mobile_seq_t set_my_mobile_seq,
+ qdr_link_lost_t link_lost);
/**
******************************************************************************
@@ -106,11 +106,27 @@
typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit, int inter_router_cost,
uint64_t conn_id);
+/**
+ * qdr_core_subscribe
+ *
+ * Subscribe an in-process handler to receive messages to a particular address.
+ *
+ * @param core Pointer to the core module
+ * @param address The address of messages to be received
+ * @param aclass Address class character
+ * @param phase Address phase character ('0' .. '9')
+ * @param treatment Treatment for the address if it be being created as a side effect of this call
+ * @param in_core True iff the handler is to be run in the context of the core thread
+ * @param on_message The handler function
+ * @param context The opaque context sent to the handler on all invocations
+ * @return Pointer to the subscription object
+ */
qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core,
const char *address,
char aclass,
char phase,
qd_address_treatment_t treatment,
+ bool in_core,
qdr_receive_t on_message,
void *context);
diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py
index 753d743..e564523 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -26,7 +26,6 @@
from .hello import HelloProtocol
from .link import LinkStateEngine
from .path import PathEngine
-from .mobile import MobileAddressEngine
from .node import NodeTracker
from .message import Message
@@ -56,13 +55,10 @@
self._config = None # Not yet loaded
self._log_hello = LogAdapter("ROUTER_HELLO")
self._log_ls = LogAdapter("ROUTER_LS")
- self._log_ma = LogAdapter("ROUTER_MA")
self._log_general = LogAdapter("ROUTER")
- self.io_adapter = [IoAdapter(self.receive, "qdrouter", 'L', '0', TREATMENT_MULTICAST_FLOOD),
- IoAdapter(self.receive, "qdrouter.ma", 'L', '0', TREATMENT_MULTICAST_ONCE),
- IoAdapter(self.receive, "qdrouter", 'T', '0', TREATMENT_MULTICAST_FLOOD),
- IoAdapter(self.receive, "qdrouter.ma", 'T', '0', TREATMENT_MULTICAST_ONCE),
- IoAdapter(self.receive, "qdhello", 'L', '0', TREATMENT_MULTICAST_FLOOD)]
+ self.io_adapter = [IoAdapter(self.receive, "qdrouter", 'L', '0', TREATMENT_MULTICAST_FLOOD),
+ IoAdapter(self.receive, "qdrouter", 'T', '0', TREATMENT_MULTICAST_FLOOD),
+ IoAdapter(self.receive, "qdhello", 'L', '0', TREATMENT_MULTICAST_FLOOD)]
self.max_routers = max_routers
self.id = router_id
self.instance = int(time.time())
@@ -77,7 +73,6 @@
self.hello_protocol = HelloProtocol(self, self.node_tracker)
self.link_state_engine = LinkStateEngine(self)
self.path_engine = PathEngine(self)
- self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker)
##========================================================================================
@@ -98,26 +93,28 @@
raise ValueError("No router configuration found")
return self._config
- def addressAdded(self, addr, treatment):
- """
- """
- try:
- if addr[0] in 'MCDEFH':
- self.mobile_address_engine.add_local_address(addr, treatment)
- except Exception:
- self.log_ma(LOG_ERROR, "Exception in new-address processing\n%s" % format_exc(LOG_STACK_LIMIT))
- def addressRemoved(self, addr):
+ def setMobileSeq(self, router_maskbit, mobile_seq):
"""
+ Another router's mobile sequence number has been changed and the Python router needs to store
+ this number.
"""
- try:
- if addr[0] in 'MCDEFH':
- self.mobile_address_engine.del_local_address(addr)
- except Exception:
- self.log_ma(LOG_ERROR, "Exception in del-address processing\n%s" % format_exc(LOG_STACK_LIMIT))
+ self.node_tracker.set_mobile_seq(router_maskbit, mobile_seq)
+
+ def setMyMobileSeq(self, mobile_seq):
+ """
+ This router's mobile sequence number has been changed and the Python router needs to store
+ this number and immediately send a router-advertisement message to reflect the change.
+ """
+ self.link_state_engine.set_mobile_seq(mobile_seq)
+ self.link_state_engine.send_ra(time.time())
+
+
def linkLost(self, link_id):
"""
+ The control-link to a neighbor has been dropped. We can cancel the neighbor from the
+ link-state immediately instead of waiting for the hello-timeout to expire.
"""
self.node_tracker.link_lost(link_id)
@@ -133,6 +130,7 @@
except Exception:
self.log(LOG_ERROR, "Exception in timer processing\n%s" % format_exc(LOG_STACK_LIMIT))
+
def handleControlMessage(self, opcode, body, link_id, cost):
"""
"""
@@ -158,20 +156,11 @@
self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
self.link_state_engine.handle_lsr(msg, now)
- elif opcode == 'MAU':
- msg = MessageMAU(body)
- self.log_ma(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mau(msg, now)
-
- elif opcode == 'MAR':
- msg = MessageMAR(body)
- self.log_ma(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mar(msg, now)
-
except Exception:
self.log(LOG_ERROR, "Exception in control message processing\n%s" % format_exc(LOG_STACK_LIMIT))
self.log(LOG_ERROR, "Control message error: opcode=%s body=%r" % (opcode, body))
+
def receive(self, message, link_id, cost):
"""
This is the IoAdapter message-receive handler
@@ -183,6 +172,7 @@
self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r" %
(message.properties, message.body))
+
def getRouterData(self, kind):
"""
"""
diff --git a/python/qpid_dispatch_internal/router/mobile.py b/python/qpid_dispatch_internal/router/mobile.py
deleted file mode 100644
index e6ee48e..0000000
--- a/python/qpid_dispatch_internal/router/mobile.py
+++ /dev/null
@@ -1,175 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from __future__ import unicode_literals
-from __future__ import division
-from __future__ import absolute_import
-from __future__ import print_function
-
-from .data import MessageMAR, MessageMAU
-from ..dispatch import LOG_TRACE
-
-MAX_KEPT_DELTAS = 10
-
-class MobileAddressEngine(object):
- """
- This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
- It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
- Note that this routing table maps from the mobile address to the remote router where that address
- is directly bound.
- """
- def __init__(self, container, node_tracker):
- self.container = container
- self.node_tracker = node_tracker
- self.id = self.container.id
- self.mobile_seq = 0
- self.local_addrs = set([])
- self.added_addrs = set([])
- self.deleted_addrs = set([])
- self.sent_deltas = {}
- self.treatments = {}
-
-
- def tick(self, now):
- ##
- ## If local addrs have changed, collect the changes and send a MAU with the diffs
- ## Note: it is important that the differential-MAU be sent before a RA is sent
- ##
- if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
- self.mobile_seq += 1
- hints = [self.treatments[a] for a in self.added_addrs]
- msg = MessageMAU(None, self.id, self.mobile_seq, list(self.added_addrs), list(self.deleted_addrs), _hints=hints)
-
- self.sent_deltas[self.mobile_seq] = msg
- if len(self.sent_deltas) > MAX_KEPT_DELTAS:
- self.sent_deltas.pop(self.mobile_seq - MAX_KEPT_DELTAS)
-
- self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg)
- self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
- self.local_addrs.update(self.added_addrs)
- self.local_addrs.difference_update(self.deleted_addrs)
- self.added_addrs.clear()
- self.deleted_addrs.clear()
- return self.mobile_seq
-
-
- def add_local_address(self, addr, treatment):
- """
- """
- self.treatments[addr] = treatment
- if addr not in self.local_addrs:
- if addr not in self.added_addrs:
- self.added_addrs.add(addr)
- else:
- if addr in self.deleted_addrs:
- self.deleted_addrs.remove(addr)
-
-
- def del_local_address(self, addr):
- """
- """
- del self.treatments[addr]
- if addr in self.local_addrs:
- if addr not in self.deleted_addrs:
- self.deleted_addrs.add(addr)
- else:
- if addr in self.added_addrs:
- self.added_addrs.remove(addr)
-
-
- def handle_mau(self, msg, now):
- ##
- ## If the MAU is differential, we can only use it if its sequence is exactly one greater
- ## than our stored sequence. If not, we will ignore the content and schedule a MAR.
- ##
- ## If the MAU is absolute, we can use it in all cases.
- ##
- if msg.id == self.id:
- return
- node = self.node_tracker.router_node(msg.id)
-
- if msg.exist_list != None:
- ##
- ## Absolute MAU
- ##
- if msg.mobile_seq == node.mobile_address_sequence:
- return
- node.mobile_address_sequence = msg.mobile_seq
- node.overwrite_addresses(msg.exist_list)
- else:
- ##
- ## Differential MAU
- ##
- if node.mobile_address_sequence + 1 == msg.mobile_seq:
- ##
- ## This message represents the next expected sequence, incorporate the deltas
- ##
- node.mobile_address_sequence += 1
- treatments = msg.hints or []
- for a in msg.add_list:
- if len(treatments):
- treatment = treatments.pop(0)
- else:
- treatment = -1
- node.map_address(a, treatment)
- for a in msg.del_list:
- node.unmap_address(a)
-
- elif node.mobile_address_sequence == msg.mobile_seq:
- ##
- ## Ignore duplicates
- ##
- return
-
- else:
- ##
- ## This is an out-of-sequence delta. Don't use it. Schedule a MAR to
- ## get back on track.
- ##
- node.mobile_address_request()
-
-
- def handle_mar(self, msg, now):
- if msg.id == self.id:
- return
- if msg.have_seq == self.mobile_seq:
- return
- if self.mobile_seq - (msg.have_seq + 1) < len(self.sent_deltas):
- ##
- ## We can catch the peer up with a series of stored differential updates
- ##
- for s in range(msg.have_seq + 1, self.mobile_seq + 1):
- self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, self.sent_deltas[s])
- self.container.log_ma(LOG_TRACE, "SENT: %r" % self.sent_deltas[s])
- return
-
- ##
- ## The peer needs to be sent an absolute update with the whole address list
- ##
- smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, list(self.local_addrs))
- self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg)
- self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg)
-
-
- def send_mar(self, node_id, seq):
- msg = MessageMAR(None, self.id, seq)
- self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % node_id, msg)
- self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
-
-
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index c99ce95..aed6870 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -187,14 +187,7 @@
if node.link_state_requested():
self.container.link_state_engine.send_lsr(node_id)
if node.mobile_address_requested():
- self.container.mobile_address_engine.send_mar(node_id, node.mobile_address_sequence)
-
- ##
- ## If local changes have been made to the list of mobile addresses, send
- ## an unsolicited mobile-address-update to all routers.
- ##
- mobile_seq = self.container.mobile_address_engine.tick(now)
- self.container.link_state_engine.set_mobile_seq(mobile_seq)
+ self.container.router_adapter.mobile_seq_advanced(node.maskbit)
##
## Send an immediate RA if our link state changed
@@ -260,6 +253,15 @@
self.link_state_changed = True
+ def set_mobile_seq(self, router_maskbit, mobile_seq):
+ """
+ """
+ for node in self.nodes.values():
+ if node.maskbit == router_maskbit:
+ node.mobile_address_sequence = mobile_seq
+ return
+
+
def in_flux_mode(self, now):
result = (now - self.last_topology_change) <= self.flux_interval
if not result and self.flux_mode:
@@ -406,7 +408,6 @@
self.next_hop_router = None
self.cost = None
self.valid_origins = None
- self.mobile_addresses = set([])
self.mobile_address_sequence = 0
self.need_ls_request = True
self.need_mobile_request = False
@@ -541,34 +542,10 @@
return False
- def map_address(self, addr, treatment = -1):
- self.mobile_addresses.add(addr)
- self.adapter.map_destination(addr, treatment, self.maskbit)
- self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id))
-
-
- def unmap_address(self, addr):
- self.mobile_addresses.remove(addr)
- self.adapter.unmap_destination(addr, self.maskbit)
- self.log(LOG_DEBUG, "Remote destination %s unmapped from router %s" % (self._logify(addr), self.id))
-
-
def unmap_all_addresses(self):
self.mobile_address_sequence = 0
- for addr in self.mobile_addresses:
- self.adapter.unmap_destination(addr, self.maskbit)
- self.log(LOG_DEBUG, "Remote destination %s unmapped from router %s" % (self._logify(addr), self.id))
-
- def overwrite_addresses(self, addrs_list):
- added = []
- deleted = []
- addrs = set(addrs_list)
- added = addrs.difference(self.mobile_addresses)
- deleted = self.mobile_addresses.difference(addrs)
- for a in added:
- self.map_address(a)
- for a in deleted:
- self.unmap_address(a)
+ self.adapter.flush_destinations(self.maskbit)
+ self.log(LOG_DEBUG, "Remote destinations flushed from router %s" % (self.id))
def update_instance(self, instance, version):
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d57df77..0a01d43 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -100,6 +100,7 @@
router_core/modules/address_lookup_server/address_lookup_server.c
router_core/modules/address_lookup_client/lookup_client.c
router_core/modules/stuck_delivery_detection/delivery_tracker.c
+ router_core/modules/mobile_sync/mobile.c
router_node.c
router_pynode.c
schema_enum.c
diff --git a/src/parse.c b/src/parse.c
index 087b8d5..25d303d 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -676,7 +676,9 @@
if (!field)
return 0;
- return field->tag == QD_AMQP_LIST8 || field->tag == QD_AMQP_LIST32;
+ return field->tag == QD_AMQP_LIST8
+ || field->tag == QD_AMQP_LIST32
+ || field->tag == QD_AMQP_LIST0;
}
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 154ee00..972a235 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -587,7 +587,8 @@
char *address = py_string_2_c(addr);
if (!address) return -1;
qd_error_clear();
- self->sub = qdr_core_subscribe(self->core, address, aclass, phase, treatment, qd_io_rx_handler, self);
+ self->sub = qdr_core_subscribe(self->core, address, aclass, phase, treatment,
+ false, qd_io_rx_handler, self);
free(address);
if (qd_error_code()) {
PyErr_SetString(PyExc_RuntimeError, qd_error_message());
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0a710d0..1a44baf 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1249,7 +1249,6 @@
&& DEQ_SIZE(addr->inlinks) == 0
&& qd_bitmask_cardinality(addr->rnodes) == 0
&& addr->ref_count == 0
- && !addr->block_deletion
&& addr->tracked_deliveries == 0
&& addr->core_endpoint == 0
&& addr->fallback_for == 0) {
@@ -1767,7 +1766,7 @@
return;
}
- qdr_address_t *addr = link->owning_addr;
+ qdr_address_t *addr = link->owning_addr;
if (link->detach_received)
return;
@@ -1839,7 +1838,9 @@
//
// Unbind the address and the link.
//
+ addr->ref_count++;
qdr_core_unbind_address_link_CT(core, addr, link);
+ addr->ref_count--;
//
// If this is an edge data link, raise a link event to indicate its detachment.
@@ -1865,8 +1866,11 @@
switch (link->link_type) {
case QD_LINK_ENDPOINT:
case QD_LINK_EDGE_DOWNLINK:
- if (addr)
+ if (addr) {
+ addr->ref_count++;
qdr_core_unbind_address_link_CT(core, addr, link);
+ addr->ref_count--;
+ }
break;
case QD_LINK_CONTROL:
diff --git a/src/router_core/core_events.c b/src/router_core/core_events.c
index 34e7b5f..bbdc1c0 100644
--- a/src/router_core/core_events.c
+++ b/src/router_core/core_events.c
@@ -21,14 +21,16 @@
struct qdrc_event_subscription_t {
- DEQ_LINKS_N(CONN, qdrc_event_subscription_t);
- DEQ_LINKS_N(LINK, qdrc_event_subscription_t);
- DEQ_LINKS_N(ADDR, qdrc_event_subscription_t);
+ DEQ_LINKS_N(CONN, qdrc_event_subscription_t);
+ DEQ_LINKS_N(LINK, qdrc_event_subscription_t);
+ DEQ_LINKS_N(ADDR, qdrc_event_subscription_t);
+ DEQ_LINKS_N(ROUTER, qdrc_event_subscription_t);
void *context;
qdrc_event_t events;
qdrc_connection_event_t on_conn_event;
qdrc_link_event_t on_link_event;
qdrc_address_event_t on_addr_event;
+ qdrc_router_event_t on_router_event;
};
@@ -37,21 +39,24 @@
qdrc_connection_event_t on_conn_event,
qdrc_link_event_t on_link_event,
qdrc_address_event_t on_addr_event,
+ qdrc_router_event_t on_router_event,
void *context)
{
qdrc_event_subscription_t *sub = NEW(qdrc_event_subscription_t);
ZERO(sub);
- sub->context = context;
- sub->events = events;
- sub->on_conn_event = on_conn_event;
- sub->on_link_event = on_link_event;
- sub->on_addr_event = on_addr_event;
+ sub->context = context;
+ sub->events = events;
+ sub->on_conn_event = on_conn_event;
+ sub->on_link_event = on_link_event;
+ sub->on_addr_event = on_addr_event;
+ sub->on_router_event = on_router_event;
- assert((events & ~(_QDRC_EVENT_CONN_RANGE | _QDRC_EVENT_LINK_RANGE | _QDRC_EVENT_ADDR_RANGE)) == 0);
- assert(!(events & _QDRC_EVENT_CONN_RANGE) || on_conn_event);
- assert(!(events & _QDRC_EVENT_LINK_RANGE) || on_link_event);
- assert(!(events & _QDRC_EVENT_ADDR_RANGE) || on_addr_event);
+ assert((events & ~(_QDRC_EVENT_CONN_RANGE | _QDRC_EVENT_LINK_RANGE | _QDRC_EVENT_ADDR_RANGE | _QDRC_EVENT_ROUTER_RANGE)) == 0);
+ assert(!(events & _QDRC_EVENT_CONN_RANGE) || on_conn_event);
+ assert(!(events & _QDRC_EVENT_LINK_RANGE) || on_link_event);
+ assert(!(events & _QDRC_EVENT_ADDR_RANGE) || on_addr_event);
+ assert(!(events & _QDRC_EVENT_ROUTER_RANGE) || on_router_event);
if (events & _QDRC_EVENT_CONN_RANGE)
DEQ_INSERT_TAIL_N(CONN, core->conn_event_subscriptions, sub);
@@ -62,6 +67,9 @@
if (events & _QDRC_EVENT_ADDR_RANGE)
DEQ_INSERT_TAIL_N(ADDR, core->addr_event_subscriptions, sub);
+ if (events & _QDRC_EVENT_ROUTER_RANGE)
+ DEQ_INSERT_TAIL_N(ROUTER, core->router_event_subscriptions, sub);
+
return sub;
}
@@ -77,6 +85,9 @@
if (sub->events & _QDRC_EVENT_ADDR_RANGE)
DEQ_REMOVE_N(ADDR, core->addr_event_subscriptions, sub);
+ if (sub->events & _QDRC_EVENT_ROUTER_RANGE)
+ DEQ_REMOVE_N(ROUTER, core->router_event_subscriptions, sub);
+
free(sub);
}
@@ -116,3 +127,15 @@
}
}
+
+void qdrc_event_router_raise(qdr_core_t *core, qdrc_event_t event, qdr_node_t *router)
+{
+ qdrc_event_subscription_t *sub = DEQ_HEAD(core->router_event_subscriptions);
+
+ while (sub) {
+ if (sub->events & event)
+ sub->on_router_event(sub->context, event, router);
+ sub = DEQ_NEXT_N(ROUTER, sub);
+ }
+}
+
diff --git a/src/router_core/core_events.h b/src/router_core/core_events.h
index 67f2fca..7b351c0 100644
--- a/src/router_core/core_events.h
+++ b/src/router_core/core_events.h
@@ -60,6 +60,11 @@
* QDRC_EVENT_ADDR_NO_LONGER_SOURCE An address transitioned from one to zero local sources (inlink)
* QDRC_EVENT_ADDR_TWO_SOURCE An address transitioned from one to two local sources (inlink)
* QDRC_EVENT_ADDR_ONE_SOURCE An address transitioned from two to one local sources (inlink)
+ *
+ * QDRC_EVENT_ROUTER_ADDED A remote router has been discovered
+ * QDRC_EVENT_ROUTER_REMOVED A remote router has been lost
+ * QDRC_EVENT_ROUTER_MOBILE_FLUSH A remote router needs its mobile addresses unmapped
+ * QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED A remote router's mobile sequence advanced past our version of the sequence
*/
#define QDRC_EVENT_CONN_OPENED 0x00000001
@@ -70,27 +75,33 @@
#define QDRC_EVENT_CONN_IR_LOST 0x00000020
#define _QDRC_EVENT_CONN_RANGE 0x0000003F
-#define QDRC_EVENT_LINK_IN_ATTACHED 0x00000100
-#define QDRC_EVENT_LINK_IN_DETACHED 0x00000200
-#define QDRC_EVENT_LINK_OUT_ATTACHED 0x00000400
-#define QDRC_EVENT_LINK_OUT_DETACHED 0x00000800
-#define QDRC_EVENT_LINK_EDGE_DATA_ATTACHED 0x00001000
-#define QDRC_EVENT_LINK_EDGE_DATA_DETACHED 0x00002000
-#define _QDRC_EVENT_LINK_RANGE 0x00003F00
+#define QDRC_EVENT_LINK_IN_ATTACHED 0x00000040
+#define QDRC_EVENT_LINK_IN_DETACHED 0x00000080
+#define QDRC_EVENT_LINK_OUT_ATTACHED 0x00000100
+#define QDRC_EVENT_LINK_OUT_DETACHED 0x00000200
+#define QDRC_EVENT_LINK_EDGE_DATA_ATTACHED 0x00000400
+#define QDRC_EVENT_LINK_EDGE_DATA_DETACHED 0x00000800
+#define _QDRC_EVENT_LINK_RANGE 0x00000FC0
-#define QDRC_EVENT_ADDR_ADDED 0x00010000
-#define QDRC_EVENT_ADDR_REMOVED 0x00020000
-#define QDRC_EVENT_ADDR_BECAME_LOCAL_DEST 0x00040000
-#define QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST 0x00080000
-#define QDRC_EVENT_ADDR_BECAME_DEST 0x00100000
-#define QDRC_EVENT_ADDR_NO_LONGER_DEST 0x00200000
-#define QDRC_EVENT_ADDR_ONE_LOCAL_DEST 0x00400000
-#define QDRC_EVENT_ADDR_TWO_DEST 0x00800000
-#define QDRC_EVENT_ADDR_BECAME_SOURCE 0x01000000
-#define QDRC_EVENT_ADDR_NO_LONGER_SOURCE 0x02000000
-#define QDRC_EVENT_ADDR_TWO_SOURCE 0x04000000
-#define QDRC_EVENT_ADDR_ONE_SOURCE 0x08000000
-#define _QDRC_EVENT_ADDR_RANGE 0x0FFF0000
+#define QDRC_EVENT_ADDR_ADDED 0x00001000
+#define QDRC_EVENT_ADDR_REMOVED 0x00002000
+#define QDRC_EVENT_ADDR_BECAME_LOCAL_DEST 0x00004000
+#define QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST 0x00008000
+#define QDRC_EVENT_ADDR_BECAME_DEST 0x00010000
+#define QDRC_EVENT_ADDR_NO_LONGER_DEST 0x00020000
+#define QDRC_EVENT_ADDR_ONE_LOCAL_DEST 0x00040000
+#define QDRC_EVENT_ADDR_TWO_DEST 0x00080000
+#define QDRC_EVENT_ADDR_BECAME_SOURCE 0x00100000
+#define QDRC_EVENT_ADDR_NO_LONGER_SOURCE 0x00200000
+#define QDRC_EVENT_ADDR_TWO_SOURCE 0x00400000
+#define QDRC_EVENT_ADDR_ONE_SOURCE 0x00800000
+#define _QDRC_EVENT_ADDR_RANGE 0x00FFF000
+
+#define QDRC_EVENT_ROUTER_ADDED 0x01000000
+#define QDRC_EVENT_ROUTER_REMOVED 0x02000000
+#define QDRC_EVENT_ROUTER_MOBILE_FLUSH 0x04000000
+#define QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED 0x08000000
+#define _QDRC_EVENT_ROUTER_RANGE 0x0F000000
/**
@@ -112,6 +123,10 @@
qdrc_event_t event_type,
qdr_address_t *addr);
+typedef void (*qdrc_router_event_t) (void *context,
+ qdrc_event_t event_type,
+ qdr_node_t *router);
+
/**
* qdrc_event_subscribe_CT
*
@@ -130,6 +145,7 @@
qdrc_connection_event_t on_conn_event,
qdrc_link_event_t on_link_event,
qdrc_address_event_t on_addr_event,
+ qdrc_router_event_t on_router_event,
void *context);
/**
@@ -152,5 +168,6 @@
void qdrc_event_conn_raise(qdr_core_t *core, qdrc_event_t event, qdr_connection_t *conn);
void qdrc_event_link_raise(qdr_core_t *core, qdrc_event_t event, qdr_link_t *link);
void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t *addr);
+void qdrc_event_router_raise(qdr_core_t *core, qdrc_event_t event, qdr_node_t *router);
#endif
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 6d98673..7471716 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -448,6 +448,15 @@
qdr_delivery_increment_counters_CT(core, delivery);
//
+ // Remove any subscription references
+ //
+ qdr_subscription_ref_t *sub = DEQ_HEAD(delivery->subscriptions);
+ while (sub) {
+ qdr_del_subscription_ref_CT(&delivery->subscriptions, sub);
+ sub = DEQ_HEAD(delivery->subscriptions);
+ }
+
+ //
// Free all the peer qdr_delivery_ref_t references
//
qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers);
@@ -1063,11 +1072,11 @@
// The entire message has now been received. Check to see if there are in process subscriptions that need to
// receive this message. in process subscriptions, at this time, can deal only with full messages.
//
- qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions);
- while (sub) {
- DEQ_REMOVE_HEAD(in_dlv->subscriptions);
- qdr_forward_on_message_CT(core, sub, link, in_dlv->msg);
- sub = DEQ_HEAD(in_dlv->subscriptions);
+ qdr_subscription_ref_t *subref = DEQ_HEAD(in_dlv->subscriptions);
+ while (subref) {
+ qdr_forward_on_message_CT(core, subref->sub, link, in_dlv->msg);
+ qdr_del_subscription_ref_CT(&in_dlv->subscriptions, subref);
+ subref = DEQ_HEAD(in_dlv->subscriptions);
}
// This is a presettled multi-frame unicast delivery.
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 3e93970..ac29739 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -59,7 +59,7 @@
int tracking_addr_bit;
int ingress_index;
qdr_link_work_t *link_work; ///< Delivery work item for this delivery
- qdr_subscription_list_t subscriptions;
+ qdr_subscription_ref_list_t subscriptions;
qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer.
bool multicast; /// True if this delivery is targeted for a multicast address.
bool via_edge; /// True if this delivery arrived via an edge-connection.
diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c
index dc3b495..5bf1bbb 100644
--- a/src/router_core/exchange_bindings.c
+++ b/src/router_core/exchange_bindings.c
@@ -928,9 +928,11 @@
ex->alternate = next_hop(ex, alternate, alt_phase);
}
- qdr_post_mobile_added_CT(core,
- (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle),
- ex->qdr_addr->treatment);
+ //
+ // TODO - handle case where there was already a local dest.
+ //
+ qdr_addr_start_inlinks_CT(ex->core, ex->qdr_addr);
+ qdrc_event_addr_raise(ex->core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, ex->qdr_addr);
}
return ex;
@@ -939,8 +941,7 @@
static void qdr_exchange_free(qdr_exchange_t *ex)
{
if (ex->core->running && DEQ_SIZE(ex->qdr_addr->rlinks) == 0) {
- qdr_post_mobile_removed_CT(ex->core,
- (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle));
+ qdrc_event_addr_raise(ex->core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, ex->qdr_addr);
}
DEQ_REMOVE(ex->core->exchanges, ex);
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index ab73501..52e479a 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -269,14 +269,28 @@
void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg)
{
- qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message);
- work->on_message = sub->on_message;
- work->on_message_context = sub->on_message_context;
- work->msg = qd_message_copy(msg);
- work->maskbit = link ? link->conn->mask_bit : 0;
- work->inter_router_cost = link ? link->conn->inter_router_cost : 1;
- work->in_conn_id = link ? link->conn->identity : 0;
- qdr_post_general_work_CT(core, work);
+ int mask_bit = link ? link->conn->mask_bit : 0;
+ int cost = link ? link->conn->inter_router_cost : 1;
+ uint64_t identity = link ? link->conn->identity : 0;
+
+ if (sub->in_core) {
+ //
+ // The handler runs in-core. Invoke it right now.
+ //
+ sub->on_message(sub->on_message_context, msg, mask_bit, cost, identity);
+ } else {
+ //
+ // The handler runs in an IO thread. Defer its invocation.
+ //
+ qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message);
+ work->on_message = sub->on_message;
+ work->on_message_context = sub->on_message_context;
+ work->msg = qd_message_copy(msg);
+ work->maskbit = mask_bit;
+ work->inter_router_cost = cost;
+ work->in_conn_id = identity;
+ qdr_post_general_work_CT(core, work);
+ }
}
@@ -327,7 +341,7 @@
// after the message fully arrives
//
assert(in_dlv);
- DEQ_INSERT_TAIL(in_dlv->subscriptions, sub);
+ qdr_add_subscription_ref_CT(&in_dlv->subscriptions, sub);
qd_message_Q2_holdoff_disable(in_msg);
}
}
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index ba4ffd8..aa6f968 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -774,7 +774,7 @@
client->core = core;
client->event_sub = qdrc_event_subscribe_CT(client->core,
QDRC_EVENT_CONN_EDGE_ESTABLISHED | QDRC_EVENT_CONN_EDGE_LOST,
- on_conn_event, 0, 0,
+ on_conn_event, 0, 0, 0,
client);
core->addr_lookup_handler = qcm_addr_lookup_CT;
diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
index c89a9cf..2fa7bff 100644
--- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
@@ -407,6 +407,7 @@
0,
on_link_event,
on_addr_event,
+ 0,
context);
}
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index 22998be..af6e5c2 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -526,7 +526,8 @@
on_conn_event,
0,
on_addr_event,
- ap);
+ 0,
+ ap);
core->edge_conn_addr = qcm_edge_conn_addr;
core->edge_context = ap;
diff --git a/src/router_core/modules/edge_router/connection_manager.c b/src/router_core/modules/edge_router/connection_manager.c
index bb4ae1c..76cfd6c 100644
--- a/src/router_core/modules/edge_router/connection_manager.c
+++ b/src/router_core/modules/edge_router/connection_manager.c
@@ -98,6 +98,7 @@
on_conn_event,
0,
0,
+ 0,
cm);
cm->active_edge_connection = 0;
diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c
index beee1c5..84cb9a2 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.c
+++ b/src/router_core/modules/edge_router/edge_mgmt.c
@@ -303,8 +303,9 @@
(QDRC_EVENT_CONN_EDGE_ESTABLISHED
| QDRC_EVENT_CONN_EDGE_LOST),
_conn_event_CT,
- NULL, // link event
- NULL, // addr event
+ 0, // link event
+ 0, // addr event
+ 0, // router event
core); // context
}
diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c
index 0e8cb7e..d21d23d 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.c
+++ b/src/router_core/modules/edge_router/link_route_proxy.c
@@ -462,8 +462,9 @@
| QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST
| QDRC_EVENT_ADDR_BECAME_LOCAL_DEST),
_on_conn_event,
- NULL,
+ 0,
_on_addr_event,
+ 0,
core);
}
diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c
new file mode 100644
index 0000000..5055f2f
--- /dev/null
+++ b/src/router_core/modules/mobile_sync/mobile.c
@@ -0,0 +1,892 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "module.h"
+#include "router_core_private.h"
+#include "core_events.h"
+#include "route_control.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/router.h>
+#include <stdio.h>
+#include <inttypes.h>
+
+#define PROTOCOL_VERSION 1
+static const char *OPCODE = "opcode";
+static const char *MAR = "MAR";
+static const char *MAU = "MAU";
+static const char *ID = "id";
+static const char *PV = "pv";
+static const char *AREA = "area";
+static const char *MOBILE_SEQ = "mobile_seq";
+static const char *HINTS = "hints";
+static const char *ADD = "add";
+static const char *DEL = "del";
+static const char *EXIST = "exist";
+static const char *HAVE_SEQ = "have_seq";
+
+//
+// Address.sync_mask bit values
+//
+#define ADDR_SYNC_IN_ADD_LIST 0x00000001
+#define ADDR_SYNC_IN_DEL_LIST 0x00000002
+#define ADDR_SYNC_TO_BE_DELETED 0x00000004
+#define ADDR_SYNC_MOBILE_TRACKING 0x00000008
+
+#define BIT_SET(M,B) M |= B
+#define BIT_CLEAR(M,B) M &= ~B
+#define BIT_IS_SET(M,B) (M & B)
+
+typedef struct {
+ qdr_core_t *core;
+ qdrc_event_subscription_t *event_sub;
+ qdr_core_timer_t *timer;
+ qdr_subscription_t *message_sub1;
+ qdr_subscription_t *message_sub2;
+ qd_log_source_t *log;
+ uint64_t mobile_seq;
+ qdr_address_list_t added_addrs;
+ qdr_address_list_t deleted_addrs;
+} qdrm_mobile_sync_t;
+
+
+//================================================================================
+// Helper Functions
+//================================================================================
+
+static qd_address_treatment_t qcm_mobile_sync_default_treatment(qdr_core_t *core, int hint) {
+ switch (hint) {
+ case QD_TREATMENT_MULTICAST_FLOOD:
+ return QD_TREATMENT_MULTICAST_FLOOD;
+ case QD_TREATMENT_MULTICAST_ONCE:
+ return QD_TREATMENT_MULTICAST_ONCE;
+ case QD_TREATMENT_ANYCAST_CLOSEST:
+ return QD_TREATMENT_ANYCAST_CLOSEST;
+ case QD_TREATMENT_ANYCAST_BALANCED:
+ return QD_TREATMENT_ANYCAST_BALANCED;
+ case QD_TREATMENT_LINK_BALANCED:
+ return QD_TREATMENT_LINK_BALANCED;
+ case QD_TREATMENT_UNAVAILABLE:
+ return QD_TREATMENT_UNAVAILABLE;
+ default:
+ return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment;
+ }
+}
+
+
+static bool qcm_mobile_sync_addr_is_mobile(qdr_address_t *addr)
+{
+ const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ return !!strchr("MCDEFH", hash_key[0]);
+}
+
+
+qdr_node_t *qdc_mobile_sync_router_by_id(qdrm_mobile_sync_t *msync, qd_parsed_field_t *id_field)
+{
+ qd_iterator_t *id_iter = qd_parse_raw(id_field);
+ qdr_node_t *router = DEQ_HEAD(msync->core->routers);
+ while (!!router) {
+ if (qd_iterator_equal(id_iter, qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1))
+ return router;
+ router = DEQ_NEXT(router);
+ }
+
+ return 0;
+}
+
+
+/**
+ * Bump the ref_count on the address to ensure it is not deleted out from under our attention.
+ */
+static void qcm_mobile_sync_start_tracking(qdr_address_t *addr)
+{
+ BIT_SET(addr->sync_mask, ADDR_SYNC_MOBILE_TRACKING);
+ addr->ref_count++;
+}
+
+
+/**
+ * Decrement the address's ref_count.
+ * Check the address to have it deleted if it is no longer referenced anywhere.
+ */
+static void qcm_mobile_sync_stop_tracking(qdr_core_t *core, qdr_address_t *addr)
+{
+ BIT_CLEAR(addr->sync_mask, ADDR_SYNC_MOBILE_TRACKING);
+ if (--addr->ref_count == 0)
+ qdr_check_addr_CT(core, addr);
+}
+
+
+static qd_composed_field_t *qcm_mobile_sync_message_headers(const char *address, const char *opcode)
+{
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(field);
+ qd_compose_insert_bool(field, 0); // durable
+ qd_compose_end_list(field);
+
+ field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+ qd_compose_start_list(field);
+ qd_compose_insert_null(field); // message-id
+ qd_compose_insert_null(field); // user-id
+ qd_compose_insert_string(field, address); // to
+ qd_compose_end_list(field);
+
+ field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+ qd_compose_start_map(field);
+ qd_compose_insert_symbol(field, OPCODE);
+ qd_compose_insert_string(field, opcode);
+ qd_compose_end_map(field);
+
+ return field;
+}
+
+
+static void qcm_mobile_sync_compose_diff_addr_list(qdrm_mobile_sync_t *msync, qd_composed_field_t *field, bool is_added)
+{
+ qdr_address_list_t *list = is_added ? &msync->added_addrs : &msync->deleted_addrs;
+
+ qd_compose_start_list(field);
+ qdr_address_t *addr = DEQ_HEAD(*list);
+ while (addr) {
+ const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ qd_compose_insert_string(field, hash_key);
+ if (is_added) {
+ DEQ_REMOVE_HEAD_N(SYNC_ADD, *list);
+ BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST);
+ } else {
+ DEQ_REMOVE_HEAD_N(SYNC_DEL, *list);
+ BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST);
+ qcm_mobile_sync_stop_tracking(msync->core, addr);
+ }
+ addr = DEQ_HEAD(*list);
+ }
+ qd_compose_end_list(field);
+}
+
+
+static void qcm_mobile_sync_compose_diff_hint_list(qdrm_mobile_sync_t *msync, qd_composed_field_t *field)
+{
+ qd_compose_start_list(field);
+ qdr_address_t *addr = DEQ_HEAD(msync->added_addrs);
+ while (addr) {
+ qd_compose_insert_int(field, addr->treatment);
+ addr = DEQ_NEXT_N(SYNC_ADD, addr);
+ }
+ qd_compose_end_list(field);
+}
+
+
+static qd_message_t *qcm_mobile_sync_compose_differential_mau(qdrm_mobile_sync_t *msync, const char *address)
+{
+ qd_message_t *msg = qd_message();
+ qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU);
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+ //
+ // Add the ingress and trace annotations to the message to prevent this multicast from bouncing
+ // back to us.
+ //
+ qd_composed_field_t *ingress = qd_compose_subfield(0);
+ qd_compose_insert_string(ingress, qd_router_id(msync->core->qd));
+
+ qd_composed_field_t *trace = qd_compose_subfield(0);
+ qd_compose_start_list(trace);
+ qd_compose_insert_string(trace, qd_router_id(msync->core->qd));
+ qd_compose_end_list(trace);
+
+ qd_message_set_ingress_annotation(msg, ingress);
+ qd_message_set_trace_annotation(msg, trace);
+
+ //
+ // Generate the message body
+ //
+ qd_compose_start_map(body);
+ qd_compose_insert_symbol(body, ID);
+ qd_compose_insert_string(body, msync->core->router_id);
+
+ qd_compose_insert_symbol(body, PV);
+ qd_compose_insert_long(body, PROTOCOL_VERSION);
+
+ qd_compose_insert_symbol(body, AREA);
+ qd_compose_insert_string(body, msync->core->router_area);
+
+ qd_compose_insert_symbol(body, MOBILE_SEQ);
+ qd_compose_insert_long(body, msync->mobile_seq);
+
+ qd_compose_insert_symbol(body, HINTS);
+ qcm_mobile_sync_compose_diff_hint_list(msync, body);
+
+ qd_compose_insert_symbol(body, ADD);
+ qcm_mobile_sync_compose_diff_addr_list(msync, body, true);
+
+ qd_compose_insert_symbol(body, DEL);
+ qcm_mobile_sync_compose_diff_addr_list(msync, body, false);
+
+ qd_compose_end_map(body);
+
+ qd_message_compose_3(msg, headers, body);
+ qd_compose_free(headers);
+ qd_compose_free(body);
+ return msg;
+}
+
+
+static qd_message_t *qcm_mobile_sync_compose_absolute_mau(qdrm_mobile_sync_t *msync, const char *address)
+{
+ qd_message_t *msg = qd_message();
+ qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU);
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+ qd_compose_start_map(body);
+ qd_compose_insert_symbol(body, ID);
+ qd_compose_insert_string(body, msync->core->router_id);
+
+ qd_compose_insert_symbol(body, PV);
+ qd_compose_insert_long(body, PROTOCOL_VERSION);
+
+ qd_compose_insert_symbol(body, AREA);
+ qd_compose_insert_string(body, msync->core->router_area);
+
+ qd_compose_insert_symbol(body, MOBILE_SEQ);
+ qd_compose_insert_long(body, msync->mobile_seq);
+
+ qd_compose_insert_symbol(body, EXIST);
+ qd_compose_start_list(body);
+ qdr_address_t *addr = DEQ_HEAD(msync->core->addrs);
+ while (!!addr) {
+ //
+ // For an address to be included in the list, it must:
+ // - be a mobile address
+ // - have at least one local consumer, link-route destination, or exchange
+ // _OR_ be in the delete list (because the peers haven't heard of its pending deletion)
+ // - not be in the add list (because the peers haven't heard of its pending addition)
+ //
+ // Note that in the two add/del list cases, we are reporting information that is not currently
+ // accurate. In these cases, a differentiao MAU will be sent very shortly that will put the
+ // peer router in the correct state.
+ //
+ if (qcm_mobile_sync_addr_is_mobile(addr)
+ && ((DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->conns) > 0 || !!addr->exchange)
+ || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST))
+ && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) {
+ const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ qd_compose_insert_string(body, hash_key);
+ }
+ addr = DEQ_NEXT(addr);
+ }
+ qd_compose_end_list(body);
+
+ qd_compose_insert_symbol(body, HINTS);
+ qd_compose_start_list(body);
+ addr = DEQ_HEAD(msync->core->addrs);
+ while (!!addr) {
+ //
+ // This loop uses the same logic as above.
+ //
+ if (qcm_mobile_sync_addr_is_mobile(addr)
+ && ((DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->conns) > 0 || !!addr->exchange)
+ || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST))
+ && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST))
+ qd_compose_insert_int(body, addr->treatment);
+ addr = DEQ_NEXT(addr);
+ }
+ qd_compose_end_list(body);
+ qd_compose_end_map(body);
+ qd_message_compose_3(msg, headers, body);
+ qd_compose_free(headers);
+ qd_compose_free(body);
+ return msg;
+}
+
+
+static qd_message_t *qcm_mobile_sync_compose_mar(qdrm_mobile_sync_t *msync, qdr_node_t *router)
+{
+ qd_message_t *msg = qd_message();
+ qd_composed_field_t *headers = qcm_mobile_sync_message_headers(router->wire_address_ma, MAR);
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+ qd_compose_start_map(body);
+ qd_compose_insert_symbol(body, ID);
+ qd_compose_insert_string(body, msync->core->router_id);
+
+ qd_compose_insert_symbol(body, PV);
+ qd_compose_insert_long(body, PROTOCOL_VERSION);
+
+ qd_compose_insert_symbol(body, AREA);
+ qd_compose_insert_string(body, msync->core->router_area);
+
+ qd_compose_insert_symbol(body, HAVE_SEQ);
+ qd_compose_insert_long(body, router->mobile_seq);
+
+ qd_compose_end_map(body);
+
+ qd_message_compose_3(msg, headers, body);
+ qd_compose_free(headers);
+ qd_compose_free(body);
+ return msg;
+}
+
+
+//================================================================================
+// Timer Handler
+//================================================================================
+
+static void qcm_mobile_sync_on_timer_CT(qdr_core_t *core, void *context)
+{
+ qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context;
+
+ //
+ // Re-schedule the timer for the next go-around
+ //
+ qdr_core_timer_schedule_CT(core, msync->timer, 0);
+
+ //
+ // Check the add and delete lists. If they are empty, nothing of note occured in the last
+ // interval. Exit the handler function.
+ //
+ size_t added_count = DEQ_SIZE(msync->added_addrs);
+ size_t deleted_count = DEQ_SIZE(msync->deleted_addrs);
+
+ if (added_count == 0 && deleted_count == 0)
+ return;
+
+ //
+ // Bump the mobile sequence number.
+ //
+ msync->mobile_seq++;
+
+ //
+ // Prepare a differential MAU for sending to all the other routers.
+ //
+ qd_message_t *mau = qcm_mobile_sync_compose_differential_mau(msync, "_topo/0/all/qdrouter.ma");
+
+ //
+ // Multicast the control message. Set the exclude_inprocess and control flags.
+ // Use the TOPOLOGICAL class address for sending.
+ //
+ int fanout = qdr_forward_message_CT(core, core->routerma_addr_T, mau, 0, true, true);
+ qd_message_free(mau);
+
+ //
+ // Post the updated mobile sequence number to the Python router. It is important that this be
+ // done _after_ sending the differential MAU to prevent a storm of un-needed MAR requests from
+ // the other routers.
+ //
+ qdr_post_set_my_mobile_seq_CT(core, msync->mobile_seq);
+
+ //
+ // Trace log the activity of this sequence update.
+ //
+ qd_log(msync->log, QD_LOG_DEBUG, "New mobile sequence: mobile_seq=%"PRIu64", addrs_added=%ld, addrs_deleted=%ld, fanout=%d",
+ msync->mobile_seq, added_count, deleted_count, fanout);
+}
+
+
+//================================================================================
+// Message Handler
+//================================================================================
+
+static void qcm_mobile_sync_on_mar_CT(qdrm_mobile_sync_t *msync, qd_parsed_field_t *body)
+{
+ if (!!body && qd_parse_is_map(body)) {
+ qd_parsed_field_t *id_field = qd_parse_value_by_key(body, ID);
+ qd_parsed_field_t *have_seq_field = qd_parse_value_by_key(body, HAVE_SEQ);
+ uint64_t have_seq = qd_parse_as_ulong(have_seq_field);
+
+ qdr_node_t *router = qdc_mobile_sync_router_by_id(msync, id_field);
+ if (!!router) {
+ qd_log(msync->log, QD_LOG_DEBUG, "Received MAR from %s, have_seq=%"PRIu64,
+ (const char*) qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1, have_seq);
+
+ if (have_seq < msync->mobile_seq) {
+ //
+ // The requestor's view of our mobile_seq is less than our actual mobile_sync.
+ // Send them an absolute MAU to get them caught up to the present.
+ //
+ qd_message_t *mau = qcm_mobile_sync_compose_absolute_mau(msync, router->wire_address_ma);
+ (void) qdr_forward_message_CT(msync->core, router->owning_addr, mau, 0, true, true);
+ qd_message_free(mau);
+
+ //
+ // Trace log the activity of this sequence update.
+ //
+ qd_log(msync->log, QD_LOG_DEBUG, "Sent MAU to requestor: mobile_seq=%"PRIu64, msync->mobile_seq);
+ }
+ } else
+ qd_log(msync->log, QD_LOG_ERROR, "Received MAR from an unknown router");
+ }
+}
+
+
+static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field_t *body)
+{
+ if (!!body && qd_parse_is_map(body)) {
+ qd_parsed_field_t *id_field = qd_parse_value_by_key(body, ID);
+ qd_parsed_field_t *mobile_seq_field = qd_parse_value_by_key(body, MOBILE_SEQ);
+ uint64_t mobile_seq = qd_parse_as_ulong(mobile_seq_field);
+
+ qdr_node_t *router = qdc_mobile_sync_router_by_id(msync, id_field);
+ if (!!router) {
+ const char *router_id = (const char*) qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1;
+ qd_parsed_field_t *add_field = qd_parse_value_by_key(body, ADD);
+ qd_parsed_field_t *del_field = qd_parse_value_by_key(body, DEL);
+ qd_parsed_field_t *exist_field = qd_parse_value_by_key(body, EXIST);
+ qd_parsed_field_t *hints_field = qd_parse_value_by_key(body, HINTS);
+ uint32_t hints_count = 0;
+ qdr_address_t *addr;
+
+ //
+ // Validate the fields and determine what kind of MAU we've received.
+ //
+ if (!!hints_field && qd_parse_is_list(hints_field))
+ hints_count = qd_parse_sub_count(hints_field);
+
+ //
+ // Validate the exist, add, and del fields. They must, if they exist, be lists.
+ // If there is an exist field, there must not be an add or del field.
+ // If there is no exist field, there must be both an add and a del field.
+ //
+ if ((!!exist_field && !qd_parse_is_list(exist_field))
+ || (!!add_field && !qd_parse_is_list(add_field))
+ || (!!del_field && !qd_parse_is_list(del_field))
+ || (!!exist_field && (!!add_field || !!del_field))
+ || (!exist_field && (!add_field || !del_field))) {
+ qd_log(msync->log, QD_LOG_ERROR, "Received malformed MAU from %s", router_id);
+ return;
+ }
+
+ //
+ // Record the new mobile sequence for the remote router.
+ //
+ router->mobile_seq = mobile_seq;
+
+ //
+ // Check the exist/add list size against the hints-list size. If they are not
+ // exactly equal, ignore the hints.
+ //
+ if (!!exist_field) {
+ if (hints_count != qd_parse_sub_count(exist_field))
+ hints_count = 0;
+ } else {
+ if (hints_count != qd_parse_sub_count(add_field))
+ hints_count = 0;
+ }
+
+ qd_log(msync->log, QD_LOG_DEBUG, "Received MAU (%s) from %s, mobile_seq=%"PRIu64,
+ !!exist_field ? "absolute" : "differential", router_id, mobile_seq);
+
+ //
+ // If this is an absolute MAU, the existing set of addresses for this router must
+ // be marked as needing deletion, in case they are not mentioned in the existing
+ // address list.
+ //
+ if (!!exist_field) {
+ addr = DEQ_HEAD(msync->core->addrs);
+ while (!!addr) {
+ if (qcm_mobile_sync_addr_is_mobile(addr) && !!qd_bitmask_value(addr->rnodes, router->mask_bit))
+ BIT_SET(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED);
+ addr = DEQ_NEXT(addr);
+ }
+ }
+
+ //
+ // Run through the add/exist list (depending on which we have) and lookup/add the
+ // addresses, associating them with the sending router. Clear the to-delete bits
+ // on every address touched. If hints are available, use them for addresses that
+ // are newly created.
+ //
+ qd_parsed_field_t *field = !!exist_field ? exist_field : add_field;
+ qd_parsed_field_t *addr_field = qd_field_first_child(field);
+ qd_parsed_field_t *hint_field = !!hints_count ? qd_field_first_child(hints_field) : 0;
+ while (addr_field) {
+ qd_iterator_t *iter = qd_parse_raw(addr_field);
+ qdr_address_t *addr = 0;
+ int treatment_hint = !!hint_field ? qd_parse_as_int(hint_field) : -1;
+
+ do {
+ qd_hash_retrieve(msync->core->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ qdr_address_config_t *addr_config;
+ qd_address_treatment_t treatment =
+ qdr_treatment_for_address_hash_with_default_CT(msync->core,
+ iter,
+ qcm_mobile_sync_default_treatment(msync->core, treatment_hint),
+ &addr_config);
+ addr = qdr_address_CT(msync->core, treatment, addr_config);
+ if (!addr) {
+ qd_log(msync->log, QD_LOG_CRITICAL, "map_destination: ignored");
+ assert(false);
+ break;
+ }
+ qd_hash_insert(msync->core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(msync->core->addrs, addr);
+
+ //
+ // if the address is a link route, add the pattern to the wildcard
+ // address parse tree
+ //
+ const char *a_str = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ if (QDR_IS_LINK_ROUTE(a_str[0])) {
+ qdr_link_route_map_pattern_CT(msync->core, iter, addr);
+ }
+ }
+
+ BIT_CLEAR(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED);
+ if (!qd_bitmask_value(addr->rnodes, router->mask_bit)) {
+ qd_bitmask_set_bit(addr->rnodes, router->mask_bit);
+ router->ref_count++;
+ addr->cost_epoch--;
+ qdr_addr_start_inlinks_CT(msync->core, addr);
+
+ //
+ // Raise an address event if this is the first destination for the address
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 1)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_BECAME_DEST, addr);
+ else if (qd_bitmask_cardinality(addr->rnodes) == 1 && DEQ_SIZE(addr->rlinks) == 1)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_TWO_DEST, addr);
+ }
+ } while (false);
+
+ addr_field = qd_field_next_child(addr_field);
+ hint_field = !!hint_field ? qd_field_next_child(hint_field) : 0;
+ }
+
+ //
+ // Run through the delete list, if it exists, and disassociate each address from the
+ // sending router. Check the address to see if it needs to be deleted.
+ //
+ if (!!del_field) {
+ addr_field = qd_field_first_child(del_field);
+ while (!!addr_field) {
+ qd_iterator_t *iter = qd_parse_raw(addr_field);
+ qdr_address_t *addr = 0;
+
+ qd_hash_retrieve(msync->core->addr_hash, iter, (void**) &addr);
+ if (!!addr) {
+ if (qd_bitmask_value(addr->rnodes, router->mask_bit)) {
+ qd_bitmask_clear_bit(addr->rnodes, router->mask_bit);
+ router->ref_count--;
+ addr->cost_epoch--;
+
+ //
+ // Raise an address event if this was the last destination for the address
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr);
+ else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+
+ qdr_check_addr_CT(msync->core, addr);
+ }
+ }
+ addr_field = qd_field_next_child(addr_field);
+ }
+ }
+
+ //
+ // If this was an absolute MAU, disassociate any addresses remaining with the
+ // to-delete flag set.
+ //
+ if (!!exist_field) {
+ addr = DEQ_HEAD(msync->core->addrs);
+ while (!!addr) {
+ qdr_address_t *next_addr = DEQ_NEXT(addr);
+ if (qcm_mobile_sync_addr_is_mobile(addr)
+ && !!qd_bitmask_value(addr->rnodes, router->mask_bit)
+ && BIT_IS_SET(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED)) {
+ qd_bitmask_clear_bit(addr->rnodes, router->mask_bit);
+ router->ref_count--;
+ addr->cost_epoch--;
+
+ //
+ // Raise an address event if this was the last destination for the address
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr);
+ else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+
+ qdr_check_addr_CT(msync->core, addr);
+ }
+ addr = next_addr;
+ }
+ }
+
+ //
+ // Tell the python router about the new mobile sequence
+ //
+ qdr_post_set_mobile_seq_CT(msync->core, router->mask_bit, mobile_seq);
+ } else
+ qd_log(msync->log, QD_LOG_ERROR, "Received MAU from an unknown router");
+ }
+}
+
+
+static void qcm_mobile_sync_on_message_CT(void *context,
+ qd_message_t *msg,
+ int unused_link_maskbit,
+ int unused_inter_router_cost,
+ uint64_t unused_conn_id)
+{
+ qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context;
+ qd_iterator_t *ap_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+ qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
+ qd_parsed_field_t *ap_field = qd_parse(ap_iter);
+ qd_parsed_field_t *body_field = qd_parse(body_iter);
+
+ if (!!ap_field && qd_parse_is_map(ap_field)) {
+ qd_parsed_field_t *opcode_field = qd_parse_value_by_key(ap_field, OPCODE);
+
+ if (qd_iterator_equal(qd_parse_raw(opcode_field), (const unsigned char*) MAR))
+ qcm_mobile_sync_on_mar_CT(msync, body_field);
+
+ if (qd_iterator_equal(qd_parse_raw(opcode_field), (const unsigned char*) MAU))
+ qcm_mobile_sync_on_mau_CT(msync, body_field);
+ }
+
+ qd_parse_free(ap_field);
+ qd_iterator_free(ap_iter);
+ qd_parse_free(body_field);
+ qd_iterator_free(body_iter);
+}
+
+
+//================================================================================
+// Event Handlers
+//================================================================================
+
+static void qcm_mobile_sync_on_became_local_dest_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr)
+{
+ if (!qcm_mobile_sync_addr_is_mobile(addr))
+ return;
+
+ qd_log(msync->log, QD_LOG_DEBUG, "Became Local Dest: %s", (const char*) qd_hash_key_by_handle(addr->hash_handle));
+
+ if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST))
+ return;
+
+ if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST)) {
+ //
+ // If the address was deleted since the last update, simply forget that it was deleted.
+ //
+ DEQ_REMOVE_N(SYNC_DEL, msync->deleted_addrs, addr);
+ BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST);
+ } else {
+ DEQ_INSERT_TAIL_N(SYNC_ADD, msync->added_addrs, addr);
+ BIT_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST);
+ qcm_mobile_sync_start_tracking(addr);
+ }
+}
+
+
+static void qcm_mobile_sync_on_no_longer_local_dest_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr)
+{
+ if (!qcm_mobile_sync_addr_is_mobile(addr))
+ return;
+
+ qd_log(msync->log, QD_LOG_DEBUG, "No Longer Local Dest: %s", (const char*) qd_hash_key_by_handle(addr->hash_handle));
+
+ if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST))
+ return;
+
+ if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) {
+ //
+ // If the address was added since the last update, simply forget that it was added.
+ //
+ DEQ_REMOVE_N(SYNC_ADD, msync->added_addrs, addr);
+ BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST);
+ qcm_mobile_sync_stop_tracking(msync->core, addr);
+ } else {
+ DEQ_INSERT_TAIL_N(SYNC_DEL, msync->deleted_addrs, addr);
+ BIT_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST);
+ }
+}
+
+
+static void qcm_mobile_sync_on_router_flush_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router)
+{
+ router->mobile_seq = 0;
+ qdr_address_t *addr = DEQ_HEAD(msync->core->addrs);
+ while (!!addr) {
+ qdr_address_t *next_addr = DEQ_NEXT(addr);
+ if (qcm_mobile_sync_addr_is_mobile(addr)
+ && !!qd_bitmask_value(addr->rnodes, router->mask_bit)) {
+ //
+ // This is an address mapped to the router. Unmap the address and clean up.
+ //
+ qd_bitmask_clear_bit(addr->rnodes, router->mask_bit);
+ router->ref_count--;
+ addr->cost_epoch--;
+
+ //
+ // Raise an address event if this was the last destination for the address
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr);
+ else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
+ qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+
+ qdr_check_addr_CT(msync->core, addr);
+ }
+ addr = next_addr;
+ }
+}
+
+
+static void qcm_mobile_sync_on_router_advanced_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router)
+{
+ //
+ // Prepare a MAR to be sent to the router
+ //
+ qd_message_t *mar = qcm_mobile_sync_compose_mar(msync, router);
+
+ //
+ // Send the control message. Set the exclude_inprocess and control flags.
+ //
+ int fanout = qdr_forward_message_CT(msync->core, router->owning_addr, mar, 0, true, true);
+ qd_message_free(mar);
+
+ //
+ // Trace log the activity of this sequence update.
+ //
+ qd_log(msync->log, QD_LOG_DEBUG, "Send MAR request to router %s, have_seq=%"PRIu64", fanout=%d",
+ (const char*) qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1, router->mobile_seq, fanout);
+}
+
+
+static void qcm_mobile_sync_on_addr_event_CT(void *context,
+ qdrc_event_t event_type,
+ qdr_address_t *addr)
+{
+ qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context;
+
+ switch (event_type) {
+ case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST:
+ qcm_mobile_sync_on_became_local_dest_CT(msync, addr);
+ break;
+
+ case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST:
+ qcm_mobile_sync_on_no_longer_local_dest_CT(msync, addr);
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+static void qcm_mobile_sync_on_router_event_CT(void *context,
+ qdrc_event_t event_type,
+ qdr_node_t *router)
+{
+ qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context;
+
+ switch (event_type) {
+ case QDRC_EVENT_ROUTER_MOBILE_FLUSH:
+ qcm_mobile_sync_on_router_flush_CT(msync, router);
+ break;
+
+ case QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED:
+ qcm_mobile_sync_on_router_advanced_CT(msync, router);
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+//================================================================================
+// Module Handlers
+//================================================================================
+
+static bool qcm_mobile_sync_enable_CT(qdr_core_t *core)
+{
+ return core->router_mode == QD_ROUTER_MODE_INTERIOR;
+}
+
+
+static void qcm_mobile_sync_init_CT(qdr_core_t *core, void **module_context)
+{
+ qdrm_mobile_sync_t *msync = NEW(qdrm_mobile_sync_t);
+ ZERO(msync);
+ msync->core = core;
+
+ //
+ // Subscribe to core events:
+ //
+ // - ADDR_BECAME_LOCAL_DEST - Indicates a new address needs to tbe sync'ed with other routers
+ // - ADDR_NO_LONGER_LOCAL_DEST - Indicates an address needs to be un-sync'd with other routers
+ // - ROUTER_MOBILE_FLUSH - All addresses associated with the router must be unmapped
+ // - ROUTER_MOBILE_SEQ_ADVANCED - A router has an advanced mobile-seq and needs to be queried
+ //
+ msync->event_sub = qdrc_event_subscribe_CT(core,
+ QDRC_EVENT_ADDR_BECAME_LOCAL_DEST
+ | QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST
+ | QDRC_EVENT_ROUTER_MOBILE_FLUSH
+ | QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED,
+ 0,
+ 0,
+ qcm_mobile_sync_on_addr_event_CT,
+ qcm_mobile_sync_on_router_event_CT,
+ msync);
+
+ //
+ // Create and schedule a one-second recurring timer to drive the sync protocol
+ //
+ msync->timer = qdr_core_timer_CT(core, qcm_mobile_sync_on_timer_CT, msync);
+ qdr_core_timer_schedule_CT(core, msync->timer, 0);
+
+ //
+ // Subscribe to receive messages sent to the 'qdrouter.ma' addresses
+ //
+ msync->message_sub1 = qdr_core_subscribe(core, "qdrouter.ma", 'L', '0',
+ QD_TREATMENT_MULTICAST_ONCE, true, qcm_mobile_sync_on_message_CT, msync);
+ msync->message_sub2 = qdr_core_subscribe(core, "qdrouter.ma", 'T', '0',
+ QD_TREATMENT_MULTICAST_ONCE, true, qcm_mobile_sync_on_message_CT, msync);
+
+ //
+ // Create a log source for mobile address sync
+ //
+ msync->log = qd_log_source("ROUTER_MA");
+
+ *module_context = msync;
+}
+
+
+static void qcm_mobile_sync_final_CT(void *module_context)
+{
+ qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) module_context;
+
+ qdrc_event_unsubscribe_CT(msync->core, msync->event_sub);
+ qdr_core_timer_free_CT(msync->core, msync->timer);
+
+ //
+ // Don't explicitly unsubscribe the addresses, these are already gone at module-final time.
+ //
+
+ free(msync);
+}
+
+
+QDR_CORE_MODULE_DECLARE("mobile_sync", qcm_mobile_sync_enable_CT, qcm_mobile_sync_init_CT, qcm_mobile_sync_final_CT)
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index 86f4095..263b9d9 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -649,7 +649,7 @@
tc->conn_events = qdrc_event_subscribe_CT(test_module->core,
(QDRC_EVENT_CONN_OPENED | QDRC_EVENT_CONN_CLOSED),
_on_conn_event,
- NULL, NULL, tc);
+ 0, 0, 0, tc);
qd_log(test_module->core->log, QD_LOG_TRACE, "client test registered %p", tc->conn_events);
}
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 3466732..d6fdb15 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -21,18 +21,18 @@
#include "route_control.h"
#include <stdio.h>
-static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_set_cost_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_cost_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_flush_destinations_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_mobile_seq_advanced_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
//==================================================================================
@@ -108,34 +108,32 @@
}
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash, int treatment_hint)
+void qdr_core_flush_destinations(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdr_map_destination_CT, "map_destination");
+ qdr_action_t *action = qdr_action(qdr_flush_destinations_CT, "flush_destinations");
action->args.route_table.router_maskbit = router_maskbit;
- action->args.route_table.address = qdr_field(address_hash);
- action->args.route_table.treatment_hint = treatment_hint;
qdr_action_enqueue(core, action);
}
-void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash)
+void qdr_core_mobile_seq_advanced(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdr_unmap_destination_CT, "unmap_destination");
+ qdr_action_t *action = qdr_action(qdr_mobile_seq_advanced_CT, "mobile_seq_advanced");
action->args.route_table.router_maskbit = router_maskbit;
- action->args.route_table.address = qdr_field(address_hash);
qdr_action_enqueue(core, action);
}
-void qdr_core_route_table_handlers(qdr_core_t *core,
- void *context,
- qdr_mobile_added_t mobile_added,
- qdr_mobile_removed_t mobile_removed,
- qdr_link_lost_t link_lost)
+
+void qdr_core_route_table_handlers(qdr_core_t *core,
+ void *context,
+ qdr_set_mobile_seq_t set_mobile_seq,
+ qdr_set_my_mobile_seq_t set_my_mobile_seq,
+ qdr_link_lost_t link_lost)
{
- core->rt_context = context;
- core->rt_mobile_added = mobile_added;
- core->rt_mobile_removed = mobile_removed;
- core->rt_link_lost = link_lost;
+ core->rt_context = context;
+ core->rt_set_mobile_seq = set_mobile_seq;
+ core->rt_set_my_mobile_seq = set_my_mobile_seq;
+ core->rt_link_lost = link_lost;
}
@@ -144,6 +142,7 @@
char aclass,
char phase,
qd_address_treatment_t treatment,
+ bool in_core,
qdr_receive_t on_message,
void *context)
{
@@ -152,6 +151,7 @@
sub->addr = 0;
sub->on_message = on_message;
sub->on_message_context = context;
+ sub->in_core = in_core;
qdr_action_t *action = qdr_action(qdr_subscribe_CT, "subscribe");
action->args.io.address = qdr_field(address);
@@ -298,23 +298,26 @@
}
//
- // Set the block-deletion flag on this address for the time that it is associated
- // with an existing remote router node.
+ // Bump the address's ref_count for the time that it is associated with an existing remote router node.
//
- addr->block_deletion = true;
+ addr->ref_count++;
//
// Create a router-node record to represent the remote router.
//
qdr_node_t *rnode = new_qdr_node_t();
- DEQ_ITEM_INIT(rnode);
+ ZERO(rnode);
rnode->owning_addr = addr;
rnode->mask_bit = router_maskbit;
- rnode->next_hop = 0;
rnode->link_mask_bit = -1;
- rnode->ref_count = 0;
rnode->valid_origins = qd_bitmask(0);
- rnode->cost = 0;
+
+ qd_iterator_reset_view(iter, ITER_VIEW_ALL);
+ int addr_len = qd_iterator_length(iter);
+
+ rnode->wire_address_ma = (char*) malloc(addr_len + 4);
+ qd_iterator_ncopy(iter, (unsigned char*) rnode->wire_address_ma, addr_len);
+ strcpy(rnode->wire_address_ma + addr_len, ".ma");
//
// Insert at the head of the list because we don't yet know the cost to this
@@ -400,7 +403,7 @@
//
// Check the address and free it if there are no other interested parties tracking it
//
- oaddr->block_deletion = false;
+ oaddr->ref_count--;
qdr_check_addr_CT(core, oaddr);
}
@@ -558,141 +561,58 @@
qd_bitmask_free(valid_origins);
}
-static qd_address_treatment_t default_treatment(qdr_core_t *core, int hint) {
- switch (hint) {
- case QD_TREATMENT_MULTICAST_FLOOD:
- return QD_TREATMENT_MULTICAST_FLOOD;
- case QD_TREATMENT_MULTICAST_ONCE:
- return QD_TREATMENT_MULTICAST_ONCE;
- case QD_TREATMENT_ANYCAST_CLOSEST:
- return QD_TREATMENT_ANYCAST_CLOSEST;
- case QD_TREATMENT_ANYCAST_BALANCED:
- return QD_TREATMENT_ANYCAST_BALANCED;
- case QD_TREATMENT_LINK_BALANCED:
- return QD_TREATMENT_LINK_BALANCED;
- case QD_TREATMENT_UNAVAILABLE:
- return QD_TREATMENT_UNAVAILABLE;
- default:
- return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment;
- }
-}
-static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_flush_destinations_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- int router_maskbit = action->args.route_table.router_maskbit;
- qdr_field_t *address = action->args.route_table.address;
- int treatment_hint = action->args.route_table.treatment_hint;
-
- if (discard) {
- qdr_field_free(address);
+ if (!!discard)
return;
- }
+
+ int router_maskbit = action->args.route_table.router_maskbit;
do {
if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router maskbit out of range: %d", router_maskbit);
+ qd_log(core->log, QD_LOG_CRITICAL, "flush_destinations: Router maskbit out of range: %d", router_maskbit);
break;
}
- if (core->routers_by_mask_bit[router_maskbit] == 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router not found");
- break;
- }
-
- qd_iterator_t *iter = address->iterator;
- qdr_address_t *addr = 0;
-
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
- if (!addr) {
- qdr_address_config_t *addr_config;
- qd_address_treatment_t treatment = qdr_treatment_for_address_hash_with_default_CT(core,
- iter,
- default_treatment(core, treatment_hint),
- &addr_config);
- addr = qdr_address_CT(core, treatment, addr_config);
- if (!addr) {
- qd_log(core->log, QD_LOG_CRITICAL, "map_destination: ignored");
- break;
- }
- qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_ITEM_INIT(addr);
- DEQ_INSERT_TAIL(core->addrs, addr);
- // if the address is a link route, add the pattern to the wildcard
- // address parse tree
- {
- const char *a_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
- if (QDR_IS_LINK_ROUTE(a_str[0])) {
- qdr_link_route_map_pattern_CT(core, iter, addr);
- }
- }
- }
-
qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
- qd_bitmask_set_bit(addr->rnodes, router_maskbit);
- rnode->ref_count++;
- addr->cost_epoch--;
- qdr_addr_start_inlinks_CT(core, addr);
+ if (rnode == 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "flush_destinations: Router not found");
+ break;
+ }
//
- // Raise an address event if this is the first destination for the address
+ // Raise the event to be picked up by core modules.
//
- if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 1)
- qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_DEST, addr);
- else if (qd_bitmask_cardinality(addr->rnodes) == 1 && DEQ_SIZE(addr->rlinks) == 1)
- qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_DEST, addr);
+ qdrc_event_router_raise(core, QDRC_EVENT_ROUTER_MOBILE_FLUSH, rnode);
} while (false);
-
- qdr_field_free(address);
}
-static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_mobile_seq_advanced_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- int router_maskbit = action->args.route_table.router_maskbit;
- qdr_field_t *address = action->args.route_table.address;
-
- if (discard) {
- qdr_field_free(address);
+ if (!!discard)
return;
- }
+
+ int router_maskbit = action->args.route_table.router_maskbit;
do {
if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router maskbit out of range: %d", router_maskbit);
+ qd_log(core->log, QD_LOG_CRITICAL, "seq_advanced: Router maskbit out of range: %d", router_maskbit);
break;
}
- if (core->routers_by_mask_bit[router_maskbit] == 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router not found");
+ qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+ if (rnode == 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "seq_advanced: Router not found");
break;
}
- qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
- qd_iterator_t *iter = address->iterator;
- qdr_address_t *addr = 0;
-
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
- if (!addr) {
- qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Address not found");
- break;
- }
-
- qd_bitmask_clear_bit(addr->rnodes, router_maskbit);
- rnode->ref_count--;
- addr->cost_epoch--;
-
//
- // Raise an address event if this was the last destination for the address
+ // Raise the event to be picked up by core modules.
//
- if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0)
- qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr);
- else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
- qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
-
- qdr_check_addr_CT(core, addr);
+ qdrc_event_router_raise(core, QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED, rnode);
} while (false);
-
- qdr_field_free(address);
}
@@ -754,27 +674,15 @@
// Call-back Functions
//==================================================================================
-static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work)
+static void qdr_do_set_mobile_seq(qdr_core_t *core, qdr_general_work_t *work)
{
- char *address_hash = qdr_field_copy(work->field);
- if (address_hash) {
- core->rt_mobile_added(core->rt_context, address_hash, work->treatment);
- free(address_hash);
- }
-
- qdr_field_free(work->field);
+ core->rt_set_mobile_seq(core->rt_context, work->maskbit, work->mobile_seq);
}
-static void qdr_do_mobile_removed(qdr_core_t *core, qdr_general_work_t *work)
+static void qdr_do_set_my_mobile_seq(qdr_core_t *core, qdr_general_work_t *work)
{
- char *address_hash = qdr_field_copy(work->field);
- if (address_hash) {
- core->rt_mobile_removed(core->rt_context, address_hash);
- free(address_hash);
- }
-
- qdr_field_free(work->field);
+ core->rt_set_my_mobile_seq(core->rt_context, work->mobile_seq);
}
@@ -784,19 +692,19 @@
}
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment)
+void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq)
{
- qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added);
- work->field = qdr_field(address_hash);
- work->treatment = treatment;
+ qdr_general_work_t *work = qdr_general_work(qdr_do_set_mobile_seq);
+ work->mobile_seq = mobile_seq;
+ work->maskbit = router_maskbit;
qdr_post_general_work_CT(core, work);
}
-void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash)
+void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq)
{
- qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_removed);
- work->field = qdr_field(address_hash);
+ qdr_general_work_t *work = qdr_general_work(qdr_do_set_my_mobile_seq);
+ work->mobile_seq = mobile_seq;
qdr_post_general_work_CT(core, work);
}
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 3ecea9c..bdbfb6e 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -37,6 +37,7 @@
ALLOC_DEFINE(qdr_link_work_t);
ALLOC_DEFINE(qdr_connection_ref_t);
ALLOC_DEFINE(qdr_connection_info_t);
+ALLOC_DEFINE(qdr_subscription_ref_t);
static void qdr_general_handler(void *context);
@@ -86,10 +87,10 @@
// Perform outside-of-thread setup for the management agent
//
core->agent_subscription_mobile = qdr_core_subscribe(core, "$management", 'M', '0',
- QD_TREATMENT_ANYCAST_CLOSEST,
+ QD_TREATMENT_ANYCAST_CLOSEST, false,
qdr_management_agent_on_message, core);
core->agent_subscription_local = qdr_core_subscribe(core, "$management", 'L', '0',
- QD_TREATMENT_ANYCAST_CLOSEST,
+ QD_TREATMENT_ANYCAST_CLOSEST, false,
qdr_management_agent_on_message, core);
return core;
@@ -220,6 +221,7 @@
DEQ_REMOVE(core->routers, rnode);
core->routers_by_mask_bit[rnode->mask_bit] = 0;
core->cost_epoch++;
+ free(rnode->wire_address_ma);
free_qdr_node_t(rnode);
}
@@ -376,7 +378,7 @@
if (addr) {
qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(core->addrs, addr);
- addr->block_deletion = true;
+ addr->ref_count++;
addr->local = (aclass == 'L');
}
}
@@ -553,8 +555,6 @@
if (link->link_direction == QD_OUTGOING) {
qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
if (DEQ_SIZE(addr->rlinks) == 1) {
- if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key == QD_ITER_HASH_PREFIX_MOBILE))
- qdr_post_mobile_added_CT(core, key, addr->treatment);
qdr_addr_start_inlinks_CT(core, addr);
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr);
} else if (DEQ_SIZE(addr->rlinks) == 2 && qd_bitmask_cardinality(addr->rnodes) == 0)
@@ -581,9 +581,6 @@
if (link->link_direction == QD_OUTGOING) {
qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
if (DEQ_SIZE(addr->rlinks) == 0) {
- const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
- if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY))
- qdr_post_mobile_removed_CT(core, key);
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr);
} else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0)
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
@@ -608,8 +605,6 @@
{
qdr_add_connection_ref(&addr->conns, conn);
if (DEQ_SIZE(addr->conns) == 1) {
- const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
- qdr_post_mobile_added_CT(core, key, addr->treatment);
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr);
}
}
@@ -619,8 +614,6 @@
{
qdr_del_connection_ref(&addr->conns, conn);
if (DEQ_IS_EMPTY(addr->conns)) {
- const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
- qdr_post_mobile_removed_CT(core, key);
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr);
}
}
@@ -691,6 +684,7 @@
qd_iterator_free(pattern);
}
+
void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
{
if (link->ref[cls] != 0)
@@ -763,6 +757,22 @@
}
+void qdr_add_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_t *sub)
+{
+ qdr_subscription_ref_t *ref = new_qdr_subscription_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->sub = sub;
+ DEQ_INSERT_TAIL(*list, ref);
+}
+
+
+void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_ref_t *ref)
+{
+ DEQ_REMOVE(*list, ref);
+ free_qdr_subscription_ref_t(ref);
+}
+
+
static void qdr_general_handler(void *context)
{
qdr_core_t *core = (qdr_core_t*) context;
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index cf6d044..614b8d5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -231,17 +231,16 @@
struct qdr_general_work_t {
DEQ_LINKS(qdr_general_work_t);
qdr_general_work_handler_t handler;
- qdr_field_t *field;
int maskbit;
int inter_router_cost;
qd_message_t *msg;
qdr_receive_t on_message;
void *on_message_context;
uint64_t in_conn_id;
- int treatment;
+ uint64_t mobile_seq;
qdr_delivery_cleanup_list_t delivery_cleanup_list;
- qdr_global_stats_handler_t stats_handler;
- void *context;
+ qdr_global_stats_handler_t stats_handler;
+ void *context;
};
ALLOC_DECLARE(qdr_general_work_t);
@@ -349,6 +348,8 @@
uint32_t ref_count;
qd_bitmask_t *valid_origins;
int cost;
+ uint64_t mobile_seq;
+ char *wire_address_ma; ///< The address of this router's mobile-sync agent in non-hashed form
};
DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
@@ -380,10 +381,22 @@
qdr_address_t *addr;
qdr_receive_t on_message;
void *on_message_context;
+ bool in_core;
};
DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
+typedef struct qdr_subscription_ref_t {
+ DEQ_LINKS(struct qdr_subscription_ref_t);
+ qdr_subscription_t *sub;
+} qdr_subscription_ref_t;
+
+ALLOC_DECLARE(qdr_subscription_ref_t);
+DEQ_DECLARE(qdr_subscription_ref_t, qdr_subscription_ref_list_t);
+
+void qdr_add_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_t *sub);
+void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_ref_t *ref);
+
DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv);
@@ -508,14 +521,20 @@
qdr_link_t *edge_outlink; ///< [ref] Out-link to connected Interior router (on edge router)
qd_address_treatment_t treatment;
qdr_forwarder_t *forwarder;
- int ref_count; ///< Number of link-routes + auto-links referencing this address
- bool block_deletion;
+ int ref_count; ///< Number of entities referencing this address
bool local;
bool router_control_only; ///< If set, address is only for deliveries arriving on a control link
uint32_t tracked_deliveries;
uint64_t cost_epoch;
//
+ // State for mobile-address synchronization
+ //
+ DEQ_LINKS_N(SYNC_ADD, qdr_address_t);
+ DEQ_LINKS_N(SYNC_DEL, qdr_address_t);
+ uint32_t sync_mask;
+
+ //
// State for tracking fallback destinations for undeliverable deliveries
//
qdr_address_t *fallback; ///< Pointer to this address's fallback destination
@@ -793,10 +812,10 @@
//
// Route table section
//
- void *rt_context;
- qdr_mobile_added_t rt_mobile_added;
- qdr_mobile_removed_t rt_mobile_removed;
- qdr_link_lost_t rt_link_lost;
+ void *rt_context;
+ qdr_set_mobile_seq_t rt_set_mobile_seq;
+ qdr_set_my_mobile_seq_t rt_set_my_mobile_seq;
+ qdr_link_lost_t rt_link_lost;
//
// Connection section
@@ -821,6 +840,7 @@
qdrc_event_subscription_list_t conn_event_subscriptions;
qdrc_event_subscription_list_t link_event_subscriptions;
qdrc_event_subscription_list_t addr_event_subscriptions;
+ qdrc_event_subscription_list_t router_event_subscriptions;
qd_router_mode_t router_mode;
const char *router_area;
@@ -918,8 +938,8 @@
void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment);
-void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq);
+void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq);
void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 1d0c8d5..200e297 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -28,12 +28,12 @@
#include "entity_cache.h"
#include "python_private.h"
-static qd_log_source_t *log_source = 0;
-static PyObject *pyRouter = 0;
-static PyObject *pyTick = 0;
-static PyObject *pyAdded = 0;
-static PyObject *pyRemoved = 0;
-static PyObject *pyLinkLost = 0;
+static qd_log_source_t *log_source = 0;
+static PyObject *pyRouter = 0;
+static PyObject *pyTick = 0;
+static PyObject *pySetMobileSeq = 0;
+static PyObject *pySetMyMobileSeq = 0;
+static PyObject *pyLinkLost = 0;
typedef struct {
PyObject_HEAD
@@ -237,15 +237,13 @@
}
-static PyObject* qd_map_destination(PyObject *self, PyObject *args)
+static PyObject* qd_flush_destinations(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
- const char *addr_string;
- int treatment;
int maskbit;
- if (!PyArg_ParseTuple(args, "sii", &addr_string, &treatment, &maskbit))
+ if (!PyArg_ParseTuple(args, "i", &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -253,21 +251,20 @@
return 0;
}
- qdr_core_map_destination(router->router_core, maskbit, addr_string, treatment);
+ qdr_core_flush_destinations(router->router_core, maskbit);
Py_INCREF(Py_None);
return Py_None;
}
-static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
+static PyObject* qd_mobile_seq_advanced(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
- const char *addr_string;
int maskbit;
- if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+ if (!PyArg_ParseTuple(args, "i", &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -275,12 +272,13 @@
return 0;
}
- qdr_core_unmap_destination(router->router_core, maskbit, addr_string);
+ qdr_core_mobile_seq_advanced(router->router_core, maskbit);
Py_INCREF(Py_None);
return Py_None;
}
+
static PyObject* qd_get_agent(PyObject *self, PyObject *args) {
RouterAdapter *adapter = (RouterAdapter*) self;
PyObject *agent = adapter->router->qd->agent;
@@ -292,18 +290,18 @@
}
static PyMethodDef RouterAdapter_methods[] = {
- {"add_router", qd_add_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
- {"del_router", qd_del_router, METH_VARARGS, "We've lost reachability to a remote router"},
- {"set_link", qd_set_link, METH_VARARGS, "Set the link for a neighbor router"},
- {"remove_link", qd_remove_link, METH_VARARGS, "Remove the link for a neighbor router"},
- {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
- {"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"},
- {"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"},
- {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"},
- {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"},
- {"map_destination", qd_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
- {"unmap_destination", qd_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
- {"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"},
+ {"add_router", qd_add_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
+ {"del_router", qd_del_router, METH_VARARGS, "We've lost reachability to a remote router"},
+ {"set_link", qd_set_link, METH_VARARGS, "Set the link for a neighbor router"},
+ {"remove_link", qd_remove_link, METH_VARARGS, "Remove the link for a neighbor router"},
+ {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
+ {"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"},
+ {"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"},
+ {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"},
+ {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"},
+ {"flush_destinations", qd_flush_destinations, METH_VARARGS, "Remove all mapped destinations from a router"},
+ {"mobile_seq_advanced", qd_mobile_seq_advanced, METH_VARARGS, "Mobile sequence for a router moved ahead of the local value"},
+ {"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"},
{0, 0, 0, 0}
};
@@ -317,18 +315,18 @@
};
-static void qd_router_mobile_added(void *context, const char *address_hash, qd_address_treatment_t treatment)
+static void qd_router_set_mobile_seq(void *context, int router_mask_bit, uint64_t mobile_seq)
{
qd_router_t *router = (qd_router_t*) context;
PyObject *pArgs;
PyObject *pValue;
- if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ if (pySetMobileSeq && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
qd_python_lock_state_t lock_state = qd_python_lock();
pArgs = PyTuple_New(2);
- PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(address_hash));
- PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) treatment));
- pValue = PyObject_CallObject(pyAdded, pArgs);
+ PyTuple_SetItem(pArgs, 0, PyLong_FromLong((long) router_mask_bit));
+ PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) mobile_seq));
+ pValue = PyObject_CallObject(pySetMobileSeq, pArgs);
qd_error_py();
Py_DECREF(pArgs);
Py_XDECREF(pValue);
@@ -337,17 +335,17 @@
}
-static void qd_router_mobile_removed(void *context, const char *address_hash)
+static void qd_router_set_my_mobile_seq(void *context, uint64_t mobile_seq)
{
qd_router_t *router = (qd_router_t*) context;
PyObject *pArgs;
PyObject *pValue;
- if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ if (pySetMobileSeq && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
qd_python_lock_state_t lock_state = qd_python_lock();
pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(address_hash));
- pValue = PyObject_CallObject(pyRemoved, pArgs);
+ PyTuple_SetItem(pArgs, 0, PyLong_FromLong((long) mobile_seq));
+ pValue = PyObject_CallObject(pySetMyMobileSeq, pArgs);
qd_error_py();
Py_DECREF(pArgs);
Py_XDECREF(pValue);
@@ -362,7 +360,7 @@
PyObject *pArgs;
PyObject *pValue;
- if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
qd_python_lock_state_t lock_state = qd_python_lock();
pArgs = PyTuple_New(1);
PyTuple_SetItem(pArgs, 0, PyLong_FromLong((long) link_mask_bit));
@@ -382,8 +380,8 @@
qdr_core_route_table_handlers(router->router_core,
router,
- qd_router_mobile_added,
- qd_router_mobile_removed,
+ qd_router_set_mobile_seq,
+ qd_router_set_my_mobile_seq,
qd_router_link_lost);
//
@@ -450,10 +448,10 @@
Py_DECREF(adapterType);
QD_ERROR_PY_RET();
- pyTick = PyObject_GetAttrString(pyRouter, "handleTimerTick"); QD_ERROR_PY_RET();
- pyAdded = PyObject_GetAttrString(pyRouter, "addressAdded"); QD_ERROR_PY_RET();
- pyRemoved = PyObject_GetAttrString(pyRouter, "addressRemoved"); QD_ERROR_PY_RET();
- pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost"); QD_ERROR_PY_RET();
+ pyTick = PyObject_GetAttrString(pyRouter, "handleTimerTick"); QD_ERROR_PY_RET();
+ pySetMobileSeq = PyObject_GetAttrString(pyRouter, "setMobileSeq"); QD_ERROR_PY_RET();
+ pySetMyMobileSeq = PyObject_GetAttrString(pyRouter, "setMyMobileSeq"); QD_ERROR_PY_RET();
+ pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost"); QD_ERROR_PY_RET();
return qd_error_code();
}
diff --git a/tests/system_tests_interior_sync_up.py b/tests/system_tests_interior_sync_up.py
index 2b220f0..71e7f9d 100644
--- a/tests/system_tests_interior_sync_up.py
+++ b/tests/system_tests_interior_sync_up.py
@@ -129,7 +129,7 @@
self.timer = None
self.poll_timer = None
self.delay_timer = None
- self.count = 200
+ self.count = 2000
self.delay_count = 12 # This should be larger than MAX_KEPT_DELTAS in mobile.py
self.inter_router_port = inter_router_port
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 4d3f61b..5730340 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -352,12 +352,12 @@
blocking_connection = BlockingConnection(addr)
# Receive on org.apache.dev
- blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev")
+ blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev.1")
apply_options = AtMostOnce()
# Sender to to org.apache.dev
- blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options)
+ blocking_sender = blocking_connection.create_sender(address="org.apache.dev.1", options=apply_options)
msg = Message(body=hello_world_2)
# Send a message
blocking_sender.send(msg)
@@ -371,10 +371,10 @@
# Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
# that the message was link routed
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
- name='M0org.apache.dev').deliveriesEgress)
+ name='M0org.apache.dev.1').deliveriesEgress)
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
- name='M0org.apache.dev').deliveriesIngress)
+ name='M0org.apache.dev.1').deliveriesIngress)
blocking_connection.close()
@@ -563,12 +563,12 @@
blocking_connection = BlockingConnection(addr)
# Receive on org.apache
- blocking_receiver = blocking_connection.create_receiver(address="org.apache")
+ blocking_receiver = blocking_connection.create_receiver(address="org.apache.1")
apply_options = AtMostOnce()
# Sender to to org.apache
- blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
+ blocking_sender = blocking_connection.create_sender(address="org.apache.1", options=apply_options)
msg = Message(body=hello_world_4)
# Send a message
@@ -583,10 +583,10 @@
# Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
# that the message was link routed
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
- name='M0org.apache').deliveriesEgress)
+ name='M0org.apache.1').deliveriesEgress)
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
- name='M0org.apache').deliveriesIngress)
+ name='M0org.apache.1').deliveriesIngress)
blocking_connection.close()
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 1a34cd3..486d75b 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -558,6 +558,7 @@
self.r_conns[name] = conn
self.create_receiver(event.container, conn, self.topic, name)
self.n_receivers += 1
+ self.c_received[name] = 0
def on_link_opened(self, event):
if event.receiver:
@@ -720,7 +721,7 @@
class MulticastPresettledRxFail(MulticastPresettled):
"""
- Spontaineously close a receiver or connection on message received
+ Spontaneously close a receiver or connection on message received
"""
def __init__(self, config, count, drop_clients, detach, body):
super(MulticastPresettledRxFail, self).__init__(config, count, body, SendPresettled())
@@ -871,7 +872,7 @@
class MulticastUnsettledRxFail(MulticastUnsettled3Ack):
"""
- Spontaineously close a receiver or connection on message received
+ Spontaneously close a receiver or connection on message received
"""
def __init__(self, config, count, drop_clients, detach, body):
super(MulticastUnsettledRxFail, self).__init__(config, count, body)