| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os |
| import sys |
| import unittest |
| |
| sys.path.append(os.path.join(os.environ["SOURCE_DIR"], "python")) |
| |
| from qpid_dispatch_internal.router.engine import NeighborEngine, PathEngine, Configuration, NodeTracker |
| from qpid_dispatch_internal.router.data import LinkState, MessageHELLO |
| |
| class Adapter(object): |
| def __init__(self, domain): |
| self._domain = domain |
| |
| def log(self, level, text): |
| print "Adapter.log(%d): domain=%s, text=%s" % (level, self._domain, text) |
| |
| def send(self, dest, opcode, body): |
| print "Adapter.send: domain=%s, dest=%s, opcode=%s, body=%s" % (self._domain, dest, opcode, body) |
| |
| def remote_bind(self, subject, peer): |
| print "Adapter.remote_bind: subject=%s, peer=%s" % (subject, peer) |
| |
| def remote_unbind(self, subject, peer): |
| print "Adapter.remote_unbind: subject=%s, peer=%s" % (subject, peer) |
| |
| def node_updated(self, address, reachable, neighbor, link_bit, router_bit): |
| print "Adapter.node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \ |
| (address, reachable, neighbor, link_bit, router_bit) |
| |
| |
| class DataTest(unittest.TestCase): |
| def test_link_state(self): |
| ls = LinkState(None, 'R1', 'area', 1, ['R2', 'R3']) |
| self.assertEqual(ls.id, 'R1') |
| self.assertEqual(ls.area, 'area') |
| self.assertEqual(ls.ls_seq, 1) |
| self.assertEqual(ls.peers, ['R2', 'R3']) |
| ls.bump_sequence() |
| self.assertEqual(ls.id, 'R1') |
| self.assertEqual(ls.area, 'area') |
| self.assertEqual(ls.ls_seq, 2) |
| self.assertEqual(ls.peers, ['R2', 'R3']) |
| |
| result = ls.add_peer('R4') |
| self.assertTrue(result) |
| self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) |
| result = ls.add_peer('R2') |
| self.assertFalse(result) |
| self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) |
| |
| result = ls.del_peer('R3') |
| self.assertTrue(result) |
| self.assertEqual(ls.peers, ['R2', 'R4']) |
| result = ls.del_peer('R5') |
| self.assertFalse(result) |
| self.assertEqual(ls.peers, ['R2', 'R4']) |
| |
| encoded = ls.to_dict() |
| new_ls = LinkState(encoded) |
| self.assertEqual(new_ls.id, 'R1') |
| self.assertEqual(new_ls.area, 'area') |
| self.assertEqual(new_ls.ls_seq, 2) |
| self.assertEqual(new_ls.peers, ['R2', 'R4']) |
| |
| |
| def test_hello_message(self): |
| msg1 = MessageHELLO(None, 'R1', 'area', ['R2', 'R3', 'R4']) |
| self.assertEqual(msg1.get_opcode(), "HELLO") |
| self.assertEqual(msg1.id, 'R1') |
| self.assertEqual(msg1.area, 'area') |
| self.assertEqual(msg1.seen_peers, ['R2', 'R3', 'R4']) |
| encoded = msg1.to_dict() |
| msg2 = MessageHELLO(encoded) |
| self.assertEqual(msg2.get_opcode(), "HELLO") |
| self.assertEqual(msg2.id, 'R1') |
| self.assertEqual(msg2.area, 'area') |
| self.assertEqual(msg2.seen_peers, ['R2', 'R3', 'R4']) |
| self.assertTrue(msg2.is_seen('R3')) |
| self.assertFalse(msg2.is_seen('R9')) |
| |
| |
| class NodeTrackerTest(unittest.TestCase): |
| def log(self, level, text): |
| pass |
| |
| def add_neighbor_router(self, address, router_bit, link_bit): |
| self.address = address |
| self.router_bit = router_bit |
| self.link_bit = link_bit |
| self.calls += 1 |
| |
| def del_neighbor_router(self, router_bit): |
| self.address = None |
| self.router_bit = router_bit |
| self.link_bit = None |
| self.calls += 1 |
| |
| def add_remote_router(self, address, router_bit): |
| self.address = address |
| self.router_bit = router_bit |
| self.link_bit = None |
| self.calls += 1 |
| |
| def del_remote_router(self, router_bit): |
| self.address = None |
| self.router_bit = router_bit |
| self.link_bit = None |
| self.calls += 1 |
| |
| def reset(self): |
| self.address = None |
| self.router_bit = None |
| self.link_bit = None |
| self.area = "area" |
| self.calls = 0 |
| |
| def test_node_tracker_limits(self): |
| tracker = NodeTracker(self, 5) |
| |
| self.reset() |
| tracker.new_neighbor('A', 1) |
| self.assertEqual(self.address, 'amqp:/_topo/area/A') |
| self.assertEqual(self.link_bit, 1) |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| tracker.new_neighbor('B', 5) |
| self.assertEqual(self.address, 'amqp:/_topo/area/B') |
| self.assertEqual(self.link_bit, 5) |
| self.assertEqual(self.router_bit, 2) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| tracker.new_neighbor('C', 6) |
| self.assertEqual(self.address, 'amqp:/_topo/area/C') |
| self.assertEqual(self.link_bit, 6) |
| self.assertEqual(self.router_bit, 3) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| tracker.new_neighbor('D', 7) |
| self.assertEqual(self.address, 'amqp:/_topo/area/D') |
| self.assertEqual(self.link_bit, 7) |
| self.assertEqual(self.router_bit, 4) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| try: |
| tracker.new_neighbor('E', 9) |
| AssertFalse("We shouldn't be here") |
| except: |
| pass |
| |
| self.reset() |
| tracker.lost_neighbor('C') |
| self.assertEqual(self.router_bit, 3) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| tracker.new_neighbor('E', 9) |
| self.assertEqual(self.address, 'amqp:/_topo/area/E') |
| self.assertEqual(self.link_bit, 9) |
| self.assertEqual(self.router_bit, 3) |
| self.assertEqual(self.calls, 1) |
| |
| |
| def test_node_tracker_remote_neighbor(self): |
| tracker = NodeTracker(self, 5) |
| |
| self.reset() |
| tracker.new_node('A') |
| self.assertEqual(self.address, 'amqp:/_topo/area/A') |
| self.assertFalse(self.link_bit) |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| tracker.new_neighbor('A', 3) |
| self.assertEqual(self.address, 'amqp:/_topo/area/A') |
| self.assertEqual(self.link_bit, 3) |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 2) |
| |
| self.reset() |
| tracker.lost_node('A') |
| self.assertFalse(self.address) |
| self.assertFalse(self.link_bit) |
| self.assertFalse(self.router_bit) |
| self.assertEqual(self.calls, 0) |
| |
| self.reset() |
| tracker.lost_neighbor('A') |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 1) |
| |
| |
| def test_node_tracker_neighbor_remote(self): |
| tracker = NodeTracker(self, 5) |
| |
| self.reset() |
| tracker.new_neighbor('A', 3) |
| self.assertEqual(self.address, 'amqp:/_topo/area/A') |
| self.assertEqual(self.link_bit, 3) |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 1) |
| |
| self.reset() |
| tracker.new_node('A') |
| self.assertFalse(self.address) |
| self.assertFalse(self.link_bit) |
| self.assertFalse(self.router_bit) |
| self.assertEqual(self.calls, 0) |
| |
| self.reset() |
| tracker.lost_neighbor('A') |
| self.assertEqual(self.address, 'amqp:/_topo/area/A') |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 2) |
| |
| self.reset() |
| tracker.lost_node('A') |
| self.assertEqual(self.router_bit, 1) |
| self.assertEqual(self.calls, 1) |
| |
| |
| class NeighborTest(unittest.TestCase): |
| def log(self, level, text): |
| pass |
| |
| def send(self, dest, msg): |
| self.sent.append((dest, msg)) |
| |
| def local_link_state_changed(self, link_state): |
| self.local_link_state = link_state |
| |
| def new_neighbor(self, rid, lbit): |
| self.neighbors[rid] = None |
| |
| def lost_neighbor(self, rid): |
| self.neighbors.pop(rid) |
| |
| def setUp(self): |
| self.sent = [] |
| self.local_link_state = None |
| self.id = "R1" |
| self.area = "area" |
| self.config = Configuration() |
| self.neighbors = {} |
| |
| def test_hello_sent(self): |
| self.sent = [] |
| self.local_link_state = None |
| self.engine = NeighborEngine(self) |
| self.engine.tick(0.5) |
| self.assertEqual(self.sent, []) |
| self.engine.tick(1.5) |
| self.assertEqual(len(self.sent), 1) |
| dest, msg = self.sent.pop(0) |
| self.assertEqual(dest, "amqp:/_local/qdhello") |
| self.assertEqual(msg.get_opcode(), "HELLO") |
| self.assertEqual(msg.id, self.id) |
| self.assertEqual(msg.area, self.area) |
| self.assertEqual(msg.seen_peers, []) |
| self.assertEqual(self.local_link_state, None) |
| |
| def test_sees_peer(self): |
| self.sent = [] |
| self.local_link_state = None |
| self.engine = NeighborEngine(self) |
| self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', []), 2.0, 0) |
| self.engine.tick(5.0) |
| self.assertEqual(len(self.sent), 1) |
| dest, msg = self.sent.pop(0) |
| self.assertEqual(msg.seen_peers, ['R2']) |
| |
| def test_establish_peer(self): |
| self.sent = [] |
| self.local_link_state = None |
| self.engine = NeighborEngine(self) |
| self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5, 0) |
| self.engine.tick(1.0) |
| self.engine.tick(2.0) |
| self.engine.tick(3.0) |
| self.assertEqual(self.local_link_state.id, 'R1') |
| self.assertEqual(self.local_link_state.area, 'area') |
| self.assertEqual(self.local_link_state.ls_seq, 1) |
| self.assertEqual(self.local_link_state.peers, ['R2']) |
| |
| def test_establish_multiple_peers(self): |
| self.sent = [] |
| self.local_link_state = None |
| self.engine = NeighborEngine(self) |
| self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5, 0) |
| self.engine.tick(1.0) |
| self.engine.handle_hello(MessageHELLO(None, 'R3', 'area', ['R1', 'R2']), 1.5, 0) |
| self.engine.tick(2.0) |
| self.engine.handle_hello(MessageHELLO(None, 'R4', 'area', ['R1']), 2.5, 0) |
| self.engine.handle_hello(MessageHELLO(None, 'R5', 'area', ['R2']), 2.5, 0) |
| self.engine.handle_hello(MessageHELLO(None, 'R6', 'area', ['R1']), 2.5, 0) |
| self.engine.tick(3.0) |
| self.assertEqual(self.local_link_state.id, 'R1') |
| self.assertEqual(self.local_link_state.area, 'area') |
| self.assertEqual(self.local_link_state.ls_seq, 3) |
| self.local_link_state.peers.sort() |
| self.assertEqual(self.local_link_state.peers, ['R2', 'R3', 'R4', 'R6']) |
| |
| def test_timeout_peer(self): |
| self.sent = [] |
| self.local_link_state = None |
| self.engine = NeighborEngine(self) |
| self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R3', 'R1']), 2.0, 0) |
| self.engine.tick(5.0) |
| self.engine.tick(17.1) |
| self.assertEqual(self.local_link_state.id, 'R1') |
| self.assertEqual(self.local_link_state.area, 'area') |
| self.assertEqual(self.local_link_state.ls_seq, 2) |
| self.assertEqual(self.local_link_state.peers, []) |
| |
| |
| class PathTest(unittest.TestCase): |
| def setUp(self): |
| self.id = 'R1' |
| self.area = 'area' |
| self.next_hops = None |
| self.valid_origins = None |
| self.engine = PathEngine(self) |
| |
| def log(self, level, text): |
| pass |
| |
| def next_hops_changed(self, nh): |
| self.next_hops = nh |
| |
| def valid_origins_changed(self, vo): |
| self.valid_origins = vo |
| |
| def test_topology1(self): |
| """ |
| |
| +====+ +----+ +----+ |
| | R1 |------| R2 |------| R3 | |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), |
| 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R2']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 2) |
| self.assertEqual(self.next_hops['R2'], 'R2') |
| self.assertEqual(self.next_hops['R3'], 'R2') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.assertEqual(self.valid_origins['R2'], []) |
| self.assertEqual(self.valid_origins['R3'], []) |
| |
| def test_topology2(self): |
| """ |
| |
| +====+ +----+ +----+ |
| | R1 |------| R2 |------| R4 | |
| +====+ +----+ +----+ |
| | | |
| +----+ +----+ +----+ |
| | R3 |------| R5 |------| R6 | |
| +----+ +----+ +----+ |
| |
| """ |
| collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), |
| 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3', 'R4']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R2', 'R5']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R2', 'R5']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R3', 'R4', 'R6']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 5) |
| self.assertEqual(self.next_hops['R2'], 'R2') |
| self.assertEqual(self.next_hops['R3'], 'R2') |
| self.assertEqual(self.next_hops['R4'], 'R2') |
| self.assertEqual(self.next_hops['R5'], 'R2') |
| self.assertEqual(self.next_hops['R6'], 'R2') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.valid_origins['R6'].sort() |
| self.assertEqual(self.valid_origins['R2'], []) |
| self.assertEqual(self.valid_origins['R3'], []) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], []) |
| self.assertEqual(self.valid_origins['R6'], []) |
| |
| def test_topology3(self): |
| """ |
| |
| +----+ +----+ +----+ |
| | R2 |------| R3 |------| R4 | |
| +----+ +----+ +----+ |
| | | |
| +====+ +----+ +----+ |
| | R1 |------| R5 |------| R6 | |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), |
| 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 5) |
| self.assertEqual(self.next_hops['R2'], 'R3') |
| self.assertEqual(self.next_hops['R3'], 'R3') |
| self.assertEqual(self.next_hops['R4'], 'R3') |
| self.assertEqual(self.next_hops['R5'], 'R5') |
| self.assertEqual(self.next_hops['R6'], 'R5') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.valid_origins['R6'].sort() |
| self.assertEqual(self.valid_origins['R2'], ['R5', 'R6']) |
| self.assertEqual(self.valid_origins['R3'], ['R5', 'R6']) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R6'], ['R2', 'R3']) |
| |
| def test_topology4(self): |
| """ |
| |
| +----+ +----+ +----+ |
| | R2 |------| R3 |------| R4 | |
| +----+ +----+ +----+ |
| | | |
| +====+ +----+ +----+ |
| | R1 |------| R5 |------| R6 |------ R7 (no ls from R7) |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), |
| 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 6) |
| self.assertEqual(self.next_hops['R2'], 'R3') |
| self.assertEqual(self.next_hops['R3'], 'R3') |
| self.assertEqual(self.next_hops['R4'], 'R3') |
| self.assertEqual(self.next_hops['R5'], 'R5') |
| self.assertEqual(self.next_hops['R6'], 'R5') |
| self.assertEqual(self.next_hops['R7'], 'R5') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.valid_origins['R6'].sort() |
| self.valid_origins['R7'].sort() |
| self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R6'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R7'], ['R2', 'R3']) |
| |
| def test_topology5(self): |
| """ |
| |
| +----+ +----+ +----+ |
| | R2 |------| R3 |------| R4 | |
| +----+ +----+ +----+ |
| | | | |
| | +====+ +----+ +----+ |
| +--------| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), |
| 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 6) |
| self.assertEqual(self.next_hops['R2'], 'R2') |
| self.assertEqual(self.next_hops['R3'], 'R3') |
| self.assertEqual(self.next_hops['R4'], 'R3') |
| self.assertEqual(self.next_hops['R5'], 'R5') |
| self.assertEqual(self.next_hops['R6'], 'R5') |
| self.assertEqual(self.next_hops['R7'], 'R5') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.valid_origins['R6'].sort() |
| self.valid_origins['R7'].sort() |
| self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R6'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R7'], ['R2', 'R3']) |
| |
| def test_topology5_with_asymmetry1(self): |
| """ |
| |
| +----+ +----+ +----+ |
| | R2 |------| R3 |------| R4 | |
| +----+ +----+ +----+ |
| ^ | | |
| ^ +====+ +----+ +----+ |
| +-<-<-<--| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), |
| 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 6) |
| self.assertEqual(self.next_hops['R2'], 'R2') |
| self.assertEqual(self.next_hops['R3'], 'R3') |
| self.assertEqual(self.next_hops['R4'], 'R3') |
| self.assertEqual(self.next_hops['R5'], 'R5') |
| self.assertEqual(self.next_hops['R6'], 'R5') |
| self.assertEqual(self.next_hops['R7'], 'R5') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.valid_origins['R6'].sort() |
| self.valid_origins['R7'].sort() |
| self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R6'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R7'], ['R2', 'R3']) |
| |
| def test_topology5_with_asymmetry2(self): |
| """ |
| |
| +----+ +----+ +----+ |
| | R2 |------| R3 |------| R4 | |
| +----+ +----+ +----+ |
| v | | |
| v +====+ +----+ +----+ |
| +->->->->| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), |
| 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 6) |
| self.assertEqual(self.next_hops['R2'], 'R3') |
| self.assertEqual(self.next_hops['R3'], 'R3') |
| self.assertEqual(self.next_hops['R4'], 'R3') |
| self.assertEqual(self.next_hops['R5'], 'R5') |
| self.assertEqual(self.next_hops['R6'], 'R5') |
| self.assertEqual(self.next_hops['R7'], 'R5') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.valid_origins['R6'].sort() |
| self.valid_origins['R7'].sort() |
| self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7']) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R6'], ['R2', 'R3']) |
| self.assertEqual(self.valid_origins['R7'], ['R2', 'R3']) |
| |
| def test_topology5_with_asymmetry3(self): |
| """ |
| |
| +----+ +----+ +----+ |
| | R2 |------| R3 |------| R4 | |
| +----+ +----+ +----+ |
| v | | |
| v +====+ +----+ +----+ |
| +->->->->| R1 |------| R5 |<-<-<-| R6 |------ R7 (no ls from R7) |
| +====+ +----+ +----+ |
| |
| """ |
| collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), |
| 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), |
| 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), |
| 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), |
| 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4']), |
| 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } |
| self.engine.ls_collection_changed(collection) |
| self.engine.tick(1.0) |
| self.assertEqual(len(self.next_hops), 4) |
| self.assertEqual(self.next_hops['R2'], 'R3') |
| self.assertEqual(self.next_hops['R3'], 'R3') |
| self.assertEqual(self.next_hops['R4'], 'R3') |
| self.assertEqual(self.next_hops['R5'], 'R5') |
| |
| self.valid_origins['R2'].sort() |
| self.valid_origins['R3'].sort() |
| self.valid_origins['R4'].sort() |
| self.valid_origins['R5'].sort() |
| self.assertEqual(self.valid_origins['R2'], ['R5']) |
| self.assertEqual(self.valid_origins['R3'], ['R5']) |
| self.assertEqual(self.valid_origins['R4'], []) |
| self.assertEqual(self.valid_origins['R5'], ['R2', 'R3']) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |