blob: e0e655683a90e46a82195b21a195792343b5e93e [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import getopt
import sys
import socket
import os
import locale
from qmf.console import Session, BrokerURL
def Usage():
print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
print
print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>"
print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>"
print " qpid-route [OPTIONS] route list [<dest-broker>]"
print " qpid-route [OPTIONS] route flush [<dest-broker>]"
print " qpid-route [OPTIONS] route map [<broker>]"
print
print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
print " qpid-route [OPTIONS] link list [<dest-broker>]"
print
print "Options:"
print " -v [ --verbose ] Verbose output"
print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
print " -d [ --durable ] Added configuration shall be durable"
print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
print " -s [ --src-local ] Make connection to source broker (push route)"
print " -t <transport> [ --transport <transport>]"
print " Specify transport to use for links, defaults to tcp"
print
print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]"
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
sys.exit(1)
_verbose = False
_quiet = False
_durable = False
_dellink = False
_srclocal = False
_transport = "tcp"
class RouteManager:
def __init__(self, localBroker):
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
self.broker = self.qmf.addBroker(localBroker)
def disconnect(self):
self.qmf.delBroker(self.broker)
def getLink(self):
links = self.qmf.getObjects(_class="link")
for link in links:
if self.remote.match(link.host, link.port):
return link
return None
def addLink(self, remoteBroker):
self.remote = BrokerURL(remoteBroker)
if self.local.match(self.remote.host, self.remote.port):
raise Exception("Linking broker to itself is not permitted")
brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
if self.remote.authName == "anonymous":
mech = "ANONYMOUS"
else:
mech = "PLAIN"
res = broker.connect(self.remote.host, self.remote.port, _durable,
mech, self.remote.authName, self.remote.authPass,
_transport)
if _verbose:
print "Connect method returned:", res.status, res.text
def delLink(self, remoteBroker):
self.remote = BrokerURL(remoteBroker)
brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
raise Exception("Link not found")
res = link.close()
if _verbose:
print "Close method returned:", res.status, res.text
def listLinks(self):
links = self.qmf.getObjects(_class="link")
if len(links) == 0:
print "No Links Found"
else:
print
print "Host Port Transport Durable State Last Error"
print "============================================================================="
for link in links:
print "%-16s%-8d%-13s%c %-18s%s" % \
(link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
def mapRoutes(self):
qmf = self.qmf
print
print "Finding Linked Brokers:"
brokerList = {}
brokerList[self.local.name()] = self.broker
print " %s... Ok" % self.local
added = True
while added:
added = False
links = qmf.getObjects(_class="link")
for link in links:
url = BrokerURL("%s:%d" % (link.host, link.port))
if url.name() not in brokerList:
print " %s..." % url.name(),
try:
b = qmf.addBroker("%s:%d" % (link.host, link.port))
brokerList[url.name()] = b
added = True
print "Ok"
except Exception, e:
print e
print
print "Dynamic Routes:"
bridges = qmf.getObjects(_class="bridge", dynamic=True)
fedExchanges = []
for bridge in bridges:
if bridge.src not in fedExchanges:
fedExchanges.append(bridge.src)
if len(fedExchanges) == 0:
print " none found"
print
for ex in fedExchanges:
print " Exchange %s:" % ex
pairs = []
for bridge in bridges:
if bridge.src == ex:
link = bridge._linkRef_
fromUrl = "%s:%s" % (link.host, link.port)
toUrl = bridge.getBroker().getUrl()
found = False
for pair in pairs:
if pair.matches(fromUrl, toUrl):
found = True
if not found:
pairs.append(RoutePair(fromUrl, toUrl))
for pair in pairs:
print " %s" % pair
print
print "Static Routes:"
bridges = qmf.getObjects(_class="bridge", dynamic=False)
if len(bridges) == 0:
print " none found"
print
for bridge in bridges:
link = bridge._linkRef_
fromUrl = "%s:%s" % (link.host, link.port)
toUrl = bridge.getBroker().getUrl()
leftType = "ex"
rightType = "ex"
if bridge.srcIsLocal:
arrow = "=>"
left = bridge.src
right = bridge.dest
if bridge.srcIsQueue:
leftType = "queue"
else:
arrow = "<="
left = bridge.dest
right = bridge.src
if bridge.srcIsQueue:
rightType = "queue"
if bridge.srcIsQueue:
print " %s(%s=%s) %s %s(%s=%s)" % \
(toUrl, leftType, left, arrow, fromUrl, rightType, right)
else:
print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
(toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
print
for broker in brokerList:
if broker != self.local.name():
qmf.delBroker(brokerList[broker])
def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
if dynamic and _srclocal:
raise Exception("--src-local is not permitted on dynamic routes")
self.addLink(remoteBroker)
link = self.getLink()
if link == None:
raise Exception("Link failed to create")
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
if not _quiet:
raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
sys.exit(0)
if _verbose:
print "Creating inter-broker binding..."
res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic)
if res.status != 0:
raise Exception(res.text)
if _verbose:
print "Bridge method returned:", res.status, res.text
def addQueueRoute(self, remoteBroker, exchange, queue):
self.addLink(remoteBroker)
link = self.getLink()
if link == None:
raise Exception("Link failed to create")
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
if not _quiet:
raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
sys.exit(0)
if _verbose:
print "Creating inter-broker binding..."
res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False)
if res.status != 0:
raise Exception(res.text)
if _verbose:
print "Bridge method returned:", res.status, res.text
def delQueueRoute(self, remoteBroker, exchange, queue):
self.remote = BrokerURL(remoteBroker)
link = self.getLink()
if link == None:
if not _quiet:
raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
sys.exit(0)
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
if _verbose:
print "Closing bridge..."
res = bridge.close()
if res.status != 0:
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
if len(bridges) == 1 and _dellink:
link = self.getLink()
if link == None:
sys.exit(0)
if _verbose:
print "Last bridge on link, closing link..."
res = link.close()
if res.status != 0:
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
sys.exit(0)
if not _quiet:
raise Exception("Route not found")
def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
self.remote = BrokerURL(remoteBroker)
link = self.getLink()
if link == None:
if not _quiet:
raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
sys.exit(0)
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
and bridge.dynamic == dynamic:
if _verbose:
print "Closing bridge..."
res = bridge.close()
if res.status != 0:
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
if len(bridges) == 1 and _dellink:
link = self.getLink()
if link == None:
sys.exit(0)
if _verbose:
print "Last bridge on link, closing link..."
res = link.close()
if res.status != 0:
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
sys.exit(0)
if not _quiet:
raise Exception("Route not found")
def listRoutes(self):
links = self.qmf.getObjects(_class="link")
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
myLink = None
for link in links:
if bridge.linkRef == link.getObjectId():
myLink = link
break
if myLink != None:
if bridge.dynamic:
keyText = "<dynamic>"
else:
keyText = bridge.key
print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
def clearAllRoutes(self):
links = self.qmf.getObjects(_class="link")
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if _verbose:
myLink = None
for link in links:
if bridge.linkRef == link.getObjectId():
myLink = link
break
if myLink != None:
print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
res = bridge.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.text)
elif _verbose:
print "Ok"
if _dellink:
links = self.qmf.getObjects(_class="link")
for link in links:
if _verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
res = link.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.text)
elif _verbose:
print "Ok"
class RoutePair:
def __init__(self, fromUrl, toUrl):
self.fromUrl = fromUrl
self.toUrl = toUrl
self.bidir = False
def __repr__(self):
if self.bidir:
delimit = "<=>"
else:
delimit = " =>"
return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
def matches(self, fromUrl, toUrl):
if fromUrl == self.fromUrl and toUrl == self.toUrl:
return True
if toUrl == self.fromUrl and fromUrl == self.toUrl:
self.bidir = True
return True
return False
def YN(val):
if val == 1:
return 'Y'
return 'N'
##
## Main Program
##
try:
longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except:
Usage()
try:
encoding = locale.getpreferredencoding()
cargs = [a.decode(encoding) for a in encArgs]
except:
cargs = encArgs
for opt in optlist:
if opt[0] == "-v" or opt[0] == "--verbose":
_verbose = True
if opt[0] == "-q" or opt[0] == "--quiet":
_quiet = True
if opt[0] == "-d" or opt[0] == "--durable":
_durable = True
if opt[0] == "-e" or opt[0] == "--del-empty-link":
_dellink = True
if opt[0] == "-s" or opt[0] == "--src-local":
_srclocal = True
if opt[0] == "-t" or opt[0] == "--transport":
_transport = opt[1]
nargs = len(cargs)
if nargs < 2:
Usage()
if nargs == 2:
localBroker = "localhost"
else:
if _srclocal:
localBroker = cargs[3]
remoteBroker = cargs[2]
else:
localBroker = cargs[2]
if nargs > 3:
remoteBroker = cargs[3]
group = cargs[0]
cmd = cargs[1]
try:
rm = RouteManager(localBroker)
if group == "link":
if cmd == "add":
if nargs != 4:
Usage()
rm.addLink(remoteBroker)
elif cmd == "del":
if nargs != 4:
Usage()
rm.delLink(remoteBroker)
elif cmd == "list":
rm.listLinks()
elif group == "dynamic":
if cmd == "add":
if nargs < 5 or nargs > 7:
Usage()
tag = ""
excludes = ""
if nargs > 5: tag = cargs[5]
if nargs > 6: excludes = cargs[6]
rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
elif cmd == "del":
if nargs != 5:
Usage()
else:
rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
elif group == "route":
if cmd == "add":
if nargs < 6 or nargs > 8:
Usage()
tag = ""
excludes = ""
if nargs > 6: tag = cargs[6]
if nargs > 7: excludes = cargs[7]
rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
elif cmd == "del":
if nargs != 6:
Usage()
rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
elif cmd == "map":
rm.mapRoutes()
else:
if cmd == "list":
rm.listRoutes()
elif cmd == "flush":
rm.clearAllRoutes()
else:
Usage()
elif group == "queue":
if nargs != 6:
Usage()
if cmd == "add":
rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
elif cmd == "del":
rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
else:
Usage()
except Exception,e:
print "Failed:", e.args
sys.exit(1)
rm.disconnect()