| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from optparse import OptionParser, OptionGroup, IndentedHelpFormatter |
| import sys |
| import socket |
| import os |
| import locale |
| from qmf.console import Session, BrokerURL |
| |
| usage = """ |
| Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism] |
| qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange> |
| |
| qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism] |
| qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key> |
| qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism] |
| qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue> |
| qpid-route [OPTIONS] route list [<dest-broker>] |
| qpid-route [OPTIONS] route flush [<dest-broker>] |
| qpid-route [OPTIONS] route map [<broker>] |
| |
| qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism] |
| qpid-route [OPTIONS] link del <dest-broker> <src-broker> |
| qpid-route [OPTIONS] link list [<dest-broker>]""" |
| |
| description = """ |
| ADDRESS syntax: |
| |
| [username/password@] hostname |
| ip-address [:<port>]""" |
| |
| def Usage(): |
| print usage |
| |
| class Config: |
| def __init__(self): |
| self._verbose = False |
| self._quiet = False |
| self._durable = False |
| self._dellink = False |
| self._srclocal = False |
| self._transport = "tcp" |
| self._ack = 0 |
| self._connTimeout = 10 |
| self._conn_options = {} |
| |
| config = Config() |
| |
| class JHelpFormatter(IndentedHelpFormatter): |
| """Format usage and description without stripping newlines from usage strings |
| """ |
| |
| def format_usage(self, usage): |
| return usage |
| |
| |
| def format_description(self, description): |
| if description: |
| return description + "\n" |
| else: |
| return "" |
| |
| def OptionsAndArguments(argv): |
| parser = OptionParser(usage=usage, |
| description=description, |
| formatter=JHelpFormatter()) |
| |
| parser.add_option("--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") |
| parser.add_option("-v", "--verbose", action="store_true", help="Verbose output") |
| parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings") |
| parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable") |
| |
| parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link") |
| parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)") |
| |
| parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N") |
| parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") |
| |
| parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") |
| parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") |
| parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") |
| parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") |
| opts, encArgs = parser.parse_args(args=argv) |
| |
| try: |
| encoding = locale.getpreferredencoding() |
| args = [a.decode(encoding) for a in encArgs] |
| except: |
| args = encArgs |
| |
| if opts.timeout: |
| config._connTimeout = opts.timeout |
| if config._connTimeout == 0: |
| config._connTimeout = None |
| |
| if opts.verbose: |
| config._verbose = True |
| |
| if opts.quiet: |
| config._quiet = True |
| |
| if opts.durable: |
| config._durable = True |
| |
| if opts.del_empty_link: |
| config._dellink = True |
| |
| if opts.src_local: |
| config._srclocal = True |
| |
| if opts.transport: |
| config._transport = opts.transport |
| |
| if opts.ha_admin: |
| config._conn_options['client_properties'] = {'qpid.ha-admin' : 1} |
| |
| if opts.ack: |
| config._ack = opts.ack |
| |
| if opts.client_sasl_mechanism: |
| config._conn_options['mechanisms'] = opts.client_sasl_mechanism |
| |
| if opts.ssl_certificate: |
| config._conn_options['ssl_certfile'] = opts.ssl_certificate |
| |
| if opts.ssl_key: |
| if not opts.ssl_certificate: |
| parser.error("missing '--ssl-certificate' (required by '--ssl-key')") |
| config._conn_options['ssl_keyfile'] = opts.ssl_key |
| |
| return args |
| |
| |
| class RouteManager: |
| def __init__(self, localBroker): |
| self.brokerList = {} |
| self.local = BrokerURL(localBroker) |
| self.remote = None |
| self.qmf = Session() |
| self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options) |
| self.broker._waitForStable() |
| self.agent = self.broker.getBrokerAgent() |
| |
| def disconnect(self): |
| try: |
| if self.broker: |
| self.qmf.delBroker(self.broker) |
| self.broker = None |
| while len(self.brokerList): |
| b = self.brokerList.popitem() |
| if b[0] != self.local.name(): |
| self.qmf.delBroker(b[1]) |
| except: |
| pass # ignore errors while shutting down |
| |
| def getLink(self): |
| links = self.agent.getObjects(_class="link") |
| for link in links: |
| if self.remote.match(link.host, link.port): |
| return link |
| return None |
| |
| def addLink(self, remoteBroker, interbroker_mechanism=""): |
| self.remote = BrokerURL(remoteBroker) |
| if self.local.match(self.remote.host, self.remote.port): |
| raise Exception("Linking broker to itself is not permitted") |
| |
| brokers = self.agent.getObjects(_class="broker") |
| broker = brokers[0] |
| link = self.getLink() |
| if link == None: |
| res = broker.connect(self.remote.host, self.remote.port, config._durable, |
| interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "", |
| config._transport) |
| if config._verbose: |
| print "Connect method returned:", res.status, res.text |
| |
| def delLink(self, remoteBroker): |
| self.remote = BrokerURL(remoteBroker) |
| brokers = self.agent.getObjects(_class="broker") |
| broker = brokers[0] |
| link = self.getLink() |
| if link == None: |
| raise Exception("Link not found") |
| |
| res = link.close() |
| if config._verbose: |
| print "Close method returned:", res.status, res.text |
| |
| def listLinks(self): |
| links = self.agent.getObjects(_class="link") |
| if len(links) == 0: |
| print "No Links Found" |
| else: |
| print |
| print "Host Port Transport Durable State Last Error" |
| print "=============================================================================" |
| for link in links: |
| print "%-16s%-8d%-13s%c %-18s%s" % \ |
| (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError) |
| |
| def mapRoutes(self): |
| print |
| print "Finding Linked Brokers:" |
| |
| self.brokerList[self.local.name()] = self.broker |
| print " %s... Ok" % self.local |
| |
| added = True |
| while added: |
| added = False |
| links = self.qmf.getObjects(_class="link") |
| for link in links: |
| url = BrokerURL(host=link.host, port=link.port) |
| if url.name() not in self.brokerList: |
| print " %s..." % url.name(), |
| try: |
| b = self.qmf.addBroker(url, config._connTimeout) |
| self.brokerList[url.name()] = b |
| added = True |
| print "Ok" |
| except Exception, e: |
| print e |
| |
| print |
| print "Dynamic Routes:" |
| bridges = self.qmf.getObjects(_class="bridge", dynamic=True) |
| fedExchanges = [] |
| for bridge in bridges: |
| if bridge.src not in fedExchanges: |
| fedExchanges.append(bridge.src) |
| if len(fedExchanges) == 0: |
| print " none found" |
| print |
| |
| for ex in fedExchanges: |
| print " Exchange %s:" % ex |
| pairs = [] |
| for bridge in bridges: |
| if bridge.src == ex: |
| link = bridge._linkRef_ |
| fromUrl = BrokerURL(host=link.host, port=link.port) |
| toUrl = bridge.getBroker().getUrl() |
| found = False |
| for pair in pairs: |
| if pair.matches(fromUrl, toUrl): |
| found = True |
| if not found: |
| pairs.append(RoutePair(fromUrl, toUrl)) |
| for pair in pairs: |
| print " %s" % pair |
| print |
| |
| print "Static Routes:" |
| bridges = self.qmf.getObjects(_class="bridge", dynamic=False) |
| if len(bridges) == 0: |
| print " none found" |
| print |
| |
| for bridge in bridges: |
| link = bridge._linkRef_ |
| fromUrl = "%s:%s" % (link.host, link.port) |
| toUrl = bridge.getBroker().getUrl() |
| leftType = "ex" |
| rightType = "ex" |
| if bridge.srcIsLocal: |
| arrow = "=>" |
| left = bridge.src |
| right = bridge.dest |
| if bridge.srcIsQueue: |
| leftType = "queue" |
| else: |
| arrow = "<=" |
| left = bridge.dest |
| right = bridge.src |
| if bridge.srcIsQueue: |
| rightType = "queue" |
| |
| if bridge.srcIsQueue: |
| print " %s(%s=%s) %s %s(%s=%s)" % \ |
| (toUrl, leftType, left, arrow, fromUrl, rightType, right) |
| else: |
| print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ |
| (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) |
| print |
| |
| while len(self.brokerList): |
| b = self.brokerList.popitem() |
| if b[0] != self.local.name(): |
| self.qmf.delBroker(b[1]) |
| |
| def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False): |
| if dynamic and config._srclocal: |
| raise Exception("--src-local is not permitted on dynamic routes") |
| |
| self.addLink(remoteBroker, interbroker_mechanism) |
| link = self.getLink() |
| if link == None: |
| raise Exception("Link failed to create") |
| |
| bridges = self.agent.getObjects(_class="bridge") |
| for bridge in bridges: |
| if bridge.linkRef == link.getObjectId() and \ |
| bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: |
| if not config._quiet: |
| raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) |
| sys.exit(0) |
| |
| if config._verbose: |
| print "Creating inter-broker binding..." |
| res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack) |
| if res.status != 0: |
| raise Exception(res.text) |
| if config._verbose: |
| print "Bridge method returned:", res.status, res.text |
| |
| def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ): |
| self.addLink(remoteBroker, interbroker_mechanism) |
| link = self.getLink() |
| if link == None: |
| raise Exception("Link failed to create") |
| |
| bridges = self.agent.getObjects(_class="bridge") |
| for bridge in bridges: |
| if bridge.linkRef == link.getObjectId() and \ |
| bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: |
| if not config._quiet: |
| raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) |
| sys.exit(0) |
| |
| if config._verbose: |
| print "Creating inter-broker binding..." |
| res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack) |
| if res.status != 0: |
| raise Exception(res.text) |
| if config._verbose: |
| print "Bridge method returned:", res.status, res.text |
| |
| def delQueueRoute(self, remoteBroker, exchange, queue): |
| self.remote = BrokerURL(remoteBroker) |
| link = self.getLink() |
| if link == None: |
| if not config._quiet: |
| raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) |
| sys.exit(0) |
| |
| bridges = self.agent.getObjects(_class="bridge") |
| for bridge in bridges: |
| if bridge.linkRef == link.getObjectId() and \ |
| bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: |
| if config._verbose: |
| print "Closing bridge..." |
| res = bridge.close() |
| if res.status != 0: |
| raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) |
| if len(bridges) == 1 and config._dellink: |
| link = self.getLink() |
| if link == None: |
| sys.exit(0) |
| if config._verbose: |
| print "Last bridge on link, closing link..." |
| res = link.close() |
| if res.status != 0: |
| raise Exception("Error closing link: %d - %s" % (res.status, res.text)) |
| sys.exit(0) |
| if not config._quiet: |
| raise Exception("Route not found") |
| |
| def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): |
| self.remote = BrokerURL(remoteBroker) |
| link = self.getLink() |
| if link == None: |
| if not config._quiet: |
| raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) |
| sys.exit(0) |
| |
| bridges = self.agent.getObjects(_class="bridge") |
| for bridge in bridges: |
| if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ |
| and bridge.dynamic == dynamic: |
| if config._verbose: |
| print "Closing bridge..." |
| res = bridge.close() |
| if res.status != 0: |
| raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) |
| if len(bridges) == 1 and config._dellink: |
| link = self.getLink() |
| if link == None: |
| sys.exit(0) |
| if config._verbose: |
| print "Last bridge on link, closing link..." |
| res = link.close() |
| if res.status != 0: |
| raise Exception("Error closing link: %d - %s" % (res.status, res.text)) |
| return |
| if not config._quiet: |
| raise Exception("Route not found") |
| |
| def listRoutes(self): |
| links = self.qmf.getObjects(_class="link") |
| bridges = self.qmf.getObjects(_class="bridge") |
| |
| for bridge in bridges: |
| myLink = None |
| for link in links: |
| if bridge.linkRef == link.getObjectId(): |
| myLink = link |
| break |
| if myLink != None: |
| if bridge.dynamic: |
| keyText = "<dynamic>" |
| else: |
| keyText = bridge.key |
| print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) |
| |
| def clearAllRoutes(self): |
| links = self.qmf.getObjects(_class="link") |
| bridges = self.qmf.getObjects(_class="bridge") |
| |
| for bridge in bridges: |
| if config._verbose: |
| myLink = None |
| for link in links: |
| if bridge.linkRef == link.getObjectId(): |
| myLink = link |
| break |
| if myLink != None: |
| print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), |
| res = bridge.close() |
| if res.status != 0: |
| print "Error: %d - %s" % (res.status, res.text) |
| elif config._verbose: |
| print "Ok" |
| |
| if config._dellink: |
| links = self.qmf.getObjects(_class="link") |
| for link in links: |
| if config._verbose: |
| print "Deleting Link: %s:%d... " % (link.host, link.port), |
| res = link.close() |
| if res.status != 0: |
| print "Error: %d - %s" % (res.status, res.text) |
| elif config._verbose: |
| print "Ok" |
| |
| class RoutePair: |
| def __init__(self, fromUrl, toUrl): |
| self.fromUrl = fromUrl |
| self.toUrl = toUrl |
| self.bidir = False |
| |
| def __repr__(self): |
| if self.bidir: |
| delimit = "<=>" |
| else: |
| delimit = " =>" |
| return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) |
| |
| def matches(self, fromUrl, toUrl): |
| if fromUrl == self.fromUrl and toUrl == self.toUrl: |
| return True |
| if toUrl == self.fromUrl and fromUrl == self.toUrl: |
| self.bidir = True |
| return True |
| return False |
| |
| |
| def YN(val): |
| if val == 1: |
| return 'Y' |
| return 'N' |
| |
| |
| def main(argv=None): |
| |
| args = OptionsAndArguments(argv) |
| nargs = len(args) |
| if nargs < 2: |
| Usage() |
| return(-1) |
| |
| if nargs == 2: |
| localBroker = socket.gethostname() |
| else: |
| if config._srclocal: |
| localBroker = args[3] |
| remoteBroker = args[2] |
| else: |
| localBroker = args[2] |
| if nargs > 3: |
| remoteBroker = args[3] |
| |
| group = args[0] |
| cmd = args[1] |
| |
| rm = None |
| try: |
| rm = RouteManager(localBroker) |
| if group == "link": |
| if cmd == "add": |
| if nargs < 3 or nargs > 5: |
| Usage() |
| return(-1) |
| interbroker_mechanism = "" |
| if nargs > 4: interbroker_mechanism = args[4] |
| rm.addLink(remoteBroker, interbroker_mechanism) |
| elif cmd == "del": |
| if nargs != 4: |
| Usage() |
| return(-1) |
| rm.delLink(remoteBroker) |
| elif cmd == "list": |
| rm.listLinks() |
| |
| elif group == "dynamic": |
| if cmd == "add": |
| if nargs < 5 or nargs > 8: |
| Usage() |
| return(-1) |
| |
| tag = "" |
| excludes = "" |
| interbroker_mechanism = "" |
| if nargs > 5: tag = args[5] |
| if nargs > 6: excludes = args[6] |
| if nargs > 7: interbroker_mechanism = args[7] |
| rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True) |
| elif cmd == "del": |
| if nargs != 5: |
| Usage() |
| return(-1) |
| else: |
| rm.delRoute(remoteBroker, args[4], "", dynamic=True) |
| |
| elif group == "route": |
| if cmd == "add": |
| if nargs < 6 or nargs > 9: |
| Usage() |
| return(-1) |
| |
| tag = "" |
| excludes = "" |
| interbroker_mechanism = "" |
| if nargs > 6: tag = args[6] |
| if nargs > 7: excludes = args[7] |
| if nargs > 8: interbroker_mechanism = args[8] |
| rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False) |
| elif cmd == "del": |
| if nargs != 6: |
| Usage() |
| return(-1) |
| rm.delRoute(remoteBroker, args[4], args[5], dynamic=False) |
| elif cmd == "map": |
| rm.mapRoutes() |
| else: |
| if cmd == "list": |
| rm.listRoutes() |
| elif cmd == "flush": |
| rm.clearAllRoutes() |
| else: |
| Usage() |
| return(-1) |
| |
| elif group == "queue": |
| if nargs < 6 or nargs > 7: |
| Usage() |
| return(-1) |
| if cmd == "add": |
| interbroker_mechanism = "" |
| if nargs > 6: interbroker_mechanism = args[6] |
| rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] ) |
| elif cmd == "del": |
| rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) |
| else: |
| Usage() |
| return(-1) |
| else: |
| Usage() |
| return(-1) |
| |
| except Exception,e: |
| if rm: |
| rm.disconnect() # try to release broker resources |
| print "Failed: %s - %s" % (e.__class__.__name__, e) |
| return 1 |
| |
| rm.disconnect() |
| return 0 |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |