blob: e6ee48e1546c7e538020562f44792f3ffcac7920 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from .data import MessageMAR, MessageMAU
from ..dispatch import LOG_TRACE
MAX_KEPT_DELTAS = 10
class MobileAddressEngine(object):
"""
This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
Note that this routing table maps from the mobile address to the remote router where that address
is directly bound.
"""
def __init__(self, container, node_tracker):
self.container = container
self.node_tracker = node_tracker
self.id = self.container.id
self.mobile_seq = 0
self.local_addrs = set([])
self.added_addrs = set([])
self.deleted_addrs = set([])
self.sent_deltas = {}
self.treatments = {}
def tick(self, now):
##
## If local addrs have changed, collect the changes and send a MAU with the diffs
## Note: it is important that the differential-MAU be sent before a RA is sent
##
if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
self.mobile_seq += 1
hints = [self.treatments[a] for a in self.added_addrs]
msg = MessageMAU(None, self.id, self.mobile_seq, list(self.added_addrs), list(self.deleted_addrs), _hints=hints)
self.sent_deltas[self.mobile_seq] = msg
if len(self.sent_deltas) > MAX_KEPT_DELTAS:
self.sent_deltas.pop(self.mobile_seq - MAX_KEPT_DELTAS)
self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg)
self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
self.local_addrs.update(self.added_addrs)
self.local_addrs.difference_update(self.deleted_addrs)
self.added_addrs.clear()
self.deleted_addrs.clear()
return self.mobile_seq
def add_local_address(self, addr, treatment):
"""
"""
self.treatments[addr] = treatment
if addr not in self.local_addrs:
if addr not in self.added_addrs:
self.added_addrs.add(addr)
else:
if addr in self.deleted_addrs:
self.deleted_addrs.remove(addr)
def del_local_address(self, addr):
"""
"""
del self.treatments[addr]
if addr in self.local_addrs:
if addr not in self.deleted_addrs:
self.deleted_addrs.add(addr)
else:
if addr in self.added_addrs:
self.added_addrs.remove(addr)
def handle_mau(self, msg, now):
##
## If the MAU is differential, we can only use it if its sequence is exactly one greater
## than our stored sequence. If not, we will ignore the content and schedule a MAR.
##
## If the MAU is absolute, we can use it in all cases.
##
if msg.id == self.id:
return
node = self.node_tracker.router_node(msg.id)
if msg.exist_list != None:
##
## Absolute MAU
##
if msg.mobile_seq == node.mobile_address_sequence:
return
node.mobile_address_sequence = msg.mobile_seq
node.overwrite_addresses(msg.exist_list)
else:
##
## Differential MAU
##
if node.mobile_address_sequence + 1 == msg.mobile_seq:
##
## This message represents the next expected sequence, incorporate the deltas
##
node.mobile_address_sequence += 1
treatments = msg.hints or []
for a in msg.add_list:
if len(treatments):
treatment = treatments.pop(0)
else:
treatment = -1
node.map_address(a, treatment)
for a in msg.del_list:
node.unmap_address(a)
elif node.mobile_address_sequence == msg.mobile_seq:
##
## Ignore duplicates
##
return
else:
##
## This is an out-of-sequence delta. Don't use it. Schedule a MAR to
## get back on track.
##
node.mobile_address_request()
def handle_mar(self, msg, now):
if msg.id == self.id:
return
if msg.have_seq == self.mobile_seq:
return
if self.mobile_seq - (msg.have_seq + 1) < len(self.sent_deltas):
##
## We can catch the peer up with a series of stored differential updates
##
for s in range(msg.have_seq + 1, self.mobile_seq + 1):
self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, self.sent_deltas[s])
self.container.log_ma(LOG_TRACE, "SENT: %r" % self.sent_deltas[s])
return
##
## The peer needs to be sent an absolute update with the whole address list
##
smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, list(self.local_addrs))
self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg)
self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg)
def send_mar(self, node_id, seq):
msg = MessageMAR(None, self.id, seq)
self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % node_id, msg)
self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)