| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import getopt |
| import sys |
| import socket |
| import os |
| import locale |
| from qmf.console import Session, BrokerURL |
| |
def Usage():
    """Print the command-line synopsis to stdout, then exit with status 1.

    This function never returns: it always ends by calling sys.exit(1).
    """
    # One entry per output line; an empty string produces a blank line.
    helpLines = (
        "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]",
        " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>",
        "",
        " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]",
        " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>",
        " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>",
        " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>",
        " qpid-route [OPTIONS] route list [<dest-broker>]",
        " qpid-route [OPTIONS] route flush [<dest-broker>]",
        " qpid-route [OPTIONS] route map [<broker>]",
        "",
        " qpid-route [OPTIONS] link add <dest-broker> <src-broker>",
        " qpid-route [OPTIONS] link del <dest-broker> <src-broker>",
        " qpid-route [OPTIONS] link list [<dest-broker>]",
        "",
        "Options:",
        " -v [ --verbose ] Verbose output",
        " -q [ --quiet ] Quiet output, don't print duplicate warnings",
        " -d [ --durable ] Added configuration shall be durable",
        " -e [ --del-empty-link ] Delete link after deleting last route on the link",
        " -s [ --src-local ] Make connection to source broker (push route)",
        " -t <transport> [ --transport <transport>]",
        " Specify transport to use for links, defaults to tcp",
        "",
        " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
    )
    # A single parenthesised argument prints the same text whether `print`
    # is the Python 2 statement or the Python 3 function; the newline that
    # print appends terminates the final (blank) entry.
    print("\n".join(helpLines))
    sys.exit(1)
| |
# Module-wide option state.  These defaults are overwritten by the
# command-line option loop in the main program and are read by the
# RouteManager methods.
_verbose = False     # -v / --verbose: print the result of each management call
_quiet = False       # -q / --quiet: exit quietly instead of raising on duplicate/missing routes
_durable = False     # -d / --durable: passed to the broker when creating links and bridges
_dellink = False     # -e / --del-empty-link: close the link after its last route is deleted
_srclocal = False    # -s / --src-local: connect to the source broker (push route)
_transport = "tcp"   # -t / --transport: transport name passed when creating a link
| |
class RouteManager:
    """Manage federation links and routes (bridges) on one broker via QMF.

    The manager connects a QMF console Session to the "local" broker given
    to the constructor.  Most operations also take a "remote" broker URL,
    which identifies the link (by host and port) on the local broker that
    the operation applies to.

    Behaviour is influenced by the module-level option flags _verbose,
    _quiet, _durable, _dellink, _srclocal and _transport.

    All output uses the single-argument ``print(...)`` form, which emits the
    same text under the Python 2 print statement and the Python 3 function.
    """

    def __init__(self, localBroker):
        """Open a QMF session and attach it to localBroker (a broker URL string)."""
        self.local = BrokerURL(localBroker)
        self.remote = None
        self.qmf = Session()
        self.broker = self.qmf.addBroker(localBroker)

    def disconnect(self):
        """Detach the QMF session from the local broker."""
        self.qmf.delBroker(self.broker)

    def getLink(self):
        """Return the link object whose host/port match self.remote, or None.

        self.remote must have been set (addLink, delLink, delRoute and
        delQueueRoute all set it before calling this).
        """
        for link in self.qmf.getObjects(_class="link"):
            if self.remote.match(link.host, link.port):
                return link
        return None

    def addLink(self, remoteBroker):
        """Create a link from the local broker to remoteBroker if none exists.

        Raises Exception if remoteBroker is the local broker itself, or if
        the broker's connect method reports a non-zero status.
        """
        self.remote = BrokerURL(remoteBroker)
        if self.local.match(self.remote.host, self.remote.port):
            raise Exception("Linking broker to itself is not permitted")

        if self.getLink() is not None:
            # A link to this remote already exists; nothing to do.
            return

        broker = self.qmf.getObjects(_class="broker")[0]
        if self.remote.authName == "anonymous":
            mech = "ANONYMOUS"
        else:
            mech = "PLAIN"
        res = broker.connect(self.remote.host, self.remote.port, _durable,
                             mech, self.remote.authName, self.remote.authPass,
                             _transport)
        if _verbose:
            print("Connect method returned: %s %s" % (res.status, res.text))
        # Report a failed connect instead of silently carrying on, in the
        # same way addRoute reports a failed bridge call.
        if res.status != 0:
            raise Exception(res.text)

    def delLink(self, remoteBroker):
        """Close the link from the local broker to remoteBroker.

        Raises Exception if no such link exists or if closing it reports a
        non-zero status.
        """
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link not found")

        res = link.close()
        if _verbose:
            print("Close method returned: %s %s" % (res.status, res.text))
        if res.status != 0:
            raise Exception("Error closing link: %d - %s" % (res.status, res.text))

    def listLinks(self):
        """Print one line per link on the local broker, or "No Links Found"."""
        links = self.qmf.getObjects(_class="link")
        if len(links) == 0:
            print("No Links Found")
            return

        print("")
        print("Host Port Transport Durable State Last Error")
        print("=============================================================================")
        for link in links:
            print("%-16s%-8d%-13s%c %-18s%s" %
                  (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError))

    def mapRoutes(self):
        """Discover brokers reachable through links and print their routes.

        Starting from the local broker, repeatedly connects the session to
        every broker named by a link until no new broker can be added, then
        prints the dynamic routes (grouped by exchange) and the static
        routes, and finally detaches the brokers it added.
        """
        qmf = self.qmf
        print("")
        print("Finding Linked Brokers:")

        # Maps broker name -> session broker handle for every attached broker.
        brokerList = {}
        brokerList[self.local.name()] = self.broker
        print(" %s... Ok" % self.local)

        # Each newly attached broker may expose further links, so keep
        # scanning until a full pass attaches nothing new.
        added = True
        while added:
            added = False
            for link in qmf.getObjects(_class="link"):
                target = "%s:%d" % (link.host, link.port)
                url = BrokerURL(target)
                if url.name() in brokerList:
                    continue
                try:
                    brokerList[url.name()] = qmf.addBroker(target)
                    added = True
                    outcome = "Ok"
                except Exception:
                    # Connection failures are reported inline, not fatal.
                    outcome = sys.exc_info()[1]
                print(" %s... %s" % (url.name(), outcome))

        print("")
        print("Dynamic Routes:")
        bridges = qmf.getObjects(_class="bridge", dynamic=True)
        fedExchanges = []
        for bridge in bridges:
            if bridge.src not in fedExchanges:
                fedExchanges.append(bridge.src)
        if len(fedExchanges) == 0:
            print(" none found")
        print("")

        for ex in fedExchanges:
            print(" Exchange %s:" % ex)
            pairs = []
            for bridge in bridges:
                if bridge.src != ex:
                    continue
                link = bridge._linkRef_
                fromUrl = "%s:%s" % (link.host, link.port)
                toUrl = bridge.getBroker().getUrl()
                found = False
                for pair in pairs:
                    # matches() also marks the pair bidirectional when it
                    # sees the reverse direction.
                    if pair.matches(fromUrl, toUrl):
                        found = True
                        break
                if not found:
                    pairs.append(RoutePair(fromUrl, toUrl))
            for pair in pairs:
                print(" %s" % pair)
            print("")

        print("Static Routes:")
        bridges = qmf.getObjects(_class="bridge", dynamic=False)
        if len(bridges) == 0:
            print(" none found")
        print("")

        for bridge in bridges:
            link = bridge._linkRef_
            fromUrl = "%s:%s" % (link.host, link.port)
            toUrl = bridge.getBroker().getUrl()
            leftType = "ex"
            rightType = "ex"
            if bridge.srcIsLocal:
                arrow = "=>"
                left = bridge.src
                right = bridge.dest
                if bridge.srcIsQueue:
                    leftType = "queue"
            else:
                arrow = "<="
                left = bridge.dest
                right = bridge.src
                if bridge.srcIsQueue:
                    rightType = "queue"

            if bridge.srcIsQueue:
                print(" %s(%s=%s) %s %s(%s=%s)" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right))
            else:
                print(" %s(%s=%s) %s %s(%s=%s) key=%s" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key))
        print("")

        # Detach every broker attached above; the local one stays attached
        # until disconnect() is called.
        for name in brokerList:
            if name != self.local.name():
                qmf.delBroker(brokerList[name])

    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
        """Create an exchange route (bridge) on the link to remoteBroker.

        The link is created first if needed.  If an identical exchange route
        already exists, raises Exception unless _quiet is set, in which case
        the process exits with status 0.  Raises Exception if the link could
        not be created, if the bridge call fails, or if dynamic is combined
        with --src-local.
        """
        if dynamic and _srclocal:
            raise Exception("--src-local is not permitted on dynamic routes")

        self.addLink(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")

        for bridge in self.qmf.getObjects(_class="bridge"):
            if bridge.linkRef == link.getObjectId() and \
                    bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
                if not _quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
                sys.exit(0)

        if _verbose:
            print("Creating inter-broker binding...")
        # Presumably (durable, src, dest, key, tag, excludes, srcIsQueue,
        # srcIsLocal, dynamic) - argument order kept exactly as before.
        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic)
        if res.status != 0:
            raise Exception(res.text)
        if _verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def addQueueRoute(self, remoteBroker, exchange, queue):
        """Create a queue route (bridge from queue to exchange) on the link.

        The link is created first if needed.  If an identical queue route
        already exists, raises Exception unless _quiet is set, in which case
        the process exits with status 0.  Raises Exception if the link could
        not be created or if the bridge call fails.
        """
        self.addLink(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")

        for bridge in self.qmf.getObjects(_class="bridge"):
            if bridge.linkRef == link.getObjectId() and \
                    bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                if not _quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
                sys.exit(0)

        if _verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False)
        if res.status != 0:
            raise Exception(res.text)
        if _verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def delQueueRoute(self, remoteBroker, exchange, queue):
        """Delete the queue route for (exchange, queue) on the link to remoteBroker.

        On success the process exits with status 0 (after optionally closing
        the now-empty link when _dellink is set).  If the link or the route
        is missing, raises Exception unless _quiet is set.
        """
        link = self._requireLink(remoteBroker)
        bridges = self.qmf.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == link.getObjectId() and \
                    bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                self._closeBridgeAndExit(bridge, bridges, link)
        if not _quiet:
            raise Exception("Route not found")

    def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
        """Delete the exchange route for (exchange, routingKey) on the link.

        Only bridges whose dynamic flag equals the dynamic argument match.
        On success the process exits with status 0 (after optionally closing
        the now-empty link when _dellink is set).  If the link or the route
        is missing, raises Exception unless _quiet is set.
        """
        link = self._requireLink(remoteBroker)
        bridges = self.qmf.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and \
                    bridge.key == routingKey and bridge.dynamic == dynamic:
                self._closeBridgeAndExit(bridge, bridges, link)
        if not _quiet:
            raise Exception("Route not found")

    def listRoutes(self):
        """Print one line per bridge whose link is known on the local broker."""
        links = self.qmf.getObjects(_class="link")
        for bridge in self.qmf.getObjects(_class="bridge"):
            myLink = self._findLink(links, bridge)
            if myLink is None:
                continue
            if bridge.dynamic:
                keyText = "<dynamic>"
            else:
                keyText = bridge.key
            print("%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText))

    def clearAllRoutes(self):
        """Close every bridge on the local broker, and every link if _dellink.

        Failures are printed and do not stop the sweep.  In verbose mode each
        operation is reported on a single line ending in "Ok" or the error.
        """
        links = self.qmf.getObjects(_class="link")
        for bridge in self.qmf.getObjects(_class="bridge"):
            prefix = None
            if _verbose:
                myLink = self._findLink(links, bridge)
                if myLink is not None:
                    prefix = "Deleting Bridge: %s:%d %s %s... " % \
                        (myLink.host, myLink.port, bridge.dest, bridge.key)
            self._report(prefix, bridge.close())

        if _dellink:
            for link in self.qmf.getObjects(_class="link"):
                prefix = None
                if _verbose:
                    prefix = "Deleting Link: %s:%d... " % (link.host, link.port)
                self._report(prefix, link.close())

    # -- private helpers ---------------------------------------------------

    def _requireLink(self, remoteBroker):
        """Return the link to remoteBroker; if absent raise, or exit(0) when _quiet."""
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            if not _quiet:
                raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
            sys.exit(0)
        return link

    def _closeBridgeAndExit(self, bridge, bridges, link):
        """Close bridge, close its link too if it was the link's last bridge
        and _dellink is set, then exit the process with status 0."""
        # Count only the bridges riding on this link.  Counting every bridge
        # on the broker would keep the link alive whenever any other link
        # still carried a route.
        linkId = link.getObjectId()
        onThisLink = [b for b in bridges if b.linkRef == linkId]

        if _verbose:
            print("Closing bridge...")
        res = bridge.close()
        if res.status != 0:
            raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))

        if len(onThisLink) == 1 and _dellink:
            # Re-fetch the link; it may already be gone.
            link = self.getLink()
            if link is None:
                sys.exit(0)
            if _verbose:
                print("Last bridge on link, closing link...")
            res = link.close()
            if res.status != 0:
                raise Exception("Error closing link: %d - %s" % (res.status, res.text))
        sys.exit(0)

    def _findLink(self, links, bridge):
        """Return the entry of links that bridge.linkRef refers to, or None."""
        for link in links:
            if bridge.linkRef == link.getObjectId():
                return link
        return None

    def _report(self, prefix, res):
        """Print the outcome of a close call: always on error, "Ok" only if _verbose.

        prefix, when not None, is the progress text to put in front of the
        outcome on the same line.
        """
        if res.status != 0:
            outcome = "Error: %d - %s" % (res.status, res.text)
        elif _verbose:
            outcome = "Ok"
        else:
            return
        if prefix is None:
            print(outcome)
        else:
            print("%s %s" % (prefix, outcome))
| |
class RoutePair:
    """A route between two broker URLs, possibly seen in both directions.

    A pair starts out one-directional (fromUrl => toUrl).  It becomes
    bidirectional once matches() is asked about the reverse direction.
    """

    def __init__(self, fromUrl, toUrl):
        self.bidir = False
        self.fromUrl = fromUrl
        self.toUrl = toUrl

    def __repr__(self):
        """Return "<from> <arrow> <to>", where arrow is "<=>" when bidirectional."""
        arrow = " =>"
        if self.bidir:
            arrow = "<=>"
        return "%s %s %s" % (self.fromUrl, arrow, self.toUrl)

    def matches(self, fromUrl, toUrl):
        """Return True if the given endpoints are this pair in either direction.

        Side effect: a match in the reverse direction marks the pair as
        bidirectional.
        """
        mine = (self.fromUrl, self.toUrl)
        if (fromUrl, toUrl) == mine:
            return True
        if (toUrl, fromUrl) != mine:
            return False
        # Same endpoints, opposite direction.
        self.bidir = True
        return True
| |
| |
def YN(val):
    """Return 'Y' when val equals 1, otherwise 'N'."""
    if not val == 1:
        return 'N'
    return 'Y'
| |
| ## |
| ## Main Program |
| ## |
| |
# Parse the command line.  A malformed option prints the usage text and
# exits; only getopt's own error is caught so that SystemExit and
# KeyboardInterrupt are never swallowed.
try:
    longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except getopt.GetoptError:
    Usage()

# Decode positional arguments with the locale's preferred encoding; if that
# is not possible, fall back to the arguments exactly as received.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

# Transfer the parsed options into the module-level flags.
for (opt, value) in optlist:
    if opt in ("-v", "--verbose"):
        _verbose = True
    elif opt in ("-q", "--quiet"):
        _quiet = True
    elif opt in ("-d", "--durable"):
        _durable = True
    elif opt in ("-e", "--del-empty-link"):
        _dellink = True
    elif opt in ("-s", "--src-local"):
        _srclocal = True
    elif opt in ("-t", "--transport"):
        _transport = value

nargs = len(cargs)
if nargs < 2:
    Usage()

group = cargs[0]
cmd = cargs[1]

# Decide which broker to connect to.  With --src-local the roles of the two
# broker arguments are swapped, but only when both were actually supplied;
# with a single broker argument there is nothing to swap.
if nargs == 2:
    localBroker = "localhost"
elif _srclocal and nargs > 3:
    localBroker = cargs[3]
    remoteBroker = cargs[2]
else:
    localBroker = cargs[2]
    if nargs > 3:
        remoteBroker = cargs[3]

try:
    rm = RouteManager(localBroker)
    if group == "link":
        if cmd == "add":
            if nargs != 4:
                Usage()
            rm.addLink(remoteBroker)
        elif cmd == "del":
            if nargs != 4:
                Usage()
            rm.delLink(remoteBroker)
        elif cmd == "list":
            rm.listLinks()
        else:
            Usage()

    elif group == "dynamic":
        if cmd == "add":
            if nargs < 5 or nargs > 7:
                Usage()

            tag = ""
            excludes = ""
            if nargs > 5:
                tag = cargs[5]
            if nargs > 6:
                excludes = cargs[6]
            rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
        elif cmd == "del":
            if nargs != 5:
                Usage()
            rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
        else:
            Usage()

    elif group == "route":
        if cmd == "add":
            if nargs < 6 or nargs > 8:
                Usage()

            tag = ""
            excludes = ""
            if nargs > 6:
                tag = cargs[6]
            if nargs > 7:
                excludes = cargs[7]
            rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
        elif cmd == "del":
            if nargs != 6:
                Usage()
            rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
        elif cmd == "map":
            rm.mapRoutes()
        elif cmd == "list":
            rm.listRoutes()
        elif cmd == "flush":
            rm.clearAllRoutes()
        else:
            Usage()

    elif group == "queue":
        if nargs != 6:
            Usage()
        if cmd == "add":
            rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
        elif cmd == "del":
            rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
        else:
            Usage()

    else:
        # Unknown command group: show the usage text rather than doing nothing.
        Usage()

except Exception:
    # sys.exc_info() is used instead of binding the exception in the
    # except clause so the same line parses on every Python version.
    e = sys.exc_info()[1]
    print("Failed: %s" % (e.args,))
    sys.exit(1)

rm.disconnect()