# blob: 3a224403357dcf65eb348b15f85a38815d67c864 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import argparse
import sys
from typing import List, Optional, Dict
from cli.constants import Arguments, DEFAULT_HEADER
from cli.options.option_tree import OptionTree, Option, Argument
class Parser(object):
    """
    `Parser.parse()` is used to parse CLI input into an argparse.Namespace. The arguments expected by the parser are
    defined by `OptionTree.get_tree()` and by the arguments in `Parser._ROOT_ARGUMENTS`. This class is responsible for
    translating the option tree into an ArgumentParser, for applying that ArgumentParser to the user input, and for
    generating a custom help message based on the option tree.
    """

    # Flags registered on the top-level parser, before any subcommand.
    _ROOT_ARGUMENTS = [
        Argument(Arguments.HOST, str, hint="hostname"),
        Argument(Arguments.PORT, int, hint="port"),
        Argument(
            Arguments.BASE_URL, str, hint="complete base URL instead of hostname:port"
        ),
        Argument(
            Arguments.CLIENT_ID, str, hint="client ID for token-based authentication"
        ),
        Argument(
            Arguments.CLIENT_SECRET,
            str,
            hint="client secret for token-based authentication",
        ),
        Argument(
            Arguments.ACCESS_TOKEN,
            str,
            hint="access token for token-based authentication",
        ),
        Argument(
            Arguments.REALM,
            str,
            hint="realm to use with header. if not defined will be used default realm from Polaris server. read more: https://polaris.apache.org/releases/1.1.0/configuration/",
            default=None,
        ),
        Argument(
            Arguments.HEADER,
            str,
            hint="header is defining a header name to use as context header name. if not defined will be used default from Polaris server",
            default=DEFAULT_HEADER,
        ),
        Argument(Arguments.PROFILE, str, hint="profile for token-based authentication"),
        Argument(Arguments.PROXY, str, hint="proxy URL"),
        Argument(Arguments.DEBUG, bool, hint="Enable debug mode"),
    ]

    @staticmethod
    def _build_parser() -> argparse.ArgumentParser:
        """
        Generates an argparse parser based on the option tree.

        The root parser is a `TreeHelpParser` carrying `Parser._ROOT_ARGUMENTS`; every `Option`
        returned by `OptionTree.get_tree()` becomes a (possibly nested) subparser.

        :return: the fully populated parser
        """

        def add_arguments(parser, args: List[Argument]):
            """Register every `Argument` in `args` as a flag on `parser`."""
            for arg in args:
                kwargs = {"help": arg.hint}
                if arg.choices:
                    kwargs["choices"] = arg.choices
                # NOTE: only truthy defaults are forwarded to argparse; a falsy default
                # (None, 0, "", False) leaves argparse's own default in place.
                if arg.default:
                    kwargs["default"] = arg.default

                if arg.type is bool:
                    # Boolean flags take no value, so argparse must not receive a `type`.
                    parser.add_argument(
                        arg.get_flag_name(), action="store_true", **kwargs
                    )
                    continue

                # With `lower` set, the bound `lower` method of the type (e.g. `str.lower`)
                # is used as the converter so values are normalised before `choices` is checked.
                kwargs["type"] = arg.type.lower if arg.lower else arg.type
                if arg.allow_repeats:
                    kwargs["action"] = "append"
                parser.add_argument(arg.get_flag_name(), **kwargs)

        def recurse_options(subparser, options: List[Option]):
            """Add one subparser per option, descending into each option's children."""
            for option in options:
                option_parser = subparser.add_parser(
                    option.name, help=option.hint or option.name
                )
                add_arguments(option_parser, option.args)
                if option.input_name:
                    # The option's single positional input.
                    option_parser.add_argument(
                        option.input_name,
                        type=str,
                        help=option.input_name.replace("_", " "),
                        default=None,
                    )
                if option.children:
                    children_subparser = option_parser.add_subparsers(
                        dest=f"{option.name}_subcommand", required=False
                    )
                    recurse_options(children_subparser, option.children)

        # Add everything from the option tree to the parser:
        root_parser = TreeHelpParser(description="Polaris CLI")
        add_arguments(root_parser, Parser._ROOT_ARGUMENTS)
        command_subparsers = root_parser.add_subparsers(dest="command", required=False)
        recurse_options(command_subparsers, OptionTree.get_tree())
        return root_parser

    @staticmethod
    def parse(input: Optional[List[str]] = None) -> argparse.Namespace:
        """
        Parse CLI input into an argparse.Namespace.

        :param input: the argument list to parse; `None` is passed through to
            `TreeHelpParser.parse_args`, which then falls back to `sys.argv[1:]`
        :return: the parsed namespace
        """
        return Parser._build_parser().parse_args(input)

    @staticmethod
    def parse_properties(properties: List[str]) -> Optional[Dict[str, str]]:
        """
        Convert a list of `key=value` strings into a dictionary.

        Each entry is split on its first `=`, so values may themselves contain `=`.

        :param properties: entries of the form `key=value`; may be empty or `None`
        :return: the parsed mapping, or `None` when `properties` is empty or `None`
        :raises ValueError: if an entry has no `=`, has an empty value, or repeats a key
        """
        if not properties:
            return None

        parsed: Dict[str, str] = {}
        for entry in properties:
            key, separator, value = entry.partition("=")
            # A missing "=" and an empty value are reported with the same message.
            if not separator or not value:
                raise ValueError(f"Could not parse property `{entry}`")
            if key in parsed:
                raise ValueError(f"Duplicate property key `{key}`")
            parsed[key] = value
        return parsed
class TreeHelpParser(argparse.ArgumentParser):
    """
    Replaces the default help behavior with a more readable message.

    When `-h` / `--help` appears in the input, the option tree below the command that
    precedes the help flag is printed instead of argparse's generated help text.
    """

    INDENT = " " * 2

    def parse_args(self, args=None, namespace=None):
        """
        Parse `args`, intercepting `-h` / `--help` to print the option tree.

        :param args: argument strings; defaults to `sys.argv[1:]` when `None`
        :param namespace: passed through to `argparse.ArgumentParser.parse_args`
        :return: the parsed namespace when no tree help was printed
        """
        if args is None:
            args = sys.argv[1:]

        help_positions = [args.index(flag) for flag in ("-h", "--help") if flag in args]
        if help_positions:
            # Only the tokens before the first help flag select the subtree to show.
            tree_str = self._get_tree_str(args[: min(help_positions)])
            if tree_str:
                print(f'input: polaris {" ".join(args)}')
                print('options:')
                print(tree_str)
                print("\n")
                self.print_usage()
                self.exit()
                # Only reached if exit() has been overridden so that it returns.
                return None
        # No help flag, or no tree could be rendered: defer to argparse.
        return super().parse_args(args, namespace)

    def _get_tree_str(self, args: List[str]) -> Optional[str]:
        """
        Render the help tree for the command named by `args`.

        :param args: the tokens that precede the help flag
        :return: the whole tree (root flags plus every option) when `args` names no command,
            the subtree of the named command otherwise, or `None` if that command cannot
            be found in the option tree
        """
        tree = OptionTree.get_tree()
        command_path = self._get_command_path(args, tree)

        if not command_path:
            lines = [TreeHelpParser.INDENT + "polaris"]
            lines.extend(
                (TreeHelpParser.INDENT * 2) + f"{arg.get_flag_name()} {arg.hint}"
                for arg in Parser._ROOT_ARGUMENTS
            )
            lines.extend(
                self._get_tree_for_option(option, indent=2) for option in tree
            )
            return "\n".join(lines)

        option_node = self._get_option_node(command_path, tree)
        if option_node is None:
            return None
        return self._get_tree_for_option(option_node)

    def _get_tree_for_option(self, option: "Option", indent=1) -> str:
        """
        Render `option` and, recursively, its children as an indented text tree.

        :param option: the node to render; `args` and `children` may be empty or `None`
        :param indent: number of `INDENT` units placed before the option name
        :return: the rendered lines joined by newlines, children sorted by name
        """
        pad = TreeHelpParser.INDENT
        named_args = option.args or []
        children = option.children or []

        lines = [(pad * indent) + option.name]
        if named_args:
            lines.append((pad * (indent + 1)) + "Named arguments:")
            lines.extend(
                (pad * (indent + 2)) + f"{arg.get_flag_name()} {arg.hint}"
                for arg in named_args
            )
        if option.input_name:
            lines.append((pad * (indent + 1)) + "Positional arguments:")
            lines.append((pad * (indent + 2)) + option.input_name)
        if named_args and children:
            # Blank line separating the named arguments from the child commands.
            lines.append("")
        lines.extend(
            self._get_tree_for_option(child, indent + 1)
            for child in sorted(children, key=lambda o: o.name)
        )
        return "\n".join(lines)

    def _get_command_path(self, args: List[str], options: List["Option"]) -> List[str]:
        """
        Extract the chain of command names (e.g. `["catalogs", "create"]`) from `args`.

        Tokens that do not name an option at the current level are skipped. `args` is
        not modified.

        :param args: the raw tokens to scan
        :param options: the options available at the top level
        :return: the matched command names, outermost first
        """
        command_path = []
        parser = self

        for token in args:
            matched = next((o for o in options if o.name == token), None)
            if matched is None:
                continue
            command_path.append(token)

            # Follow the matching argparse subparser. This relies on private argparse
            # attributes, so any structural mismatch simply ends the walk.
            try:
                parser = parser._subparsers._group_actions[0].choices.get(token)
            except (AttributeError, IndexError, TypeError):
                break
            if not parser:
                break

            options = matched.children
            if options is None:
                break
        return command_path

    def _get_option_node(
        self, command_path: List[str], nodes: List["Option"]
    ) -> Optional["Option"]:
        """
        Walk `command_path` down the option tree rooted at `nodes`.

        :param command_path: command names, outermost first
        :param nodes: the options available at the top level
        :return: the option named by the final path element, or `None` if the path is
            empty or any element has no match at its level
        """
        node = None
        for name in command_path:
            # `nodes` may be None when the previous node has no children.
            node = next((n for n in (nodes or []) if n.name == name), None)
            if node is None:
                return None
            nodes = node.children
        return node