| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os |
| from optparse import OptionParser, OptionGroup |
| import sys |
| import locale |
| import socket |
| import re |
| from qmf.console import Session |
| |
| class Config: |
| def __init__(self): |
| self._host = "localhost" |
| self._connTimeout = 10 |
| self._stopId = None |
| self._stopAll = False |
| self._force = False |
| self._numeric = False |
| self._showConn = False |
| self._delConn = None |
| |
| class IpAddr: |
| def __init__(self, text): |
| if text.find("@") != -1: |
| tokens = text.split("@") |
| text = tokens[1] |
| if text.find(":") != -1: |
| tokens = text.split(":") |
| text = tokens[0] |
| self.port = int(tokens[1]) |
| else: |
| self.port = 5672 |
| self.dottedQuad = socket.gethostbyname(text) |
| nums = self.dottedQuad.split(".") |
| self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) |
| |
| def bestAddr(self, addrPortList): |
| bestDiff = 0xFFFFFFFFL |
| bestAddr = None |
| for addrPort in addrPortList: |
| diff = IpAddr(addrPort[0]).addr ^ self.addr |
| if diff < bestDiff: |
| bestDiff = diff |
| bestAddr = addrPort |
| return bestAddr |
| |
| class BrokerManager: |
| def __init__(self, config, conn_options): |
| self.config = config |
| self.cert = None |
| self.conn_options = conn_options |
| self.brokerName = None |
| self.qmf = None |
| self.broker = None |
| self.brokers = [] |
| |
| def SetBroker(self, brokerUrl): |
| self.url = brokerUrl |
| self.qmf = Session() |
| self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout, **self.conn_options) |
| agents = self.qmf.getAgents() |
| for a in agents: |
| if a.getAgentBank() == '0': |
| self.brokerAgent = a |
| |
| def Disconnect(self): |
| """ Release any allocated brokers. Ignore any failures as the tool is |
| shutting down. |
| """ |
| try: |
| if self.broker: |
| self.qmf.delBroker(self.broker) |
| self.broker = None |
| while len(self.brokers): |
| b = self.brokers.pop() |
| self.qmf.delBroker(b) |
| except: |
| pass |
| |
| def _getClusters(self): |
| packages = self.qmf.getPackages() |
| if "org.apache.qpid.cluster" not in packages: |
| raise Exception("Clustering is not installed on the broker.") |
| |
| clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) |
| if len(clusters) == 0: |
| raise Exception("Clustering is installed but not enabled on the broker.") |
| |
| return clusters |
| |
| def _getHostList(self, urlList): |
| hosts = [] |
| hostAddr = IpAddr(self.config._host) |
| for url in urlList: |
| if url.find("amqp:") != 0: |
| raise Exception("Invalid URL 1") |
| url = url[5:] |
| addrs = str(url).split(",") |
| addrList = [] |
| for addr in addrs: |
| tokens = addr.split(":") |
| if len(tokens) != 3: |
| raise Exception("Invalid URL 2") |
| addrList.append((tokens[1], tokens[2])) |
| |
| # Find the address in the list that is most likely to be in the same subnet as the address |
| # with which we made the original QMF connection. This increases the probability that we will |
| # be able to reach the cluster member. |
| |
| best = hostAddr.bestAddr(addrList) |
| bestUrl = best[0] + ":" + best[1] |
| hosts.append(bestUrl) |
| return hosts |
| |
| def overview(self): |
| clusters = self._getClusters() |
| cluster = clusters[0] |
| memberList = cluster.members.split(";") |
| idList = cluster.memberIDs.split(";") |
| |
| print " Cluster Name: %s" % cluster.clusterName |
| print "Cluster Status: %s" % cluster.status |
| print " Cluster Size: %d" % cluster.clusterSize |
| print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) |
| for idx in range(1,len(idList)): |
| print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) |
| |
| def stopMember(self, id): |
| clusters = self._getClusters() |
| cluster = clusters[0] |
| idList = cluster.memberIDs.split(";") |
| if id not in idList: |
| raise Exception("No member with matching ID found") |
| |
| if not self.config._force: |
| prompt = "Warning: " |
| if len(idList) == 1: |
| prompt += "This command will shut down the last running cluster member." |
| else: |
| prompt += "This command will shut down a cluster member." |
| prompt += " Are you sure? [N]: " |
| |
| confirm = raw_input(prompt) |
| if len(confirm) == 0 or confirm[0].upper() != 'Y': |
| raise Exception("Operation canceled") |
| |
| cluster.stopClusterNode(id) |
| |
| def stopAll(self): |
| clusters = self._getClusters() |
| if not self.config._force: |
| prompt = "Warning: This command will shut down the entire cluster." |
| prompt += " Are you sure? [N]: " |
| |
| confirm = raw_input(prompt) |
| if len(confirm) == 0 or confirm[0].upper() != 'Y': |
| raise Exception("Operation canceled") |
| |
| cluster = clusters[0] |
| cluster.stopFullCluster() |
| |
| def showConnections(self): |
| clusters = self._getClusters() |
| cluster = clusters[0] |
| memberList = cluster.members.split(";") |
| idList = cluster.memberIDs.split(";") |
| displayList = [] |
| hostList = self._getHostList(memberList) |
| self.qmf.delBroker(self.broker) |
| self.broker = None |
| |
| idx = 0 |
| for host in hostList: |
| if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: |
| self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) |
| displayList.append(idList[idx]) |
| idx += 1 |
| |
| idx = 0 |
| found = False |
| for broker in self.brokers: |
| if not self.config._delConn: |
| print "Clients on Member: ID=%s:" % displayList[idx] |
| connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) |
| for conn in connList: |
| if not conn.shadow: |
| if self.config._numeric or self.config._delConn: |
| a = conn.address |
| else: |
| tokens = conn.address.split(":") |
| try: |
| hostList = socket.gethostbyaddr(tokens[0]) |
| host = hostList[0] |
| except: |
| host = tokens[0] |
| a = host + ":" + tokens[1] |
| if self.config._delConn: |
| tokens = self.config._delConn.split(":") |
| ip = socket.gethostbyname(tokens[0]) |
| toDelete = ip + ":" + tokens[1] |
| if a == toDelete: |
| print "Closing connection from client: %s" % a |
| conn.close() |
| found = True |
| else: |
| print " %s" % a |
| idx += 1 |
| if not self.config._delConn: |
| print |
| if self.config._delConn and not found: |
| print "Client connection '%s' not found" % self.config._delConn |
| |
| while len(self.brokers): |
| broker = self.brokers.pop() |
| self.qmf.delBroker(broker) |
| |
| def main(argv=None): |
| |
| try: |
| config = Config() |
| |
| parser = OptionParser(usage="usage: %prog [options] BROKER", |
| description="Example: $ qpid-cluster -C broker-host:10000") |
| |
| parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") |
| parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") |
| parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") |
| parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") |
| parser.add_option("-C", "--all-connections", action="store_true", default=False, help="View client connections to all cluster members") |
| parser.add_option("-c", "--connections", metavar="ID", help="View client connections to specified member") |
| parser.add_option("-d", "--del-connection", metavar="HOST:PORT", help="Disconnect a client connection") |
| parser.add_option("-s", "--stop", metavar="ID", help="Stop one member of the cluster by its ID") |
| parser.add_option("-k", "--all-stop", action="store_true", default=False, help="Shut down the whole cluster") |
| parser.add_option("-f", "--force", action="store_true", default=False, help="Suppress the 'are you sure' prompt") |
| parser.add_option("-n", "--numeric", action="store_true", default=False, help="Don't resolve names") |
| |
| opts, args = parser.parse_args(args=argv) |
| |
| if args: |
| config._host = args[0] |
| |
| if opts.timeout != 0: |
| config._connTimeout = opts.timeout |
| else: |
| config._connTimeout = None |
| |
| if opts.all_connections: |
| config._showConn = "all" |
| |
| if opts.connections: |
| config._connections = opts.connections |
| if len(config._connections.split(":")) != 2: |
| parser.error("Member ID must be of form: <host or ip>:<number>") |
| |
| if opts.del_connection: |
| config._delConn = opts.del_connection |
| if len(config._delConn.split(":")) != 2: |
| parser.error("Member ID must be of form: <host or ip>:<number>") |
| |
| if opts.stop: |
| config._stopId = opts.stop |
| if len(config._stopId.split(":")) != 2: |
| parser.error("Member ID must be of form: <host or ip>:<number>") |
| |
| if opts.ssl_key and not opts.ssl_certificate: |
| parser.error("missing '--ssl-certificate' (required by '--ssl-key')") |
| |
| config._stopAll = opts.all_stop |
| config._force = opts.force |
| config._numeric = opts.numeric |
| |
| conn_options = {} |
| if opts.sasl_mechanism: |
| conn_options['mechanisms'] = opts.sasl_mechanism |
| if opts.ssl_certificate: |
| conn_options['ssl_certfile'] = opts.ssl_certificate |
| if opts.ssl_key: |
| conn_options['ssl_keyfile'] = opts.ssl_key |
| |
| bm = BrokerManager(config, conn_options) |
| |
| try: |
| bm.SetBroker(config._host) |
| if config._stopId: |
| bm.stopMember(config._stopId) |
| elif config._stopAll: |
| bm.stopAll() |
| elif config._showConn or config._delConn: |
| bm.showConnections() |
| else: |
| bm.overview() |
| except KeyboardInterrupt: |
| print |
| except Exception,e: |
| bm.Disconnect() # try to deallocate brokers - ignores errors |
| if str(e).find("connection aborted") > 0: |
| # we expect this when asking the connected broker to shut down |
| return 0 |
| raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) |
| |
| bm.Disconnect() |
| except Exception, e: |
| print str(e) |
| return 1 |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |