blob: 7cf52e0a6759ef1fbc7dcf203c3f7a5d70a8f6a2 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session, BrokerURL
# Command synopsis. Passed to OptionParser as its usage text and printed
# by Usage() when the positional arguments are malformed.
usage = """
Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism]
qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>
qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>
qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism]
qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>
qpid-route [OPTIONS] route list [<dest-broker>]
qpid-route [OPTIONS] route flush [<dest-broker>]
qpid-route [OPTIONS] route map [<broker>]
qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism]
qpid-route [OPTIONS] link del <dest-broker> <src-broker>
qpid-route [OPTIONS] link list [<dest-broker>]"""
# Supplementary help text describing how a broker address is written.
# Passed to OptionParser as its description.
description = """
ADDRESS syntax:
[username/password@] hostname
ip-address [:<port>]"""
def Usage():
    """Write the module-level ``usage`` synopsis to standard output."""
    # Single-argument print(...) behaves the same as the Python 2 print
    # statement used elsewhere in this script.
    print(usage)
class Config:
    """Mutable bag of settings shared by the whole script.

    A single module-level instance (``config``) is created below;
    OptionsAndArguments() overwrites the defaults from the command line and
    RouteManager / main() read them.
    """

    def __init__(self):
        # Connection to the managed broker.
        self._connTimeout = 10      # seconds to wait for the broker connection
        self._conn_options = {}     # extra keyword arguments for Session.addBroker()
        self._transport = "tcp"     # transport used for inter-broker links

        # Properties applied to links and bridges that get created.
        self._durable = False       # create durable configuration
        self._srclocal = False      # connect to the source broker (push route)
        self._ack = 0               # acknowledgement batch size for bridges

        # Behaviour when deleting routes.
        self._dellink = False       # also close a link once its last route is gone

        # Output control.
        self._verbose = False
        self._quiet = False


# The one shared configuration object used throughout this module.
config = Config()
class JHelpFormatter(IndentedHelpFormatter):
    """Help formatter that leaves usage and description text unwrapped.

    Both hooks return the caller's text verbatim, so the line breaks of the
    multi-line usage and description strings are kept as written.
    """

    def format_usage(self, usage):
        """Return the usage text exactly as supplied."""
        return usage

    def format_description(self, description):
        """Return the description plus a trailing newline, or "" if there is none."""
        if not description:
            return ""
        return description + "\n"
def OptionsAndArguments(argv):
    """Parse the command line and record the options in the global ``config``.

    argv -- list of argument strings, or None to let optparse read
            sys.argv[1:].

    Returns the positional arguments. They are decoded with the locale's
    preferred encoding when possible; otherwise the raw strings are
    returned unchanged.

    Invalid options, or ``--ssl-key`` without ``--ssl-certificate``, are
    reported through ``parser.error()``, which prints a message and exits.
    """
    parser = OptionParser(usage=usage,
                          description=description,
                          formatter=JHelpFormatter())
    parser.add_option("--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
    parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings")
    parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable")
    parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link")
    parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")
    parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N")
    parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
    parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
    parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
    parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")

    opts, encArgs = parser.parse_args(args=argv)

    # Decoding is best effort: if the arguments cannot be decoded (unknown
    # encoding, undecodable bytes, objects without .decode) keep them as is.
    # Catch Exception rather than everything so that KeyboardInterrupt and
    # SystemExit still propagate.
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except Exception:
        args = encArgs

    # Test against None, not truthiness: "--timeout 0" must reach the
    # zero check below instead of silently keeping the previous value.
    if opts.timeout is not None:
        config._connTimeout = opts.timeout
    if config._connTimeout == 0:
        # Zero is mapped to None (presumably "no timeout" for
        # Session.addBroker -- confirm against the qmf API).
        config._connTimeout = None

    # Boolean flags only ever switch a setting on.
    if opts.verbose:
        config._verbose = True
    if opts.quiet:
        config._quiet = True
    if opts.durable:
        config._durable = True
    if opts.del_empty_link:
        config._dellink = True
    if opts.src_local:
        config._srclocal = True

    if opts.transport:
        config._transport = opts.transport
    if opts.ack:
        config._ack = opts.ack

    # Options forwarded as keyword arguments when connecting to the broker.
    if opts.ha_admin:
        config._conn_options['client_properties'] = {'qpid.ha-admin' : 1}
    if opts.client_sasl_mechanism:
        config._conn_options['mechanisms'] = opts.client_sasl_mechanism
    if opts.ssl_certificate:
        config._conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        config._conn_options['ssl_keyfile'] = opts.ssl_key

    return args
class RouteManager:
    """Create, delete and list federation links and routes on one broker.

    The manager connects to ``localBroker`` through a QMF console session
    and then operates on the "link" and "bridge" management objects that
    the broker's agent exposes.

    Methods report failures by raising Exception. "Nothing to do" outcomes
    (a duplicate route or missing link in quiet mode, a finished deletion)
    simply return, so the caller can disconnect from the broker cleanly.
    """

    def __init__(self, localBroker):
        """Connect to ``localBroker`` and wait until the connection is stable."""
        self.brokerList = {}    # url name -> broker, filled by mapRoutes()
        self.local = BrokerURL(localBroker)
        self.remote = None      # set by the operations that take a remote broker
        self.qmf = Session()
        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options)
        self.broker._waitForStable()
        self.agent = self.broker.getBrokerAgent()

    def _releaseLinkedBrokers(self):
        """Drop every broker recorded in brokerList except the local one."""
        while self.brokerList:
            name, broker = self.brokerList.popitem()
            if name != self.local.name():
                self.qmf.delBroker(broker)

    def _linkForBridge(self, links, bridge):
        """Return the link in ``links`` that ``bridge`` refers to, or None."""
        for link in links:
            if bridge.linkRef == link.getObjectId():
                return link
        return None

    def _closeLinkIfLast(self, bridges, linkId):
        """Close the link to self.remote if the bridge just closed was its last.

        ``bridges`` is the list fetched before the bridge was closed. Only
        bridges that belong to ``linkId`` are counted, so bridges on other
        links of the same broker do not prevent the clean-up. Does nothing
        unless --del-empty-link was given.
        """
        if not config._dellink:
            return
        onThisLink = [b for b in bridges if b.linkRef == linkId]
        if len(onThisLink) != 1:
            return
        link = self.getLink()
        if link is None:
            return
        if config._verbose:
            print("Last bridge on link, closing link...")
        res = link.close()
        if res.status != 0:
            raise Exception("Error closing link: %d - %s" % (res.status, res.text))

    def disconnect(self):
        """Release all broker connections; errors during shutdown are ignored."""
        try:
            if self.broker:
                self.qmf.delBroker(self.broker)
                self.broker = None
            self._releaseLinkedBrokers()
        except Exception:
            pass # deliberately best effort: ignore errors while shutting down

    def getLink(self):
        """Return the link object matching self.remote, or None if there is none."""
        for link in self.agent.getObjects(_class="link"):
            if self.remote.match(link.host, link.port):
                return link
        return None

    def addLink(self, remoteBroker, interbroker_mechanism=""):
        """Ask the local broker to connect to ``remoteBroker`` unless a link exists.

        Raises Exception if ``remoteBroker`` is the local broker itself.
        """
        self.remote = BrokerURL(remoteBroker)
        if self.local.match(self.remote.host, self.remote.port):
            raise Exception("Linking broker to itself is not permitted")

        brokers = self.agent.getObjects(_class="broker")
        broker = brokers[0]
        if self.getLink() is None:
            res = broker.connect(self.remote.host, self.remote.port, config._durable,
                                 interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "",
                                 config._transport)
            if config._verbose:
                print("Connect method returned: %s %s" % (res.status, res.text))

    def delLink(self, remoteBroker):
        """Close the link to ``remoteBroker``; raises Exception if it does not exist."""
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link not found")

        res = link.close()
        if config._verbose:
            print("Close method returned: %s %s" % (res.status, res.text))

    def listLinks(self):
        """Print a table of the links known to the local broker's agent."""
        links = self.agent.getObjects(_class="link")
        if not links:
            print("No Links Found")
            return

        print("")
        print("Host Port Transport Durable State Last Error")
        print("=============================================================================")
        for link in links:
            print("%-16s%-8d%-13s%c %-18s%s" %
                  (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError))

    def mapRoutes(self):
        """Connect to every broker reachable over links and print all routes.

        New brokers are added to the session until a pass over the known
        links discovers nothing new. The extra connections are released
        again before returning.
        """
        print("")
        print("Finding Linked Brokers:")

        self.brokerList[self.local.name()] = self.broker
        print(" %s... Ok" % self.local)

        added = True
        while added:
            added = False
            links = self.qmf.getObjects(_class="link")
            for link in links:
                url = BrokerURL(host=link.host, port=link.port)
                if url.name() not in self.brokerList:
                    # Progress text without a newline; "Ok" or the error
                    # message completes the line.
                    sys.stdout.write(" %s..." % url.name() + " ")
                    try:
                        b = self.qmf.addBroker(url, config._connTimeout)
                        self.brokerList[url.name()] = b
                        added = True
                        print("Ok")
                    except Exception as e:
                        print(e)

        print("")
        print("Dynamic Routes:")
        bridges = self.qmf.getObjects(_class="bridge", dynamic=True)
        fedExchanges = []
        for bridge in bridges:
            if bridge.src not in fedExchanges:
                fedExchanges.append(bridge.src)
        if not fedExchanges:
            print(" none found")
        print("")

        for ex in fedExchanges:
            print(" Exchange %s:" % ex)
            pairs = []
            for bridge in bridges:
                if bridge.src == ex:
                    link = bridge._linkRef_
                    fromUrl = BrokerURL(host=link.host, port=link.port)
                    toUrl = bridge.getBroker().getUrl()
                    found = False
                    # No early exit: matches() also flags a pair as
                    # bidirectional when it sees the reverse direction.
                    for pair in pairs:
                        if pair.matches(fromUrl, toUrl):
                            found = True
                    if not found:
                        pairs.append(RoutePair(fromUrl, toUrl))
            for pair in pairs:
                print(" %s" % pair)
            print("")

        print("Static Routes:")
        bridges = self.qmf.getObjects(_class="bridge", dynamic=False)
        if not bridges:
            print(" none found")
        print("")

        for bridge in bridges:
            link = bridge._linkRef_
            fromUrl = "%s:%s" % (link.host, link.port)
            toUrl = bridge.getBroker().getUrl()
            leftType = "ex"
            rightType = "ex"
            if bridge.srcIsLocal:
                arrow = "=>"
                left = bridge.src
                right = bridge.dest
                if bridge.srcIsQueue:
                    leftType = "queue"
            else:
                arrow = "<="
                left = bridge.dest
                right = bridge.src
                if bridge.srcIsQueue:
                    rightType = "queue"

            if bridge.srcIsQueue:
                print(" %s(%s=%s) %s %s(%s=%s)" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right))
            else:
                print(" %s(%s=%s) %s %s(%s=%s) key=%s" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key))
        print("")

        self._releaseLinkedBrokers()

    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False):
        """Create an exchange route from ``remoteBroker``, creating the link if needed.

        A route that already exists raises Exception, or is silently
        accepted when --quiet was given. Raises Exception when
        --src-local is combined with a dynamic route, when the link cannot
        be found after creation, or when the broker rejects the bridge.
        """
        if dynamic and config._srclocal:
            raise Exception("--src-local is not permitted on dynamic routes")

        self.addLink(remoteBroker, interbroker_mechanism)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and \
               bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
                if not config._quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
                # Return instead of exiting so the caller can disconnect.
                return

        if config._verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack)
        if res.status != 0:
            raise Exception(res.text)
        if config._verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ):
        """Create a queue route from ``queue`` to ``exchange``, creating the link if needed.

        A route that already exists raises Exception, or is silently
        accepted when --quiet was given. Raises Exception when the link
        cannot be found after creation or the broker rejects the bridge.
        """
        self.addLink(remoteBroker, interbroker_mechanism)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and \
               bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                if not config._quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
                # Return instead of exiting so the caller can disconnect.
                return

        if config._verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack)
        if res.status != 0:
            raise Exception(res.text)
        if config._verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def delQueueRoute(self, remoteBroker, exchange, queue):
        """Close the queue route from ``queue`` to ``exchange`` on the link to ``remoteBroker``.

        With --del-empty-link the link is closed too when this was its last
        bridge. A missing link or route raises Exception unless --quiet was
        given; a failing close always raises Exception.
        """
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            if not config._quiet:
                raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
            return

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and \
               bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                if config._verbose:
                    print("Closing bridge...")
                res = bridge.close()
                if res.status != 0:
                    raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
                self._closeLinkIfLast(bridges, linkId)
                return

        if not config._quiet:
            raise Exception("Route not found")

    def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
        """Close the exchange route for ``routingKey`` on the link to ``remoteBroker``.

        With --del-empty-link the link is closed too when this was its last
        bridge. A missing link or route raises Exception unless --quiet was
        given; a failing close always raises Exception.
        """
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            if not config._quiet:
                raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
            return

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and bridge.dest == exchange and bridge.key == routingKey \
               and bridge.dynamic == dynamic:
                if config._verbose:
                    print("Closing bridge...")
                res = bridge.close()
                if res.status != 0:
                    raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
                self._closeLinkIfLast(bridges, linkId)
                return

        if not config._quiet:
            raise Exception("Route not found")

    def listRoutes(self):
        """Print one line per bridge whose link is known to the session."""
        links = self.qmf.getObjects(_class="link")
        bridges = self.qmf.getObjects(_class="bridge")

        for bridge in bridges:
            myLink = self._linkForBridge(links, bridge)
            if myLink is None:
                continue
            if bridge.dynamic:
                keyText = "<dynamic>"
            else:
                keyText = bridge.key
            print("%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText))

    def clearAllRoutes(self):
        """Close every bridge and, with --del-empty-link, every link as well.

        Failures are printed and do not stop the remaining deletions.
        """
        links = self.qmf.getObjects(_class="link")
        bridges = self.qmf.getObjects(_class="bridge")

        for bridge in bridges:
            if config._verbose:
                myLink = self._linkForBridge(links, bridge)
                if myLink is not None:
                    # Progress text without a newline; the result is
                    # printed on the same line.
                    sys.stdout.write("Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key) + " ")
            res = bridge.close()
            if res.status != 0:
                print("Error: %d - %s" % (res.status, res.text))
            elif config._verbose:
                print("Ok")

        if config._dellink:
            links = self.qmf.getObjects(_class="link")
            for link in links:
                if config._verbose:
                    sys.stdout.write("Deleting Link: %s:%d... " % (link.host, link.port) + " ")
                res = link.close()
                if res.status != 0:
                    print("Error: %d - %s" % (res.status, res.text))
                elif config._verbose:
                    print("Ok")
class RoutePair:
    """A route between two brokers, one-way until the reverse direction is seen."""

    def __init__(self, fromUrl, toUrl):
        self.fromUrl = fromUrl
        self.toUrl = toUrl
        self.bidir = False  # set by matches() once the reverse route is seen

    def __repr__(self):
        """Render as "<from> <arrow> <to>", using "<=>" for bidirectional pairs."""
        arrow = "<=>" if self.bidir else " =>"
        return "%s %s %s" % (self.fromUrl, arrow, self.toUrl)

    def matches(self, fromUrl, toUrl):
        """Return True if the given endpoints describe this pair in either direction.

        A match in the reverse direction also marks the pair as
        bidirectional.
        """
        sameDirection = fromUrl == self.fromUrl and toUrl == self.toUrl
        if sameDirection:
            return True
        reverseDirection = toUrl == self.fromUrl and fromUrl == self.toUrl
        if not reverseDirection:
            return False
        self.bidir = True
        return True
def YN(val):
    """Return 'Y' when ``val`` equals 1, otherwise 'N'."""
    return 'Y' if val == 1 else 'N'
def _runCommand(rm, group, cmd, args, remoteBroker):
    """Validate and execute one command; return False if it is malformed.

    rm           -- connected RouteManager
    group, cmd   -- the first two positional arguments
    args         -- all positional arguments
    remoteBroker -- remote broker address, or None when none was given
    """
    nargs = len(args)

    def optional(index):
        # Trailing optional arguments default to the empty string.
        if nargs > index:
            return args[index]
        return ""

    if group == "link":
        if cmd == "add":
            # Both brokers are required, so at least four arguments.
            if nargs < 4 or nargs > 5:
                return False
            rm.addLink(remoteBroker, optional(4))
        elif cmd == "del":
            if nargs != 4:
                return False
            rm.delLink(remoteBroker)
        elif cmd == "list":
            rm.listLinks()
        else:
            return False

    elif group == "dynamic":
        if cmd == "add":
            if nargs < 5 or nargs > 8:
                return False
            rm.addRoute(remoteBroker, args[4], "", optional(5), optional(6), optional(7), dynamic=True)
        elif cmd == "del":
            if nargs != 5:
                return False
            rm.delRoute(remoteBroker, args[4], "", dynamic=True)
        else:
            return False

    elif group == "route":
        if cmd == "add":
            if nargs < 6 or nargs > 9:
                return False
            rm.addRoute(remoteBroker, args[4], args[5], optional(6), optional(7), optional(8), dynamic=False)
        elif cmd == "del":
            if nargs != 6:
                return False
            rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
        elif cmd == "map":
            rm.mapRoutes()
        elif cmd == "list":
            rm.listRoutes()
        elif cmd == "flush":
            rm.clearAllRoutes()
        else:
            return False

    elif group == "queue":
        if nargs < 6 or nargs > 7:
            return False
        if cmd == "add":
            rm.addQueueRoute(remoteBroker, optional(6), exchange=args[4], queue=args[5])
        elif cmd == "del":
            rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
        else:
            return False

    else:
        return False

    return True


def main(argv=None):
    """Run qpid-route and return the process exit status.

    argv -- argument list, or None to use sys.argv[1:].

    Returns 0 on success, 1 when the operation failed (the error is
    printed), and -1 when the command line is malformed (the usage text is
    printed). The broker connection is released on every path.
    """
    args = OptionsAndArguments(argv)
    nargs = len(args)
    if nargs < 2:
        Usage()
        return -1

    # Work out which broker to connect to. With --src-local the roles of
    # the two broker arguments are swapped; that needs both to be present.
    remoteBroker = None
    if nargs == 2:
        localBroker = socket.gethostname()
    elif config._srclocal and nargs > 3:
        localBroker = args[3]
        remoteBroker = args[2]
    else:
        localBroker = args[2]
        if nargs > 3:
            remoteBroker = args[3]

    group = args[0]
    cmd = args[1]

    rm = None
    try:
        rm = RouteManager(localBroker)
        if not _runCommand(rm, group, cmd, args, remoteBroker):
            Usage()
            return -1
    except Exception as e:
        print("Failed: %s - %s" % (e.__class__.__name__, e))
        return 1
    finally:
        # Always release broker resources, including on usage errors and
        # when a SystemExit passes through.
        if rm is not None:
            rm.disconnect()
    return 0


if __name__ == "__main__":
    sys.exit(main())