blob: 3b0636f549a12479eba92f08695f371a8664ee07 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License
#
"""
Utilities for command-line programs.
"""
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import sys
import json
import argparse
import os
try:
from collections.abc import Mapping, Sequence
except ImportError:
from collections import Mapping, Sequence
from qpid_dispatch_site import VERSION
from proton import SSLDomain, Url
from proton.utils import SyncRequestResponse, BlockingConnection
class UsageError(Exception):
"""
Raise this exception to indicate the usage message should be printed.
Handled by L{main}
"""
pass
def main(run, argv=sys.argv, parser=None):
"""
Call run(argv) with exception handling, do appropriate sys.exit().
@param parser: a Parser to use for usage related error messages.
@return: exit value for sys.exit
"""
try:
run(argv)
return 0
except KeyboardInterrupt:
print()
except UsageError as e:
parser.error(e)
except Exception as e:
if "_QPID_DISPATCH_TOOLS_DEBUG_" in os.environ:
raise
print("%s: %s" % (type(e).__name__, e))
return 1
def check_args(args, maxargs=0, minargs=0):
"""
Check number of arguments, raise UsageError if in correct.
@param maxargs: max number of allowed args after command or None to skip check.
@param minargs: min number of allowed args after command or None to skip check.
@return args padded with None to maxargs.
"""
if minargs is not None and len(args) < minargs:
raise UsageError("Not enough arguments, got %s need %s" % (len(args), minargs))
if maxargs is not None and len(args) > maxargs:
raise UsageError("Unexpected arguments: %s" % (" ".join(args[maxargs:])))
return args + [None] * (maxargs - len(args))
def parse_args_qdstat(BusManager, argv=None):
parser = _qdstat_parser(BusManager)
return parser.parse_args(args=argv)
def parse_args_qdmanage(operations, argv=None):
parser = _qdmanage_parser(operations)
return parser.parse_known_args(args=argv)
common_parser = argparse.ArgumentParser(add_help=False)
common_parser.add_argument('--version', action='version', version=VERSION)
common_parser.add_argument("-v", "--verbose", help="Show maximum detail",
action="count") # support -vvv
def _custom_optional_arguments_parser(*args, **kwargs):
parser = argparse.ArgumentParser(*args, **kwargs)
parser._optionals.title = "Optional Arguments"
return parser
def add_connection_options(parser):
group = parser.add_argument_group('Connection Options')
group.add_argument("-b", "--bus", default="0.0.0.0",
metavar="URL", help="URL of the messaging bus to connect to default %(default)s")
group.add_argument("-t", "--timeout", type=float, default=5, metavar="SECS",
help="Maximum time to wait for connection in seconds default %(default)s")
group.add_argument("--ssl-certificate", metavar="CERT",
help="Client SSL certificate (PEM Format)")
group.add_argument("--ssl-key", metavar="KEY",
help="Client SSL private key (PEM Format)")
group.add_argument("--ssl-trustfile", metavar="TRUSTED-CA-DB",
help="Trusted Certificate Authority Database file (PEM Format)")
group.add_argument("--ssl-password", metavar="PASSWORD",
help="Certificate password, will be prompted if not specifed.")
# Use the --ssl-password-file option to avoid having the --ssl-password in history or scripts.
group.add_argument("--ssl-password-file", metavar="SSL-PASSWORD-FILE",
help="Certificate password, will be prompted if not specifed.")
group.add_argument("--sasl-mechanisms", metavar="SASL-MECHANISMS",
help="Allowed sasl mechanisms to be supplied during the sasl handshake.")
group.add_argument("--sasl-username", metavar="SASL-USERNAME",
help="User name for SASL plain authentication")
group.add_argument("--sasl-password", metavar="SASL-PASSWORD",
help="Password for SASL plain authentication")
# Use the --sasl-password-file option to avoid having the --sasl-password in history or scripts.
group.add_argument("--sasl-password-file", metavar="SASL-PASSWORD-FILE",
help="Password for SASL plain authentication")
group.add_argument("--ssl-disable-peer-name-verify", action="store_true",
help="Disables SSL peer name verification. WARNING - This option is insecure and must not be used "
"in production environments")
def _qdstat_add_display_args(parser, BusManager):
_group = parser.add_argument_group('Display', 'Choose what kind of \
information you want to be displayed')
display = _group.add_mutually_exclusive_group(required=False)
display.add_argument("-g", "--general", action="store_const", dest="show",
help="Show General Router Stats",
const=BusManager.displayGeneral.__name__)
display.add_argument("-c", "--connections", action="store_const", dest="show",
help="Show Connections",
const=BusManager.displayConnections.__name__)
display.add_argument("-l", "--links", action="store_const", dest="show",
help="Show Router Links",
const=BusManager.displayRouterLinks.__name__)
display.add_argument("-n", "--nodes", action="store_const", dest="show",
help="Show Router Nodes",
const=BusManager.displayRouterNodes.__name__)
display.add_argument("-e", "--edge", action="store_const", dest="show",
help="Show edge connections",
const=BusManager.displayEdges.__name__)
display.add_argument("-a", "--address", action="store_const", dest="show",
help="Show Router Addresses",
const=BusManager.displayAddresses.__name__)
display.add_argument("-m", "--memory", action="store_const", dest="show",
help="Show Router Memory Stats",
const=BusManager.displayMemory.__name__)
display.add_argument("-p", "--policy", action="store_const", dest="show",
help="Show Router Policy",
const=BusManager.displayPolicy.__name__)
display.add_argument("--autolinks", action="store_const", dest="show",
help="Show Auto Links",
const=BusManager.displayAutolinks.__name__)
display.add_argument("--linkroutes", action="store_const", dest="show",
help="Show Link Routes",
const=BusManager.displayLinkRoutes.__name__)
display.add_argument("--vhosts", action="store_const", dest="show",
help="Show Vhosts",
const=BusManager.displayVhosts.__name__)
display.add_argument("--vhostgroups", action="store_const", dest="show",
help="Show Vhost Groups",
const=BusManager.displayVhostgroups.__name__)
display.add_argument("--vhoststats", action="store_const", dest="show",
help="Show Vhost Stats",
const=BusManager.displayVhoststats.__name__)
display.add_argument("--log", action="store_const", dest="show",
help="Show recent log entries",
const=BusManager.displayLog.__name__)
display.add_argument("--all-entities", action="store_const", dest="show",
help="Show all router entities. Can be combined with --all-routers option",
const=BusManager.show_all.__name__)
display.set_defaults(show=BusManager.displayGeneral.__name__)
def _qdstat_parser(BusManager):
parser = _custom_optional_arguments_parser(prog="qdstat", parents=[common_parser])
_qdstat_add_display_args(parser, BusManager)
_group = parser.add_argument_group('Target', 'Choose destination router to \
required, default the one you connect to.')
target = _group.add_mutually_exclusive_group(required=False)
target.add_argument("--all-routers", action="store_true",
help="Show entities for all routers in network. \
Can also be used in combination with other options")
target.add_argument("-r", "--router",
metavar="ROUTER-ID", help="Router to be queried")
target.add_argument("-d", "--edge-router", metavar="EDGE-ROUTER-ID", help="Edge Router to be queried")
# This limit can be used to limit the number of output rows and
# can be used in conjunction with options
# like -c, -l, -a, --autolinks, --linkroutes and --log.
# By default, the limit is not set, which means the limit is unlimited.
parser.add_argument("--limit", help="Limit number of output rows. Unlimited if limit is zero or if limit not specified", type=int, default=None)
parser.add_argument("--csv", help="Render tabular output in csv format", action="store_true")
add_connection_options(parser)
return parser
def _qdmanage_add_args(parser):
parser.add_argument("-r", "--router",
metavar="ROUTER-ID", help="Router to be queried")
# Edge routers are not part of the router network. Hence we need a separate option
# to be able to query edge routers
parser.add_argument("-d", '--edge-router', metavar="EDGE-ROUTER-ID", help='Edge Router to be queried')
parser.add_argument('--type', help='Type of entity to operate on.') # add choices
parser.add_argument('--name', help='Name of entity to operate on.')
parser.add_argument('--identity', help='Identity of entity to operate on.',
metavar="ID")
parser.add_argument("--indent", type=int, default=2,
help="Pretty-printing indent. -1 means don't pretty-print (default %(default)s)")
parser.add_argument('--stdin', action='store_true',
help='Read attributes as JSON map or list of maps from stdin.')
parser.add_argument('--body', help='JSON value to use as body of a non-standard operation call.')
parser.add_argument('--properties', help='JSON map to use as properties for a non-standard operation call.')
def _qdmanage_parser(operations):
description = "Standard operations: %s. Use GET-OPERATIONS to find additional operations." % (", ".join(operations))
parser = _custom_optional_arguments_parser(prog="qdmanage <operation>",
parents=[common_parser],
description=description)
_qdmanage_add_args(parser)
add_connection_options(parser)
return parser
def get_password(file=None):
if file:
with open(file, 'r') as password_file:
return str(password_file.read()).strip() # Remove leading and trailing characters
return None
class Sasl(object):
"""
A simple object to hold sasl mechanisms, sasl username and password
"""
def __init__(self, mechs=None, user=None, password=None, sasl_password_file=None):
self.mechs = mechs
self.user = user
self.password = password
self.sasl_password_file = sasl_password_file
if self.sasl_password_file:
self.password = get_password(self.sasl_password_file)
def opts_url(opts):
"""Fix up default URL settings based on options"""
url = Url(opts.bus)
# If the options indicate SSL, make sure we use the amqps scheme.
if opts.ssl_certificate or opts.ssl_trustfile:
url.scheme = "amqps"
return url
def opts_sasl(opts):
url = Url(opts.bus)
mechs, user, password, sasl_password_file = opts.sasl_mechanisms, (opts.sasl_username or url.username), (opts.sasl_password or url.password), opts.sasl_password_file
if not (mechs or user or password or sasl_password_file):
return None
return Sasl(mechs, user, password, sasl_password_file)
def opts_ssl_domain(opts, mode=SSLDomain.MODE_CLIENT):
"""Return proton.SSLDomain from command line options or None if no SSL options specified.
@param opts: Parsed optoins including connection_options()
"""
certificate, key, trustfile, password, password_file, ssl_disable_peer_name_verify = opts.ssl_certificate,\
opts.ssl_key,\
opts.ssl_trustfile,\
opts.ssl_password,\
opts.ssl_password_file, \
opts.ssl_disable_peer_name_verify
if not (certificate or trustfile):
return None
if password_file:
password = get_password(password_file)
domain = SSLDomain(mode)
if trustfile:
domain.set_trusted_ca_db(str(trustfile))
if ssl_disable_peer_name_verify:
domain.set_peer_authentication(SSLDomain.VERIFY_PEER, str(trustfile))
else:
domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, str(trustfile))
if certificate:
domain.set_credentials(str(certificate), str(key), str(password))
return domain