| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import optparse, sys, time, os, re, math |
| from qpid.messaging import Connection |
| from qpid.util import URL |
| from qpidtoollibs.broker import BrokerAgent |
| from qpidtoollibs.config import parse_qpidd_conf |
| try: |
| from uuid import uuid4 |
| except ImportError: |
| from qpid.datatypes import uuid4 |
| |
# QMF address of the HA broker management object.
HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"

# Fallback connection settings.  They live here rather than in the
# add_option() calls because qpidd.conf supplies the defaults when
# --config is specified; these values are used otherwise.
DEFAULTS = {
    "broker": "0.0.0.0",
    "timeout": 10.0,
}
| |
class ExitStatus(Exception):
    """Raised if a command wants a non-zero exit status from the script.

    The requested exit code is available as the ``status`` attribute.
    """

    def __init__(self, status):
        # Pass the status on to Exception so that args/repr()/str() carry it
        # instead of being empty.
        Exception.__init__(self, status)
        self.status = status
| |
def find_qpidd_conf():
    """Locate the qpidd.conf file that belongs to this installation.

    The file is only looked for when this script lives in a directory named
    "bin"; the candidate is then <prefix>/etc/qpid/qpidd.conf, where <prefix>
    is the parent of that directory.

    @return: path of the configuration file, or None if it is not found.
    """
    script_dir = os.path.dirname(__file__)
    prefix, leaf = os.path.split(script_dir)
    if leaf != "bin":           # Not installed in a standard place.
        return None
    candidate = os.path.join(prefix, "etc", "qpid", "qpidd.conf")
    if os.path.isfile(candidate):
        return candidate
    return None
| |
class Command(object):
    """
    Common options and logic for all commands. Subclasses provide additional
    options and execution logic.

    Every instance appends itself to the class-level ``commands`` list when it
    is constructed.
    """

    commands = []

    def __init__(self, name, help, arg_names=None, connect_agent=True):
        """
        @param name: command name as typed on the command line.
        @param help: help text, also shown in the usage message.
        @param arg_names: names of the positional arguments (used for the usage
        message and to check the argument count); defaults to no arguments.
        @param connect_agent: true if we should establish a QMF agent connection
        """
        if arg_names is None:   # Avoid a shared mutable default list.
            arg_names = []
        Command.commands.append(self)
        self.name = name
        self.connect_agent = connect_agent
        self.arg_names = arg_names
        usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
        self.help = help
        self.op=optparse.OptionParser(usage)
        common = optparse.OptionGroup(self.op, "Broker connection options")
        def help_default(what): return " (Default %s)"%DEFAULTS[what]
        common.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker"))
        common.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout"))
        common.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override")
        common.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
        common.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)")
        common.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)")
        common.add_option("--config", metavar="<path/to/qpidd.conf>", help="Read default connection configuration from the qpidd.conf broker configuration file. Defaults are overridden by command-line options.)")
        self.op.add_option_group(common)

    def connect(self, opts):
        """
        Open a connection to the broker described by opts.

        Settings missing from opts are filled in (and stored back into opts)
        from the qpidd.conf named by opts.config when there is one, otherwise
        from DEFAULTS. When no broker is given, the local qpidd.conf found by
        find_qpidd_conf() is used as the configuration file.

        NOTE(review): a timeout of 0 is falsy, so "--timeout 0" is handled
        exactly like an unset timeout by the checks below - confirm whether
        that matches the "0 means wait forever" help text.

        @return: (connection, qmf_broker, ha_broker). The last two are false
        values when connect_agent is false.
        """
        conn_options = {}
        if not opts.broker:
            opts.broker = DEFAULTS["broker"]
            # If we are connecting locally, use local qpidd.conf by default
            if not opts.config: opts.config = find_qpidd_conf()
        url = URL(opts.broker)
        if opts.config:         # Use broker config file for defaults
            config = parse_qpidd_conf(opts.config)
            if not url.user: url.user = config.get("ha-username")
            if not url.password: url.password = config.get("ha-password")
            if not url.port: url.port = config.get("port")
            opts.broker = str(url)
            if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism")
            if not opts.timeout:
                timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval")
                if timeout: opts.timeout = float(timeout)
        else:                   # Use DEFAULTS
            if not opts.timeout: opts.timeout = DEFAULTS["timeout"]
        if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism
        if opts.sasl_service_name:
            conn_options['sasl_service'] = opts.sasl_service_name
        if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate
        if opts.ssl_key:
            if not opts.ssl_certificate:
                self.op.error("missing '--ssl-certificate' (required by '--ssl-key')")
            conn_options['ssl_keyfile'] = opts.ssl_key
        conn_options['client_properties'] = {'qpid.ha-admin' : 1}
        if opts.timeout:
            conn_options['timeout'] = opts.timeout
            # Heartbeat at half the timeout, rounded up to whole seconds.
            conn_options['heartbeat'] = int(math.ceil(opts.timeout/2))
        connection = Connection.establish(opts.broker, **conn_options)
        qmf_broker = self.connect_agent and BrokerAgent(connection)
        ha_broker = self.connect_agent and qmf_broker.getHaBroker()
        return (connection, qmf_broker, ha_broker)

    def all_brokers(self, ha_broker, opts, func):
        """
        Apply func to the connected broker or to every broker in the cluster.

        If opts.all is set and ha_broker.brokersUrl lists at least one broker,
        connect to each listed broker in turn and call func(ha_broker, addr).
        If connecting or func raises, call func(None, addr, exception) instead.
        Each of these connections is closed before moving on to the next broker.

        Otherwise call func(ha_broker) for the broker passed in.

        Side effect: opts.broker is overwritten with each address visited.
        Nothing is returned.
        """
        # The brokersUrl setting is not in python URL format, simpler parsing here.
        stripped = re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl)
        brokers = [b for b in stripped.split(",") if b]
        if brokers and opts.all:
            # Re-use the credentials given for the first broker, if any.
            if "@" in opts.broker: userpass = opts.broker.split("@")[0]
            else: userpass = None
            for b in brokers:
                if userpass and "@" not in b: opts.broker = userpass+"@"+b
                else: opts.broker = b
                connection = None
                try:
                    connection, qmf_broker, remote_ha_broker = self.connect(opts)
                    func(remote_ha_broker, b)
                except Exception as e:
                    func(None, b, e)
                finally:
                    if connection is not None:
                        # Best effort: a failure to close one broker's
                        # connection must not stop us visiting the rest.
                        try: connection.close()
                        except Exception: pass
        else:
            func(ha_broker)

    def execute(self, args):
        """
        Parse args, connect to the broker and run the command.

        @param args: command line arguments; one more than len(arg_names) must
        remain once the options have been parsed out.
        @raise Exception: on a wrong number of arguments, or if an agent
        connection was requested but no HA broker object was found.
        """
        opts, args = self.op.parse_args(args)
        if len(args) != len(self.arg_names)+1:
            self.op.print_help()
            raise Exception("Wrong number of arguments")
        self.connection, qmf_broker, ha_broker = self.connect(opts)
        try:
            # Checked inside the try block so the connection is closed on failure.
            if self.connect_agent and not ha_broker:
                raise Exception("HA module is not loaded on broker at %s" % opts.broker)
            self.do_execute(qmf_broker, ha_broker, opts, args)
        finally: self.connection.close()

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        """
        Command specific logic, overridden by subclasses.
        The arguments are those passed by execute().
        """
        raise Exception("Command '%s' is not yet implemented"%self.name)
| |
class ManagerCommand(Command):
    """
    Base class for commands reserved for a cluster manager tool, i.e. a tool
    that ensures cluster consistency.
    """

    manager_commands = []       # Commands for cluster managers only

    def __init__(self, name, help, arg_names=[], connect_agent=True):
        """@param connect_agent true if we should establish a QMF agent connection"""
        manager_help = "[Cluster manager only] " + help
        Command.__init__(self, name, manager_help, arg_names, connect_agent)
        # Command.__init__ listed us as a user command; move the registration
        # to the manager-only list instead.
        Command.commands.remove(self)
        ManagerCommand.manager_commands.append(self)
| |
| |
class PingCmd(Command):
    """The "ping" command: check that a session can be established."""

    def __init__(self):
        Command.__init__(
            self,
            "ping",
            "Check if the broker is alive and responding",
            connect_agent=False)

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # Opening a session is the whole check; the session is not used.
        self.connection.session()

PingCmd()       # Instantiating the command registers it.
| |
class PromoteCmd(ManagerCommand):
    """The "promote" command: invoke the "promote" method on the HA broker."""

    def __init__(self):
        description = (
            "Promote a backup broker to primary. "
            "This command should *only* be used by a cluster manager (such as rgmanager) "
            "that ensures only one broker is primary at a time. "
            "Promoting more than one broker to primary at the same time will make the "
            "cluster inconsistent and will cause data loss and unexpected behavior.")
        ManagerCommand.__init__(self, "promote", description)

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        no_arguments = {}
        qmf_broker._method("promote", no_arguments, HA_BROKER, timeout=opts.timeout)

PromoteCmd()    # Instantiating the command registers it.
| |
| |
class StatusCmd(Command):
    """The "status" command: print the HA status, or test it silently."""

    def __init__(self):
        Command.__init__(self, "status", "Print HA status")
        add = self.op.add_option
        add("--expect", metavar="<status>",
            help="Don't print status. Return 0 if it matches <status>, 1 otherwise")
        add("--is-primary", action="store_true", default=False,
            help="Don't print status. Return 0 if the broker is primary, 1 otherwise")
        add("--all", action="store_true", default=False,
            help="Print status for all brokers in the cluster")

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # The two test modes print nothing; the result is the exit status.
        if opts.is_primary:
            if ha_broker.status not in ["active", "recovering"]:
                raise ExitStatus(1)
            return
        if opts.expect:
            if opts.expect != ha_broker.status:
                raise ExitStatus(1)
            return

        def status(hb, b=None, ex=None):
            # b is the broker address (only given with --all), ex an error
            # raised while contacting that broker.
            if ex:
                print("%s %s" % (b, ex))
            elif b:
                print("%s %s" % (b, hb.status))
            else:
                print(hb.status)

        self.all_brokers(ha_broker, opts, status)

StatusCmd()     # Instantiating the command registers it.
| |
class ReplicateCmd(Command):
    """The "replicate" command: invoke the "replicate" method on the HA broker."""

    def __init__(self):
        # The argument names are listed in the order do_execute() reads them:
        # the remote broker first, then the queue. Listing them the other way
        # round made the usage message advertise an order that sent the queue
        # name as the broker and vice versa.
        Command.__init__(
            self,
            "replicate",
            "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.",
            ["<remote-broker>", "<queue>"])

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # args still holds the command name, so the positional arguments
        # declared in __init__ are read from index 1 onwards.
        remote_broker, queue = args[1], args[2]
        qmf_broker._method("replicate", {"broker":remote_broker, "queue":queue}, HA_BROKER, timeout=opts.timeout)

ReplicateCmd()  # Instantiating the command registers it.
| |
class QueryCmd(Command):
    """The "query" command: print the HA configuration and status."""

    def __init__(self):
        Command.__init__(self, "query", "Print HA configuration and status")
        self.op.add_option(
            "--all",
            action="store_true",
            default=False,
            help="Print configuration and status for all brokers in the cluster")

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        def query(hb, b=None, ex=None):
            # b is the broker address (only given with --all), ex an error
            # raised while contacting that broker.
            if ex:
                print("%s %s\n" % (b, ex))
                return
            if b:
                print("%-20s %s" % ("Address:", b))
            rows = [
                ("Status:", hb.status),
                ("Broker ID:", hb.systemId),
                ("Brokers URL:", hb.brokersUrl),
                ("Public URL:", hb.publicUrl),
                ("Replicate: ", hb.replicateDefault),
            ]
            for row in rows:
                print("%-20s %s" % row)
            if b:
                print("")       # Blank line between brokers

        self.all_brokers(ha_broker, opts, query)


QueryCmd()      # Instantiating the command registers it.
| |
def print_usage(prog):
    """Print the top-level usage message.

    One line is printed for each command in Command.commands, showing its name
    and the first sentence of its help text.

    @param prog: program name to show in the message.
    """
    print("usage: %s <command> [<arguments>]\n\nCommands are:\n" % prog)
    for cmd in Command.commands:
        first_sentence = cmd.help.split(".")[0]
        print(" %-12s %s." % (cmd.name, first_sentence))
    print("\nFor help with a command type: %s <command> --help\n" % prog)
| |
def find_command(args, commands):
    """Return the command named by the earliest matching argument.

    The arguments (which may include options) are scanned in order; for the
    first one equal to the name of a command, that command is returned.

    @return: the matching command, or None if no argument names a command.
    """
    for candidate in args:
        for cmd in commands:
            if cmd.name == candidate:
                return cmd
    return None
| |
def main_except(argv):
    """Run the command named in argv, reporting failures by raising.

    @param argv: the full argument list, program name first.
    @raise Exception: if no valid command is given, or if a cluster manager
    command is used without --cluster-manager.
    """
    args = argv[1:]
    commands = Command.commands
    if "--cluster-manager" in args:
        # This extends Command.commands itself (same list object), so the
        # manager commands are also listed by print_usage().
        commands.extend(ManagerCommand.manager_commands)
        args.remove("--cluster-manager")

    help_words = ['help', '--help', '-help', '-h', 'help-all', '--help-all']
    if len(args) and args[0] in help_words:
        if 'help-all' in args[0]:
            for c in commands:
                c.op.print_help()
                print("")
        else:
            print_usage(os.path.basename(argv[0]))
        return

    command = find_command(args, commands)
    if command:
        command.execute(args)
        return

    # Check for attempt to use a manager command without --cluster-manager
    command = find_command(args, ManagerCommand.manager_commands)
    if not command:
        print_usage(os.path.basename(argv[0]))
        raise Exception("No valid command")
    message = ("'%s' should only be called by the cluster manager.\n"
               "Incorrect use of '%s' will cause cluster malfunction.\n"
               "To call from a cluster manager use '%s --cluster-manager'. ")
    raise Exception(message % ((command.name,)*3))
| |
def main(argv):
    """Run main_except() and turn its outcome into an exit status.

    @return: 0 on success, the requested status if ExitStatus was raised, or 1
    after printing the error for any other exception.
    """
    try:
        main_except(argv)
    except ExitStatus as e:
        return e.status
    except Exception as e:
        print("%s: %s" % (type(e).__name__, e))
        return 1
    else:
        return 0
| |
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)