| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import unicode_literals |
| from __future__ import division |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| from ..dispatch import LOG_INFO, LOG_TRACE, LOG_DEBUG |
| from .data import LinkState, ProtocolVersion |
| from .address import Address |
| from ..compat import dict_items |
| |
| |
class NodeTracker(object):
    """
    Track the set of router nodes that are known to this router: whether each
    one is a neighbor or remote, and whether it is reachable.

    This class is also responsible for assigning a unique mask bit value to each
    router.  The mask bit is used in the main router to represent sets of valid
    destinations for addresses.  Bit 0 is reserved for this router itself.
    """

    def __init__(self, container, max_routers):
        """
        Create the tracker and register it with the management agent.

        container   -- owning object; this class reads its ``id``, ``config``,
                       ``instance``, ``area``, ``router_adapter``, ``path_engine``
                       and ``link_state_engine`` attributes and its ``log`` /
                       ``log_ls`` methods.
        max_routers -- size of the mask-bit table (including bit 0, which is
                       reserved for this router).
        """
        self.container = container
        self.my_id = container.id
        self.max_routers = max_routers
        self.link_state = LinkState(None, self.my_id, 0, {})
        self.link_state_changed = False
        self.recompute_topology = False
        self.last_topology_change = 0
        self.flux_mode = False
        self.nodes = {}             # id => RouterNode
        self.nodes_by_link_id = {}  # link-id => RouterNode (the neighbor using that link)

        ##
        ## Mask-bit table: None means "free", True means "in use".
        ## Bit 0 is reserved to represent this router.
        ##
        self.maskbits = [None] * max_routers
        self.maskbits[0] = True

        ##
        ## The first bit to hand out is 1, but only if the table actually has a
        ## slot for it.  Otherwise there is nothing to allocate and the first
        ## allocation must fail with the "exceeded" error, not an IndexError.
        ##
        self.next_maskbit = 1 if max_routers > 1 else None

        self.neighbor_max_age = self.container.config.helloMaxAgeSeconds
        self.ls_max_age = self.container.config.remoteLsMaxAgeSeconds
        self.flux_interval = self.container.config.raIntervalFluxSeconds * 2
        self.container.router_adapter.get_agent().add_implementation(self, "router.node")

    def refresh_entity(self, attributes):
        """Refresh management attributes describing this (the local) router."""
        attributes.update({
            "id": self.my_id,
            "index": 0,
            "protocolVersion": ProtocolVersion,
            "instance": self.container.instance,         # Boot number, integer
            "linkState": list(self.link_state.peers),    # List of neighbour nodes
            "nextHop": "(self)",
            "validOrigins": [],
            "address": Address.topological(self.my_id, area=self.container.area),
            "lastTopoChange": self.last_topology_change
        })

    def _get_or_create_node(self, node_id, version, instance):
        """Return the RouterNode for node_id, creating and registering it if unknown."""
        if node_id not in self.nodes:
            self.nodes[node_id] = RouterNode(self, node_id, version, instance)
        return self.nodes[node_id]

    def _unmap_link(self, link_id, node):
        """Drop the link-id index entry for link_id, but only if it refers to node."""
        if link_id is not None and self.nodes_by_link_id.get(link_id) is node:
            del self.nodes_by_link_id[link_id]

    def _do_expirations(self, now):
        """
        Run through the list of routers and check for expired conditions.

        now -- current time, in the same units as the configured max ages.
        """
        ##
        ## Iterate over a snapshot because nodes may be removed from self.nodes
        ## inside the loop.
        ##
        for node_id, node in list(self.nodes.items()):
            ##
            ## If the node is a neighbor, check the neighbor refresh time to see
            ## if we've waited too long for a refresh.  If so, disconnect the link,
            ## forget the link-id index entry (so it cannot go stale and later be
            ## resolved to a node that no longer uses, or no longer is, that link)
            ## and remove the node from the local link state.
            ##
            if node.is_neighbor():
                if now - node.neighbor_refresh_time > self.neighbor_max_age:
                    expired_link_id = node.peer_link_id
                    node.remove_link()
                    self._unmap_link(expired_link_id, node)
                    if self.link_state.del_peer(node_id):
                        self.link_state_changed = True

            ##
            ## Check the age of the node's link state.  If it's too old, clear it out.
            ##
            if now - node.link_state.last_seen > self.ls_max_age:
                if node.link_state.has_peers():
                    node.link_state.del_all_peers()
                    self.recompute_topology = True

            ##
            ## If the node has empty link state and is not a neighbor, check to see
            ## if it appears in any other node's link state.  If it does not, the
            ## node is a candidate for deletion.
            ##
            if not node.link_state.has_peers() and not node.is_neighbor():
                referenced = any(other_id != node_id and other.link_state.is_peer(node_id)
                                 for other_id, other in self.nodes.items())
                if not referenced:
                    ##
                    ## The keep_alive_count is set to zero when a new node is first
                    ## discovered.  Since we can learn about a node before we receive
                    ## its link state, the keep_alive_count is used to prevent the
                    ## node from being deleted before we can learn more about it.
                    ##
                    node.keep_alive_count += 1
                    if node.keep_alive_count > 2:
                        node.delete()
                        self.nodes.pop(node_id)

    def tick(self, now):
        """
        Periodic processing: expire stale state, track flux mode, publish local
        link-state changes, recompute routes when needed and send any pending
        requests/advertisements.

        now -- current time.
        """
        send_ra = False

        ##
        ## Expire neighbors and link state
        ##
        self._do_expirations(now)

        ##
        ## Enter flux mode if things are changing
        ##
        if self.link_state_changed or self.recompute_topology:
            self.last_topology_change = int(round(now))
            if not self.flux_mode:
                self.flux_mode = True
                self.container.log(LOG_TRACE, "Entered Router Flux Mode")

        ##
        ## Handle local link state changes.  A local change always forces a
        ## topology recompute and an immediate router advertisement.
        ##
        if self.link_state_changed:
            self.link_state_changed = False
            self.link_state.bump_sequence()
            self.recompute_topology = True
            send_ra = True
            self.container.log_ls(LOG_TRACE, "Local Link State: %r" % self.link_state)

        ##
        ## Recompute the topology
        ##
        if self.recompute_topology:
            self.recompute_topology = False
            collection = {self.my_id: self.link_state}
            for node_id, node in self.nodes.items():
                collection[node_id] = node.link_state
            next_hops, costs, valid_origins, radius = self.container.path_engine.calculate_routes(collection)
            self.container.log_ls(LOG_INFO, "Computed next hops: %r" % next_hops)
            self.container.log_ls(LOG_INFO, "Computed costs: %r" % costs)
            self.container.log_ls(LOG_INFO, "Computed valid origins: %r" % valid_origins)
            self.container.log_ls(LOG_INFO, "Computed radius: %d" % radius)

            ##
            ## Update the topology radius
            ##
            self.container.router_adapter.set_radius(radius)

            ##
            ## Update the next hops and valid origins for each node
            ##
            for node_id, next_hop_id in next_hops.items():
                node = self.nodes[node_id]
                next_hop = self.nodes[next_hop_id]
                vo = valid_origins[node_id]
                cost = costs[node_id]
                node.set_next_hop(next_hop)
                node.set_valid_origins(vo)
                node.set_cost(cost)

        ##
        ## Send link-state requests and mobile-address requests to the nodes
        ## that have pending requests and are reachable
        ##
        for node_id, node in self.nodes.items():
            if node.link_state_requested():
                self.container.link_state_engine.send_lsr(node_id)
            if node.mobile_address_requested():
                self.container.router_adapter.mobile_seq_advanced(node.maskbit)

        ##
        ## Send an immediate RA if our link state changed
        ##
        if send_ra:
            self.container.link_state_engine.send_ra(now)

    def neighbor_refresh(self, node_id, version, instance, link_id, cost, now):
        """
        Invoked when the hello protocol has received positive confirmation
        of continued bi-directional connectivity with a neighbor router.
        """
        ##
        ## If the node id is not known, create a new RouterNode to track it.
        ##
        node = self._get_or_create_node(node_id, version, instance)

        ##
        ## Add the version if we haven't already done so.
        ##
        if node.version is None:
            node.version = version

        ##
        ## Set the link_id to indicate this is a neighbor router.  If the link_id
        ## changed, update the index (dropping the entry for the link the node
        ## used before, if any) and add the neighbor to the local link state.
        ##
        previous_link_id = node.peer_link_id
        if node.set_link_id(link_id):
            self._unmap_link(previous_link_id, node)
            self.nodes_by_link_id[link_id] = node
            node.request_link_state()
            if self.link_state.add_peer(node_id, cost):
                self.link_state_changed = True

        ##
        ## Update the refresh time for later expiration checks
        ##
        node.neighbor_refresh_time = now

        ##
        ## If the instance was updated (i.e. the neighbor restarted suddenly),
        ## schedule a topology recompute and a link-state-request to that router.
        ##
        if node.update_instance(instance, version):
            self.recompute_topology = True
            node.request_link_state()

    def link_lost(self, link_id):
        """
        Invoked when an inter-router link is dropped.

        Does nothing (beyond logging) if the link id is not associated with a
        tracked neighbor.
        """
        self.container.log_ls(LOG_INFO, "Link to Neighbor Router Lost - link_tag=%d" % link_id)
        node_id = self.link_id_to_node_id(link_id)
        if node_id is None:
            return
        self.nodes_by_link_id.pop(link_id)

        ##
        ## The node may already have been expired and deleted; in that case the
        ## index entry was all that was left to clean up.
        ##
        node = self.nodes.get(node_id)
        if node is None:
            return
        node.remove_link()
        if self.link_state.del_peer(node_id):
            self.link_state_changed = True

    def set_mobile_seq(self, router_maskbit, mobile_seq):
        """
        Record mobile_seq as the mobile address sequence of the first tracked
        node whose mask bit equals router_maskbit.  No-op if there is none.
        """
        for node in self.nodes.values():
            if node.maskbit == router_maskbit:
                node.mobile_address_sequence = mobile_seq
                return

    def in_flux_mode(self, now):
        """
        Return True if the last topology change happened within the flux
        interval.  Leaving flux mode is logged once, on the first call that
        finds the interval has elapsed.
        """
        result = (now - self.last_topology_change) <= self.flux_interval
        if not result and self.flux_mode:
            self.flux_mode = False
            self.container.log(LOG_TRACE, "Exited Router Flux Mode")
        return result

    def ra_received(self, node_id, version, ls_seq, mobile_seq, instance, now):
        """
        Invoked when a router advertisement is received from another router.
        """
        ##
        ## If the node id is not known, create a new RouterNode to track it.
        ##
        node = self._get_or_create_node(node_id, version, instance)

        ##
        ## Add the version if we haven't already done so.
        ##
        if node.version is None:
            node.version = version

        ##
        ## If the instance was updated (i.e. the router restarted suddenly),
        ## schedule a topology recompute and a link-state-request to that router.
        ##
        if node.update_instance(instance, version):
            self.recompute_topology = True
            node.request_link_state()

        ##
        ## Update the last seen time to now to control expiration of the link state.
        ##
        node.link_state.last_seen = now

        ##
        ## Check the link state sequence.  Send a link state request if our records are
        ## not up to date.
        ##
        if node.link_state.ls_seq < ls_seq:
            self.container.link_state_engine.send_lsr(node_id)

        ##
        ## Check the mobile sequence.  Send a mobile-address-request if we are
        ## behind the advertised sequence.
        ##
        if node.mobile_address_sequence < mobile_seq:
            node.mobile_address_request()

    def router_learned(self, node_id, version):
        """
        Invoked when we learn about another router by any means.

        Creates a RouterNode (with no known instance) unless the id is already
        tracked or is this router's own id.
        """
        if node_id not in self.nodes and node_id != self.my_id:
            self.nodes[node_id] = RouterNode(self, node_id, version, None)

    def link_state_received(self, node_id, version, link_state, instance, now):
        """
        Invoked when a link state update is received from another router.
        """
        ##
        ## If the node id is not known, create a new RouterNode to track it.
        ##
        node = self._get_or_create_node(node_id, version, instance)

        ##
        ## Add the version if we haven't already done so.
        ##
        if node.version is None:
            node.version = version

        ##
        ## If the new link state is more up-to-date than the stored link state,
        ## update it and schedule a topology recompute.
        ##
        if link_state.ls_seq > node.link_state.ls_seq:
            node.link_state = link_state
            node.link_state.last_seen = now
            self.recompute_topology = True

            ##
            ## Look through the new link state for references to nodes that we don't
            ## know about.  Schedule link state requests for those nodes to be sent
            ## after we next recompute the topology.  (router_learned ignores ids
            ## that are already tracked and our own id.)
            ##
            for peer in node.link_state.peers:
                self.router_learned(peer, None)

    def router_node(self, node_id):
        """Return the RouterNode tracked under node_id (KeyError if unknown)."""
        return self.nodes[node_id]

    def link_id_to_node_id(self, link_id):
        """Return the id of the neighbor indexed under link_id, or None."""
        node = self.nodes_by_link_id.get(link_id)
        if node is None:
            return None
        return node.id

    def _allocate_maskbit(self):
        """
        Reserve and return the lowest-known free mask bit.

        Raises Exception("Exceeded Maximum Router Count") when no bit is free,
        including when the table is too small to hold any bit beyond bit 0.
        """
        if self.next_maskbit is None or self.next_maskbit >= self.max_routers:
            raise Exception("Exceeded Maximum Router Count")
        result = self.next_maskbit
        self.maskbits[result] = True

        ##
        ## Find the next free bit above the one just taken; None if the table
        ## is now full.
        ##
        self.next_maskbit = None
        for n in range(result + 1, self.max_routers):
            if self.maskbits[n] is None:
                self.next_maskbit = n
                break
        return result

    def _free_maskbit(self, i):
        """Release mask bit i and make it the next candidate if it is the lowest free bit."""
        self.maskbits[i] = None
        if self.next_maskbit is None or i < self.next_maskbit:
            self.next_maskbit = i
| |
| |
class RouterNode(object):
    """
    RouterNode is used to track remote routers in the router network.

    Each instance owns one mask bit (allocated from its parent tracker) and
    mirrors its link, next-hop, cost and valid-origin state into the router
    adapter whenever one of those values changes.
    """

    def __init__(self, parent, node_id, version, instance):
        """
        parent   -- the owning tracker; supplies the container, the mask-bit
                    allocator and the node table.
        node_id  -- identifier of the remote router.
        version  -- protocol version of the remote router, or None if unknown.
        instance -- instance (boot) number of the remote router, or None if unknown.
        """
        self.parent = parent
        self.adapter = parent.container.router_adapter
        self.log = parent.container.log
        self.id = node_id
        self.version = version
        self.instance = instance
        self.maskbit = self.parent._allocate_maskbit()
        self.neighbor_refresh_time = 0.0
        self.peer_link_id = None          # not None <=> this node is a neighbor
        self.link_state = LinkState(None, self.id, 0, {})
        self.next_hop_router = None
        self.cost = None
        self.valid_origins = None
        self.mobile_address_sequence = 0
        self.need_ls_request = True       # a new node always needs its link state fetched
        self.need_mobile_request = False
        self.keep_alive_count = 0
        self.adapter.add_router("amqp:/_topo/0/%s/qdrouter" % self.id, self.maskbit)
        self.log(LOG_TRACE, "Node %s created: maskbit=%d" % (self.id, self.maskbit))
        self.adapter.get_agent().add_implementation(self, "router.node")

    def refresh_entity(self, attributes):
        """Refresh management attributes describing this remote router."""
        next_hop = self.next_hop_router
        attributes.update({
            "id": self.id,
            "index": self.maskbit,
            "protocolVersion": self.version,
            "instance": self.instance,                  # Boot number, integer
            "linkState": list(self.link_state.peers),   # List of neighbour nodes
            "nextHop": next_hop.id if next_hop is not None else None,
            "validOrigins": self.valid_origins,
            "address": Address.topological(self.id, area=self.parent.container.area),
            "routerLink": self.peer_link_id,
            "cost": self.cost
        })

    def _logify(self, addr):
        """
        Render an address for logging.  The first character is the class; for
        class 'M' the second character is the phase and the remainder is the
        address text, otherwise everything after the class is the address text.
        """
        addr_class = addr[0]
        if addr_class == 'M':
            return "%s;class=%c;phase=%c" % (addr[2:], addr_class, addr[1])
        return "%s;class=%c" % (addr[1:], addr_class)

    def _is_reachable(self):
        """True if we have either a direct link or a next hop toward this node."""
        return self.peer_link_id is not None or self.next_hop_router is not None

    def set_link_id(self, link_id):
        """
        Record link_id as the direct link to this node.

        Returns False if the link id is unchanged; otherwise clears any next
        hop, pushes the change to the adapter and returns True.
        """
        if self.peer_link_id == link_id:
            return False
        self.peer_link_id = link_id
        self.next_hop_router = None
        self.adapter.set_link(self.maskbit, link_id)
        self.adapter.remove_next_hop(self.maskbit)
        self.log(LOG_TRACE, "Node %s link set: link_id=%r (removed next hop)" % (self.id, link_id))
        return True

    def remove_link(self):
        """Forget the direct link to this node, if there is one."""
        if self.peer_link_id is None:
            return
        self.peer_link_id = None
        self.adapter.remove_link(self.maskbit)
        self.log(LOG_TRACE, "Node %s link removed" % self.id)

    def delete(self):
        """
        Tear down this node: unregister it from the management agent, flush its
        addresses, remove it from the adapter and release its mask bit.
        """
        self.adapter.get_agent().remove_implementation(self)
        self.unmap_all_addresses()
        self.adapter.del_router(self.maskbit)
        self.parent._free_maskbit(self.maskbit)
        self.log(LOG_TRACE, "Node %s deleted" % self.id)

    def set_next_hop(self, next_hop):
        """Set the node through which this node is reached; no-op if unchanged."""
        if self.id == next_hop.id:
            ##
            ## If the next hop is self (destination is a neighbor) and there
            ## was a next hop in place, explicitly remove the next hop (DISPATCH-873).
            ##
            self.remove_next_hop()
            return
        current = self.next_hop_router
        if current is not None and current.id == next_hop.id:
            return
        self.next_hop_router = next_hop
        self.adapter.set_next_hop(self.maskbit, next_hop.maskbit)
        self.log(LOG_TRACE, "Node %s next hop set: %s" % (self.id, next_hop.id))

    def set_valid_origins(self, valid_origins):
        """
        Store the valid origins (node ids) for this node and push their mask
        bits to the adapter; no-op if the value is unchanged.
        """
        if self.valid_origins == valid_origins:
            return
        self.valid_origins = valid_origins
        known_nodes = self.parent.nodes
        vo_mb = [known_nodes[origin_id].maskbit for origin_id in valid_origins]
        self.adapter.set_valid_origins(self.maskbit, vo_mb)
        self.log(LOG_TRACE, "Node %s valid origins: %r" % (self.id, valid_origins))

    def set_cost(self, cost):
        """Store the cost to reach this node and push it to the adapter; no-op if unchanged."""
        if self.cost == cost:
            return
        self.cost = cost
        self.adapter.set_cost(self.maskbit, cost)
        self.log(LOG_TRACE, "Node %s cost: %d" % (self.id, cost))

    def remove_next_hop(self):
        """Clear the next hop for this node, if one is set."""
        if self.next_hop_router is None:
            return
        self.next_hop_router = None
        self.adapter.remove_next_hop(self.maskbit)
        self.log(LOG_TRACE, "Node %s next hop removed" % self.id)

    def is_neighbor(self):
        """True if this node is directly linked to us."""
        return self.peer_link_id is not None

    def request_link_state(self):
        """
        Set the link-state-requested flag so we can send this node a link-state
        request at the most opportune time.
        """
        self.need_ls_request = True

    def link_state_requested(self):
        """
        Return True iff we need to request this node's link state AND the node is
        reachable.  There's no point in sending it a request if we don't know how to
        reach it.  Returning True consumes the pending request.
        """
        if self.need_ls_request and self._is_reachable():
            self.need_ls_request = False
            return True
        return False

    def mobile_address_request(self):
        """Flag that this node's mobile addresses need to be requested."""
        self.need_mobile_request = True

    def mobile_address_requested(self):
        """
        Return True iff a mobile-address request is pending AND the node is
        reachable.  Returning True consumes the pending request.
        """
        if self.need_mobile_request and self._is_reachable():
            self.need_mobile_request = False
            return True
        return False

    def unmap_all_addresses(self):
        """Reset the mobile address sequence and flush this node's destinations from the adapter."""
        self.mobile_address_sequence = 0
        self.adapter.flush_destinations(self.maskbit)
        self.log(LOG_DEBUG, "Remote destinations flushed from router %s" % self.id)

    def update_instance(self, instance, version):
        """
        Track the remote router's instance number.

        Returns True only when a previously known instance changes, which is
        treated as a restart of that router: the stored version is replaced,
        its link-state peers are cleared and its addresses are flushed.
        Learning the instance for the first time, or receiving None, is not a
        restart and returns False.
        """
        if instance is None:
            return False
        if self.instance is None:
            self.instance = instance
            return False
        if self.instance == instance:
            return False

        self.instance = instance
        self.version = version
        self.link_state.del_all_peers()
        self.unmap_all_addresses()
        self.log(LOG_INFO, "Detected Restart of Router Node %s" % self.id)
        return True
| |