| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import getopt |
| import sys |
| import socket |
| import qpid |
| import os |
| from qpid.management import managementClient |
| from qpid.managementdata import Broker |
| from qpid.peer import Closed |
| from qpid.connection import Connection, ConnectionFailed |
| from qpid.util import connect |
| |
| def Usage (): |
| print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>" |
| print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" |
| print " qpid-route [OPTIONS] link list [<dest-broker>]" |
| print |
| print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [id] [exclude-list]" |
| print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" |
| print " qpid-route [OPTIONS] route list [<dest-broker>]" |
| print " qpid-route [OPTIONS] route flush [<dest-broker>]" |
| print |
| print "Options:" |
| print " -v [ --verbose ] Verbose output" |
| print " -q [ --quiet ] Quiet output, don't print duplicate warnings" |
| print " -d [ --durable ] Added configuration shall be durable" |
| print " -e [ --del-empty-link ] Delete link after deleting last route on the link" |
| print |
| print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]" |
| print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" |
| print |
| sys.exit (1) |
| |
# Command-line option flags.  All start out disabled and are switched on by
# the option-parsing loop in the main program:
#   _verbose  (-v)  print progress messages and management method results
#   _quiet    (-q)  suppress "duplicate"/"not found" messages
#   _durable  (-d)  passed as the "durable" argument when adding links/routes
#   _dellink  (-e)  also close the link when routes are removed
_verbose = _quiet = _durable = _dellink = False
| |
| class RouteManager: |
| def __init__ (self, destBroker): |
| self.dest = Broker (destBroker) |
| self.src = None |
| |
| def ConnectToBroker (self): |
| broker = self.dest |
| if _verbose: |
| print "Connecting to broker: %s:%d" % (broker.host, broker.port) |
| try: |
| self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) |
| self.conn = Connection (connect (broker.host, broker.port), \ |
| username=broker.username, password=broker.password) |
| self.conn.start () |
| self.session = self.conn.session(self.sessionId) |
| self.mclient = managementClient (self.conn.spec) |
| self.mch = self.mclient.addChannel (self.session) |
| self.mclient.syncWaitForStable (self.mch) |
| except socket.error, e: |
| print "Socket Error %s - %s" % (e[0], e[1]) |
| sys.exit (1) |
| except Closed, e: |
| print "Connect Failed %d - %s" % (e[0], e[1]) |
| sys.exit (1) |
| except ConnectionFailed, e: |
| print "Connect Failed %d - %s" % (e[0], e[1]) |
| sys.exit(1) |
| |
| def Disconnect (self): |
| self.mclient.removeChannel (self.mch) |
| self.session.close(timeout=10) |
| self.conn.close(timeout=10) |
| |
| def getLink (self): |
| links = self.mclient.syncGetObjects (self.mch, "link") |
| for link in links: |
| if "%s:%d" % (link.host, link.port) == self.src.name (): |
| return link |
| return None |
| |
| def AddLink (self, srcBroker): |
| self.src = Broker (srcBroker) |
| mc = self.mclient |
| |
| if self.dest.name() == self.src.name(): |
| print "Linking broker to itself is not permitted" |
| sys.exit(1) |
| |
| brokers = mc.syncGetObjects (self.mch, "broker") |
| broker = brokers[0] |
| link = self.getLink() |
| if link != None: |
| print "Link already exists" |
| sys.exit(1) |
| |
| connectArgs = {} |
| connectArgs["host"] = self.src.host |
| connectArgs["port"] = self.src.port |
| connectArgs["useSsl"] = False |
| connectArgs["durable"] = _durable |
| if self.src.username == "anonymous": |
| connectArgs["authMechanism"] = "ANONYMOUS" |
| else: |
| connectArgs["authMechanism"] = "PLAIN" |
| connectArgs["username"] = self.src.username |
| connectArgs["password"] = self.src.password |
| res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) |
| if _verbose: |
| print "Connect method returned:", res.status, res.statusText |
| link = self.getLink () |
| |
| def DelLink (self, srcBroker): |
| self.src = Broker (srcBroker) |
| mc = self.mclient |
| |
| brokers = mc.syncGetObjects (self.mch, "broker") |
| broker = brokers[0] |
| link = self.getLink() |
| if link == None: |
| print "Link not found" |
| sys.exit(1) |
| |
| res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") |
| if _verbose: |
| print "Close method returned:", res.status, res.statusText |
| |
| def ListLinks (self): |
| mc = self.mclient |
| links = mc.syncGetObjects (self.mch, "link") |
| if len(links) == 0: |
| print "No Links Found" |
| else: |
| print |
| print "Host Port Durable State Last Error" |
| print "===================================================================" |
| for link in links: |
| print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError) |
| |
| def AddRoute (self, srcBroker, exchange, routingKey, id, excludes): |
| self.src = Broker (srcBroker) |
| mc = self.mclient |
| |
| if self.dest.name() == self.src.name(): |
| print "Linking broker to itself is not permitted" |
| sys.exit(1) |
| |
| brokers = mc.syncGetObjects (self.mch, "broker") |
| broker = brokers[0] |
| |
| link = self.getLink () |
| if link == None: |
| if _verbose: |
| print "Inter-broker link not found, creating..." |
| |
| connectArgs = {} |
| connectArgs["host"] = self.src.host |
| connectArgs["port"] = self.src.port |
| connectArgs["useSsl"] = False |
| connectArgs["durable"] = _durable |
| if self.src.username == "anonymous": |
| connectArgs["authMechanism"] = "ANONYMOUS" |
| else: |
| connectArgs["authMechanism"] = "PLAIN" |
| connectArgs["username"] = self.src.username |
| connectArgs["password"] = self.src.password |
| res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) |
| if _verbose: |
| print "Connect method returned:", res.status, res.statusText |
| link = self.getLink () |
| |
| if link == None: |
| print "Protocol Error - Missing link ID" |
| sys.exit (1) |
| |
| bridges = mc.syncGetObjects (self.mch, "bridge") |
| for bridge in bridges: |
| if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: |
| if not _quiet: |
| print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) |
| sys.exit (1) |
| sys.exit (0) |
| |
| if _verbose: |
| print "Creating inter-broker binding..." |
| bridgeArgs = {} |
| bridgeArgs["durable"] = _durable |
| bridgeArgs["src"] = exchange |
| bridgeArgs["dest"] = exchange |
| bridgeArgs["key"] = routingKey |
| bridgeArgs["tag"] = id |
| bridgeArgs["excludes"] = excludes |
| bridgeArgs["srcIsQueue"] = 0 |
| bridgeArgs["srcIsLocal"] = 0 |
| res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) |
| if res.status == 4: |
| print "Can't create a durable route on a non-durable link" |
| sys.exit(1) |
| if _verbose: |
| print "Bridge method returned:", res.status, res.statusText |
| |
| def DelRoute (self, srcBroker, exchange, routingKey): |
| self.src = Broker (srcBroker) |
| mc = self.mclient |
| |
| link = self.getLink () |
| if link == None: |
| if not _quiet: |
| print "No link found from %s to %s" % (self.src.name(), self.dest.name()) |
| sys.exit (1) |
| sys.exit (0) |
| |
| bridges = mc.syncGetObjects (self.mch, "bridge") |
| for bridge in bridges: |
| if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: |
| if _verbose: |
| print "Closing bridge..." |
| res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") |
| if res.status != 0: |
| print "Error closing bridge: %d - %s" % (res.status, res.statusText) |
| sys.exit (1) |
| if len (bridges) == 1 and _dellink: |
| link = self.getLink () |
| if link == None: |
| sys.exit (0) |
| if _verbose: |
| print "Last bridge on link, closing link..." |
| res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") |
| if res.status != 0: |
| print "Error closing link: %d - %s" % (res.status, res.statusText) |
| sys.exit (1) |
| sys.exit (0) |
| if not _quiet: |
| print "Route not found" |
| sys.exit (1) |
| |
| def ListRoutes (self): |
| mc = self.mclient |
| links = mc.syncGetObjects (self.mch, "link") |
| bridges = mc.syncGetObjects (self.mch, "bridge") |
| |
| for bridge in bridges: |
| myLink = None |
| for link in links: |
| if bridge.linkRef == link.id: |
| myLink = link |
| break |
| if myLink != None: |
| print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) |
| |
| def ClearAllRoutes (self): |
| mc = self.mclient |
| links = mc.syncGetObjects (self.mch, "link") |
| bridges = mc.syncGetObjects (self.mch, "bridge") |
| |
| for bridge in bridges: |
| if _verbose: |
| myLink = None |
| for link in links: |
| if bridge.linkRef == link.id: |
| myLink = link |
| break |
| if myLink != None: |
| print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), |
| res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") |
| if res.status != 0: |
| print "Error: %d - %s" % (res.status, res.statusText) |
| elif _verbose: |
| print "Ok" |
| |
| if _dellink: |
| links = mc.syncGetObjects (self.mch, "link") |
| for link in links: |
| if _verbose: |
| print "Deleting Link: %s:%d... " % (link.host, link.port), |
| res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") |
| if res.status != 0: |
| print "Error: %d - %s" % (res.status, res.statusText) |
| elif _verbose: |
| print "Ok" |
| |
def YN(val):
    """Map a flag to a one-character string: 'Y' when val equals 1, else 'N'."""
    if val != 1:
        return 'N'
    return 'Y'
| |
| ## |
| ## Main Program |
| ## |
| |
# Parse the command line.  Only option-parsing errors lead to the usage text;
# anything else (e.g. KeyboardInterrupt) is allowed to propagate.
try:
    longOpts = ("verbose", "quiet", "durable", "del-empty-link")
    (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "vqde", longOpts)
except getopt.GetoptError:
    Usage ()

for opt in optlist:
    optName = opt[0]
    if optName in ("-v", "--verbose"):
        _verbose = True
    if optName in ("-q", "--quiet"):
        _quiet = True
    if optName in ("-d", "--durable"):
        _durable = True
    if optName in ("-e", "--del-empty-link"):
        _dellink = True

nargs = len (cargs)
if nargs < 2:
    Usage ()

group = cargs[0]
cmd   = cargs[1]

# Reject unknown groups/commands before opening a broker connection, so a
# mistyped command shows the usage text instead of silently doing nothing.
validCommands = {
    "link"  : ("add", "del", "list"),
    "route" : ("add", "del", "list", "flush"),
}
if cmd not in validCommands.get (group, ()):
    Usage ()

# The destination broker is optional for the list/flush commands.
if nargs == 2:
    destBroker = "localhost"
else:
    destBroker = cargs[2]

rm = RouteManager (destBroker)
rm.ConnectToBroker ()

if group == "link":
    if cmd == "add":
        if nargs != 4:
            Usage ()
        rm.AddLink (cargs[3])
    elif cmd == "del":
        if nargs != 4:
            Usage ()
        rm.DelLink (cargs[3])
    elif cmd == "list":
        rm.ListLinks ()

elif group == "route":
    if cmd == "add":
        if nargs < 6 or nargs > 8:
            Usage ()

        # Optional trailing arguments: route id (tag) and exclude list.
        tag = ""
        excludes = ""
        if nargs > 6:
            tag = cargs[6]
        if nargs > 7:
            excludes = cargs[7]
        rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
    elif cmd == "del":
        if nargs != 6:
            Usage ()
        rm.DelRoute (cargs[3], cargs[4], cargs[5])
    elif cmd == "list":
        rm.ListRoutes ()
    elif cmd == "flush":
        rm.ClearAllRoutes ()

rm.Disconnect ()