| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| class Sim: |
| def __init__(self): |
| self.brokers = {} |
| self.clients = {} |
| self.errors = 0 |
| self.warnings = 0 |
| |
| def error(self, text): |
| self.errors += 1 |
| print "###### Error:", text |
| |
| def warning(self, text): |
| self.warnings += 1 |
| print "###### Warning:", text |
| |
| def end(self): |
| print "========================" |
| print "Errors: %d, Warnings: %d" % (self.errors, self.warnings) |
| print "========================" |
| |
| def dumpState(self): |
| print "============================" |
| print "===== Federation State =====" |
| print "============================" |
| for broker in self.brokers: |
| for exchange in self.brokers[broker].exchanges: |
| print "Exchange %s.%s" % (broker, exchange) |
| for key in self.brokers[broker].exchanges[exchange].keys: |
| print " Key %s" % key |
| for queue in self.brokers[broker].exchanges[exchange].keys[key]: |
| print " Queue %s origins=%s" % \ |
| (queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList) |
| |
| def addBroker(self, name): |
| if name in self.brokers: |
| raise Exception("Broker of same name already exists") |
| broker = Broker(self, name) |
| self.brokers[name] = broker |
| return broker |
| |
| def addClient(self, name, broker): |
| if name in self.clients: |
| raise Exception("Client of same name already exists") |
| client = Client(self, name, broker) |
| self.clients[name] = client |
| return client |
| |
| def link(self, left, right, bidir=True): |
| print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir) |
| l1 = left.createLink(right) |
| l1.bridge("amq.direct") |
| if bidir: |
| l2 = right.createLink(left) |
| l2.bridge("amq.direct") |
| |
| def bind(self, client, key): |
| print "====== bind Client(%s): k=%s" % (client.name, key) |
| client.bind(key) |
| |
| def unbind(self, client, key): |
| print "====== unbind Client(%s): k=%s" % (client.name, key) |
| client.unbind(key) |
| |
| def sendMessage(self, key, broker, body="Message Body"): |
| print "====== sendMessage: broker=%s k=%s" % (broker.tag, key) |
| msg = Message(key, body) |
| exchange = broker.exchanges["amq.direct"] |
| for client in self.clients: |
| self.clients[client].expect(key); |
| exchange.receive(key, msg, True) |
| for client in self.clients: |
| self.clients[client].checkReception() |
| |
| |
| class Destination: |
| def receive(self, key, msg, fromUser=False): |
| pass |
| |
| |
| class Client(Destination): |
| def __init__(self, sim, name, broker): |
| self.sim = sim |
| self.name = name |
| self.broker = broker |
| self.broker.connect(self) |
| self.queue = self.broker.declare_queue(name) |
| self.subscription = self.broker.subscribe(self, name) |
| self.expected = None |
| self.boundKeys = [] |
| |
| def bind(self, key): |
| self.boundKeys.append(key) |
| self.broker.bind("amq.direct", self.name, key) |
| |
| def unbind(self, key): |
| self.boundKeys.remove(key) |
| self.broker.unbind("amq.direct", self.name, key) |
| |
| def receive(self, key, msg, fromUser=False): |
| print "Client(%s) received [%s]: %s" % (self.name, key, msg.body) |
| if self.expected == key: |
| self.expected = None |
| else: |
| self.sim.error("Client(%s) received unexpected message with key [%s]" % \ |
| (self.name, self.expected)) |
| |
| def expect(self, key): |
| if key in self.boundKeys: |
| self.expected = key |
| |
| def checkReception(self): |
| if self.expected: |
| self.sim.error("Client(%s) never received message with key [%s]" % \ |
| (self.name, self.expected)) |
| |
| class Broker(Client): |
| def __init__(self, sim, tag): |
| self.sim = sim |
| self.tag = tag |
| self.connections = {} |
| self.exchanges = {} |
| self.queues = {} |
| self.subscriptions = {} |
| self.links = {} |
| self.directExchange = self.declare_exchange("amq.direct") |
| |
| def connect(self, client): |
| if client in self.connections: |
| raise Exception("Client already connected") |
| self.connections[client] = Connection(client) |
| |
| def declare_queue(self, name, tag=None, exclude=None): |
| if name in self.queues: |
| raise Exception("Queue already exists") |
| self.queues[name] = Queue(self, name, tag, exclude) |
| |
| def subscribe(self, dest, queueName): |
| if queueName not in self.queues: |
| raise Exception("Queue does not exist") |
| self.queues[queueName].setDest(dest) |
| |
| def declare_exchange(self, name): |
| if name in self.exchanges: |
| return |
| exchange = Exchange(self, name) |
| self.exchanges[name] = exchange |
| return exchange |
| |
| def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None): |
| if exchangeName not in self.exchanges: |
| raise Exception("Exchange not found") |
| if queueName not in self.queues: |
| raise Exception("Queue not found") |
| exchange = self.exchanges[exchangeName] |
| queue = self.queues[queueName] |
| exchange.bind(queue, key, tagList, fedOp, origin) |
| |
| def unbind(self, exchangeName, queueName, key): |
| if exchangeName not in self.exchanges: |
| raise Exception("Exchange not found") |
| if queueName not in self.queues: |
| raise Exception("Queue not found") |
| exchange = self.exchanges[exchangeName] |
| queue = self.queues[queueName] |
| exchange.unbind(queue, key) |
| |
| def createLink(self, other): |
| if other in self.links: |
| raise Exception("Peer broker already linked") |
| link = Link(self, other) |
| self.links[other] = link |
| return link |
| |
| |
| class Connection: |
| def __init__(self, client): |
| self.client = client |
| |
| |
| class Exchange(Destination): |
| def __init__(self, broker, name): |
| self.broker = broker |
| self.sim = broker.sim |
| self.name = name |
| self.keys = {} |
| self.bridges = [] |
| |
| def bind(self, queue, key, tagList, fedOp, origin): |
| if not fedOp: fedOp = "bind" |
| print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \ |
| (self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin), |
| |
| if self.broker.tag in tagList: |
| print "(tag ignored)" |
| return |
| |
| if fedOp == "bind" or fedOp == "unbind": |
| if key not in self.keys: |
| self.keys[key] = {} |
| queueMap = self.keys[key] |
| |
| if fedOp == "bind": |
| ## |
| ## Add local or federation binding case |
| ## |
| if queue in queueMap: |
| if origin and origin in queueMap[queue].originList: |
| print "(dup ignored)" |
| elif origin: |
| queueMap[queue].originList.append(origin) |
| print "(origin added)" |
| else: |
| binding = Binding(origin) |
| queueMap[queue] = binding |
| print "(binding added)" |
| |
| elif fedOp == "unbind": |
| ## |
| ## Delete federation binding case |
| ## |
| if queue in queueMap: |
| binding = queueMap[queue] |
| if origin and origin in binding.originList: |
| binding.originList.remove(origin) |
| if len(binding.originList) == 0: |
| queueMap.pop(queue) |
| if len(queueMap) == 0: |
| self.keys.pop(key) |
| print "(last origin del)" |
| else: |
| print "(removed origin)" |
| else: |
| print "(origin not found)" |
| else: |
| print "(queue not found)" |
| |
| elif fedOp == "reorigin": |
| print "(ok)" |
| self.reorigin() |
| |
| elif fedOp == "hello": |
| print "(ok)" |
| |
| else: |
| raise Exception("Unknown fed-opcode '%s'" % fedOp) |
| |
| newTagList = [] |
| newTagList.append(self.broker.tag) |
| for tag in tagList: |
| newTagList.append(tag) |
| if origin: |
| propOrigin = origin |
| else: |
| propOrigin = self.broker.tag |
| |
| for bridge in self.bridges: |
| if bridge.isDynamic(): |
| bridge.propagate(key, newTagList, fedOp, propOrigin) |
| |
| def reorigin(self): |
| myTag = [] |
| myTag.append(self.broker.tag) |
| for key in self.keys: |
| queueMap = self.keys[key] |
| found = False |
| for queue in queueMap: |
| binding = queueMap[queue] |
| if binding.isLocal(): |
| found = True |
| if found: |
| for bridge in self.bridges: |
| if bridge.isDynamic(): |
| bridge.propagate(key, myTag, "bind", self.broker.tag) |
| |
| def unbind(self, queue, key): |
| print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key), |
| if key not in self.keys: |
| print "(key not known)" |
| return |
| queueMap = self.keys[key] |
| if queue not in queueMap: |
| print "(queue not bound)" |
| return |
| queueMap.pop(queue) |
| if len(queueMap) == 0: |
| self.keys.pop(key) |
| print "(ok, remove bound-key)" |
| else: |
| print "(ok)" |
| |
| count = 0 |
| for queue in queueMap: |
| if len(queueMap[queue].originList) == 0: |
| count += 1 |
| |
| if count == 0: |
| myTag = [] |
| myTag.append(self.broker.tag) |
| for bridge in self.bridges: |
| if bridge.isDynamic(): |
| bridge.propagate(key, myTag, "unbind", self.broker.tag) |
| |
| def receive(self, key, msg, fromUser=False): |
| sent = False |
| if key in self.keys: |
| queueMap = self.keys[key] |
| for queue in queueMap: |
| if queue.enqueue(msg): |
| sent = True |
| if not sent and not fromUser: |
| self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \ |
| (self.broker.tag, self.name, key)) |
| |
| def addDynamicBridge(self, bridge): |
| if bridge in self.bridges: |
| raise Exception("Dynamic bridge already added to exchange") |
| self.bridges.append(bridge) |
| |
| for b in self.bridges: |
| if b != bridge: |
| b.sendReorigin() |
| self.reorigin() |
| |
| class Queue: |
| def __init__(self, broker, name, tag=None, exclude=None): |
| self.broker = broker |
| self.name = name |
| self.tag = tag |
| self.exclude = exclude |
| self.dest = None |
| |
| def setDest(self, dest): |
| self.dest = dest |
| |
| def enqueue(self, msg): |
| print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags), |
| if self.dest == None: |
| print "(dropped, no dest)" |
| return False |
| if self.exclude and msg.tagFound(self.exclude): |
| print "(dropped, tag)" |
| return False |
| if self.tag: |
| msg.appendTag(self.tag) |
| print "(ok)" |
| self.dest.receive(msg.key, msg) |
| return True |
| |
| |
| class Binding: |
| def __init__(self, origin): |
| self.originList = [] |
| if origin: |
| self.originList.append(origin) |
| |
| def isLocal(self): |
| return len(self.originList) == 0 |
| |
| |
| class Link: |
| def __init__(self, local, remote): |
| self.local = local |
| self.remote = remote |
| self.remote.connect(self) |
| self.bridges = [] |
| |
| def bridge(self, exchangeName): |
| bridge = Bridge(self, exchangeName) |
| |
| |
| class Bridge: |
| def __init__(self, link, exchangeName): |
| self.link = link |
| self.exchangeName = exchangeName |
| if self.exchangeName not in link.local.exchanges: |
| raise Exception("Exchange not found") |
| self.exchange = link.local.exchanges[self.exchangeName] |
| self.queueName = "bridge." + link.local.tag |
| self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag) |
| self.link.remote.subscribe(self.exchange, self.queueName) |
| self.exchange.addDynamicBridge(self) |
| |
| def isDynamic(self): |
| return True |
| |
| def localTag(self): |
| return self.link.local.tag |
| |
| def remoteTag(self): |
| return self.link.remote.tag |
| |
| def propagate(self, key, tagList, fedOp, origin): |
| if self.link.remote.tag not in tagList: |
| self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin) |
| |
| def sendReorigin(self): |
| myTag = [] |
| myTag.append(self.link.local.tag) |
| self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "") |
| |
| |
| class Message: |
| def __init__(self, key, body): |
| self.key = key |
| self.body = body |
| self.tags = [] |
| |
| def appendTag(self, tag): |
| if tag not in self.tags: |
| self.tags.append(tag) |
| |
| def tagFound(self, tag): |
| return tag in self.tags |
| |
| |