blob: e499fb68643ea9cf652adcbaf1192fbf92ca8aba [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' update.py '''
from heron.common.src.python.utils.log import Log
from heron.tools.cli.src.python.result import SimpleResult, Status
import heron.tools.cli.src.python.args as args
import heron.tools.cli.src.python.cli_helper as cli_helper
import heron.tools.cli.src.python.jars as jars
import heron.tools.common.src.python.utils.config as config
import argparse
import re
def create_parser(subparsers):
""" Create the parse for the update command """
parser = subparsers.add_parser(
'update',
help='Update a topology',
usage="%(prog)s [options] cluster/[role]/[env] <topology-name> "
+ "[--component-parallelism <name:value>] "
+ "[--container-number value] "
+ "[--runtime-config [component:]<name:value>]",
add_help=True)
args.add_titles(parser)
args.add_cluster_role_env(parser)
args.add_topology(parser)
args.add_config(parser)
args.add_dry_run(parser)
args.add_service_url(parser)
args.add_verbose(parser)
# Special parameters for update command
def parallelism_type(value):
pattern = re.compile(r"^[\w\.-]+:[\d]+$")
if not pattern.match(value):
raise argparse.ArgumentTypeError(
"Invalid syntax for component parallelism (<component_name:value>): %s" % value)
return value
parser.add_argument(
'--component-parallelism',
action='append',
type=parallelism_type,
required=False,
help='Component name and the new parallelism value '
+ 'colon-delimited: <component_name>:<parallelism>')
def runtime_config_type(value):
pattern = re.compile(r"^([\w\.-]+:){1,2}[\w\.-]+$")
if not pattern.match(value):
raise argparse.ArgumentTypeError(
"Invalid syntax for runtime config ([component:]<name:value>): %s"
% value)
return value
parser.add_argument(
'--runtime-config',
action='append',
type=runtime_config_type,
required=False,
help='Runtime configurations for topology and components '
+ 'colon-delimited: [component:]<name>:<value>')
def container_number_type(value):
pattern = re.compile(r"^\d+$")
if not pattern.match(value):
raise argparse.ArgumentTypeError(
"Invalid syntax for container number (value): %s"
% value)
return value
parser.add_argument(
'--container-number',
action='append',
type=container_number_type,
required=False,
help='Number of containers <value>')
parser.set_defaults(subcommand='update')
return parser
def build_extra_args_dict(cl_args):
""" Build extra args map """
# Check parameters
component_parallelism = cl_args['component_parallelism']
runtime_configs = cl_args['runtime_config']
container_number = cl_args['container_number']
# Users need to provide either (component-parallelism || container_number) or runtime-config
if (component_parallelism and runtime_configs) or (container_number and runtime_configs):
raise Exception(
"(component-parallelism or container_num) and runtime-config " +
"can't be updated at the same time")
dict_extra_args = {}
nothing_set = True
if component_parallelism:
dict_extra_args.update({'component_parallelism': component_parallelism})
nothing_set = False
if container_number:
dict_extra_args.update({'container_number': container_number})
nothing_set = False
if runtime_configs:
dict_extra_args.update({'runtime_config': runtime_configs})
nothing_set = False
if nothing_set:
raise Exception(
"Missing arguments --component-parallelism or --runtime-config or --container-number")
if cl_args['dry_run']:
dict_extra_args.update({'dry_run': True})
if 'dry_run_format' in cl_args:
dict_extra_args.update({'dry_run_format': cl_args["dry_run_format"]})
return dict_extra_args
def convert_args_dict_to_list(dict_extra_args):
""" flatten extra args """
list_extra_args = []
if 'component_parallelism' in dict_extra_args:
list_extra_args += ["--component_parallelism",
','.join(dict_extra_args['component_parallelism'])]
if 'runtime_config' in dict_extra_args:
list_extra_args += ["--runtime_config",
','.join(dict_extra_args['runtime_config'])]
if 'container_number' in dict_extra_args:
list_extra_args += ["--container_number",
','.join(dict_extra_args['container_number'])]
if 'dry_run' in dict_extra_args and dict_extra_args['dry_run']:
list_extra_args += ['--dry_run']
if 'dry_run_format' in dict_extra_args:
list_extra_args += ['--dry_run_format', dict_extra_args['dry_run_format']]
return list_extra_args
# pylint: disable=unused-argument
def run(command, parser, cl_args, unknown_args):
""" run the update command """
Log.debug("Update Args: %s", cl_args)
# Build jar list
extra_lib_jars = jars.packing_jars()
action = "update topology%s" % (' in dry-run mode' if cl_args["dry_run"] else '')
# Build extra args
dict_extra_args = {}
try:
dict_extra_args = build_extra_args_dict(cl_args)
except Exception as err:
return SimpleResult(Status.InvocationError, err.message)
# Execute
if cl_args['deploy_mode'] == config.SERVER_MODE:
return cli_helper.run_server(command, cl_args, action, dict_extra_args)
else:
# Convert extra argument to commandline format and then execute
list_extra_args = convert_args_dict_to_list(dict_extra_args)
return cli_helper.run_direct(command, cl_args, action, list_extra_args, extra_lib_jars)