blob: a69ee7278abfc589019d97c01f578bf72d0de017 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''heron executor unittest'''
import os
import socket
import unittest2 as unittest
import json
from heron.executor.src.python.heron_executor import ProcessInfo
from heron.executor.src.python.heron_executor import HeronExecutor
from heron.proto.packing_plan_pb2 import PackingPlan
# pylint: disable=unused-argument
# pylint: disable=missing-docstring
def get_test_heron_internal_yaml():
"""Get the path to test_heron_internal.yaml
For example, __file__ would be
/tmp/_bazel_heron/randgen_dir/heron/heron/executor/tests/python/heron_executor_unittest.py
"""
heron_dir = '/'.join(__file__.split('/')[:-5])
yaml_path = os.path.join(heron_dir, 'heron/config/src/yaml/conf/test/test_heron_internals.yaml')
override_path = os.path.join(
heron_dir, 'heron/config/src/yaml/conf/test/test_override.yaml')
return yaml_path, override_path
INTERNAL_CONF_PATH, OVERRIDE_PATH = get_test_heron_internal_yaml()
HOSTNAME = socket.gethostname()
class CommandEncoder(json.JSONEncoder):
"""Customized JSONEncoder that works with Command object"""
def default(self, o):
return o.cmd
class MockPOpen(object):
"""fake subprocess.Popen object that we can use to mock processes and pids"""
next_pid = 0
def __init__(self):
self.pid = MockPOpen.next_pid
MockPOpen.next_pid += 1
@staticmethod
def set_next_pid(next_pid):
MockPOpen.next_pid = next_pid
class MockExecutor(HeronExecutor):
"""mock executor that overrides methods that don't apply to unit tests, like running processes"""
def __init__(self, args):
self.processes = []
super(MockExecutor, self).__init__(args, None)
# pylint: disable=no-self-use
def _load_logging_dir(self, heron_internals_config_file):
return "fake_dir"
def _run_process(self, name, cmd, env=None):
popen = MockPOpen()
self.processes.append(ProcessInfo(popen, name, cmd))
return popen
def _get_jvm_version(self):
return "1.8.y.x"
class HeronExecutorTest(unittest.TestCase):
"""Unittest for Heron Executor"""
def get_expected_shell_command(container_id):
return 'heron_shell_binary --port=shell-port ' \
'--log_file_prefix=fake_dir/heron-shell-%s.log ' \
'--secret=topid' % container_id
def build_packing_plan(self, instance_distribution):
packing_plan = PackingPlan()
for container_id in list(instance_distribution.keys()):
container_plan = packing_plan.container_plans.add()
container_plan.id = int(container_id)
for (component_name, global_task_id, component_index) in instance_distribution[container_id]:
instance_plan = container_plan.instance_plans.add()
instance_plan.component_name = component_name
instance_plan.task_id = int(global_task_id)
instance_plan.component_index = int(component_index)
return packing_plan
# pylint: disable=no-self-argument
def get_expected_metricsmgr_command(container_id):
return "heron_java_home/bin/java -Xmx1024M -XX:+PrintCommandLineFlags -verbosegc " \
"-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
"-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
"-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
"-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+PrintCommandLineFlags " \
"-Xloggc:log-files/gc.metricsmgr-%d.log -Djava.net.preferIPv4Stack=true " \
"-cp metricsmgr_classpath org.apache.heron.metricsmgr.MetricsManager " \
"--id=metricsmgr-%d --port=metricsmgr_port " \
"--topology=topname --cluster=cluster --role=role --environment=environ --topology-id=topid " \
"--system-config-file=%s --override-config-file=%s --sink-config-file=metrics_sinks_config_file" %\
(container_id, container_id, INTERNAL_CONF_PATH, OVERRIDE_PATH)
def get_expected_metricscachemgr_command():
return "heron_java_home/bin/java -Xmx1024M -XX:+PrintCommandLineFlags -verbosegc " \
"-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
"-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
"-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
"-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+PrintCommandLineFlags " \
"-Xloggc:log-files/gc.metricscache.log -Djava.net.preferIPv4Stack=true " \
"-cp metricscachemgr_classpath org.apache.heron.metricscachemgr.MetricsCacheManager " \
"--metricscache_id metricscache-0 --master_port metricscachemgr_masterport " \
"--stats_port metricscachemgr_statsport --topology_name topname --topology_id topid " \
"--system_config_file %s --override_config_file %s " \
"--sink_config_file metrics_sinks_config_file " \
"--cluster cluster --role role --environment environ" %\
(INTERNAL_CONF_PATH, OVERRIDE_PATH)
def get_expected_healthmgr_command():
return "heron_java_home/bin/java -Xmx1024M -XX:+PrintCommandLineFlags -verbosegc " \
"-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
"-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
"-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
"-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+PrintCommandLineFlags " \
"-Xloggc:log-files/gc.healthmgr.log -Djava.net.preferIPv4Stack=true " \
"-cp scheduler_classpath:healthmgr_classpath " \
"org.apache.heron.healthmgr.HealthManager --cluster cluster --role role " \
"--environment environ --topology_name topname --metricsmgr_port metricsmgr_port"
def get_expected_instance_command(component_name, instance_id, container_id):
instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxMetaspaceSize=128M " \
"-XX:MetaspaceSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
"-XX:TargetSurvivorRatio=90 -XX:+PrintCommandLineFlags -verbosegc -XX:+PrintGCDetails " \
"-XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
"-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
"-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
"-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 " \
"-Xloggc:log-files/gc.%s.log -Djava.net.preferIPv4Stack=true " \
"-cp instance_classpath:classpath -XX:+HeapDumpOnOutOfMemoryError " \
"org.apache.heron.instance.HeronInstance -topology_name topname -topology_id topid -instance_id %s -component_name %s -task_id %d -component_index 0 -stmgr_id stmgr-%d " \
"-stmgr_port tmaster_controller_port -metricsmgr_port metricsmgr_port -system_config_file %s -override_config_file %s" \
% (instance_name, instance_name, component_name, instance_id,
container_id, INTERNAL_CONF_PATH, OVERRIDE_PATH)
MockPOpen.set_next_pid(37)
expected_processes_container_0 = [
ProcessInfo(MockPOpen(), 'heron-tmaster',
'tmaster_binary --topology_name=topname --topology_id=topid '
'--zkhostportlist=zknode --zkroot=zkroot --myhost=%s --master_port=master_port '
'--controller_port=tmaster_controller_port --stats_port=tmaster_stats_port '
'--config_file=%s --override_config_file=%s '
'--metrics_sinks_yaml=metrics_sinks_config_file '
'--metricsmgr_port=metricsmgr_port '
'--ckptmgr_port=ckptmgr-port' % (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
ProcessInfo(MockPOpen(), 'heron-shell-0', get_expected_shell_command(0)),
ProcessInfo(MockPOpen(), 'metricsmgr-0', get_expected_metricsmgr_command(0)),
ProcessInfo(MockPOpen(), 'heron-metricscache', get_expected_metricscachemgr_command()),
ProcessInfo(MockPOpen(), 'heron-healthmgr', get_expected_healthmgr_command()),
]
MockPOpen.set_next_pid(37)
expected_processes_container_1 = [
ProcessInfo(MockPOpen(), 'stmgr-1',
'stmgr_binary --topology_name=topname --topology_id=topid '
'--topologydefn_file=topdefnfile --zkhostportlist=zknode --zkroot=zkroot '
'--stmgr_id=stmgr-1 '
'--instance_ids=container_1_word_3,container_1_exclaim1_2,container_1_exclaim1_1 '
'--myhost=%s --data_port=master_port '
'--local_data_port=tmaster_controller_port --metricsmgr_port=metricsmgr_port '
'--shell_port=shell-port --config_file=%s --override_config_file=%s '
'--ckptmgr_port=ckptmgr-port --ckptmgr_id=ckptmgr-1 '
'--metricscachemgr_mode=cluster'
% (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
ProcessInfo(MockPOpen(), 'container_1_word_3', get_expected_instance_command('word', 3, 1)),
ProcessInfo(MockPOpen(), 'container_1_exclaim1_1',
get_expected_instance_command('exclaim1', 1, 1)),
ProcessInfo(MockPOpen(), 'container_1_exclaim1_2',
get_expected_instance_command('exclaim1', 2, 1)),
ProcessInfo(MockPOpen(), 'heron-shell-1', get_expected_shell_command(1)),
ProcessInfo(MockPOpen(), 'metricsmgr-1', get_expected_metricsmgr_command(1)),
]
MockPOpen.set_next_pid(37)
expected_processes_container_7 = [
ProcessInfo(MockPOpen(), 'container_7_word_11', get_expected_instance_command('word', 11, 7)),
ProcessInfo(MockPOpen(), 'container_7_exclaim1_210',
get_expected_instance_command('exclaim1', 210, 7)),
ProcessInfo(MockPOpen(), 'stmgr-7',
'stmgr_binary --topology_name=topname --topology_id=topid '
'--topologydefn_file=topdefnfile --zkhostportlist=zknode --zkroot=zkroot '
'--stmgr_id=stmgr-7 '
'--instance_ids=container_7_word_11,container_7_exclaim1_210 --myhost=%s '
'--data_port=master_port '
'--local_data_port=tmaster_controller_port --metricsmgr_port=metricsmgr_port '
'--shell_port=shell-port --config_file=%s --override_config_file=%s '
'--ckptmgr_port=ckptmgr-port --ckptmgr_id=ckptmgr-7 '
'--metricscachemgr_mode=cluster'
% (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
ProcessInfo(MockPOpen(), 'metricsmgr-7', get_expected_metricsmgr_command(7)),
ProcessInfo(MockPOpen(), 'heron-shell-7', get_expected_shell_command(7)),
]
def setUp(self):
MockPOpen.set_next_pid(37)
self.maxDiff = None
self.executor_0 = MockExecutor(self.get_args(0))
self.executor_1 = MockExecutor(self.get_args(1))
self.executor_7 = MockExecutor(self.get_args(7))
self.packing_plan_expected = self.build_packing_plan({
1:[('word', '3', '0'), ('exclaim1', '2', '0'), ('exclaim1', '1', '0')],
7:[('word', '11', '0'), ('exclaim1', '210', '0')],
})
# ./heron-executor <shardid> <topname> <topid> <topdefnfile>
# <zknode> <zkroot> <tmaster_binary> <stmgr_binary>
# <metricsmgr_classpath> <instance_jvm_opts_in_base64> <classpath>
# <master_port> <tmaster_controller_port> <tmaster_stats_port> <heron_internals_config_file>
# <override_config_file> <component_rammap> <component_jvm_opts_in_base64> <pkg_type>
# <topology_bin_file> <heron_java_home> <shell-port> <heron_shell_binary> <metricsmgr_port>
# <cluster> <role> <environ> <instance_classpath> <metrics_sinks_config_file>
# <scheduler_classpath> <scheduler_port> <python_instance_binary>
@staticmethod
def get_args(shard_id):
executor_args = [
("--shard", shard_id),
("--topology-name", "topname"),
("--topology-id", "topid"),
("--topology-defn-file", "topdefnfile"),
("--state-manager-connection", "zknode"),
("--state-manager-root", "zkroot"),
("--state-manager-config-file", "state_manager_config_file"),
("--tmaster-binary", "tmaster_binary"),
("--stmgr-binary", "stmgr_binary"),
("--metrics-manager-classpath", "metricsmgr_classpath"),
("--instance-jvm-opts", "LVhYOitIZWFwRHVtcE9uT3V0T2ZNZW1vcnlFcnJvcg(61)(61)"),
("--classpath", "classpath"),
("--master-port", "master_port"),
("--tmaster-controller-port", "tmaster_controller_port"),
("--tmaster-stats-port", "tmaster_stats_port"),
("--heron-internals-config-file", INTERNAL_CONF_PATH),
("--override-config-file", OVERRIDE_PATH),
("--component-ram-map", "exclaim1:536870912,word:536870912"),
("--component-jvm-opts", ""),
("--pkg-type", "jar"),
("--topology-binary-file", "topology_bin_file"),
("--heron-java-home", "heron_java_home"),
("--shell-port", "shell-port"),
("--heron-shell-binary", "heron_shell_binary"),
("--metrics-manager-port", "metricsmgr_port"),
("--cluster", "cluster"),
("--role", "role"),
("--environment", "environ"),
("--instance-classpath", "instance_classpath"),
("--metrics-sinks-config-file", "metrics_sinks_config_file"),
("--scheduler-classpath", "scheduler_classpath"),
("--scheduler-port", "scheduler_port"),
("--python-instance-binary", "python_instance_binary"),
("--cpp-instance-binary", "cpp_instance_binary"),
("--metricscache-manager-classpath", "metricscachemgr_classpath"),
("--metricscache-manager-master-port", "metricscachemgr_masterport"),
("--metricscache-manager-stats-port", "metricscachemgr_statsport"),
("--is-stateful", "is_stateful_enabled"),
("--checkpoint-manager-classpath", "ckptmgr_classpath"),
("--checkpoint-manager-port", "ckptmgr-port"),
("--checkpoint-manager-ram", "1073741824"),
("--stateful-config-file", "stateful_config_file"),
("--health-manager-mode", "cluster"),
("--health-manager-classpath", "healthmgr_classpath"),
("--metricscache-manager-mode", "cluster")
]
args = ("%s=%s" % (arg[0], (str(arg[1]))) for arg in executor_args)
command = "./heron-executor %s" % (" ".join(args))
return command.split()
def test_update_packing_plan(self):
self.executor_0.update_packing_plan(self.packing_plan_expected)
self.assertEqual(self.packing_plan_expected, self.executor_0.packing_plan)
self.assertEqual({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
self.assertEqual(
{0: "metricsmgr-0", 1: "metricsmgr-1", 7: "metricsmgr-7"}, self.executor_0.metricsmgr_ids)
self.assertEqual(
{0: "heron-shell-0", 1: "heron-shell-1", 7: "heron-shell-7"}, self.executor_0.heron_shell_ids)
def test_launch_container_0(self):
self.do_test_launch(self.executor_0, self.expected_processes_container_0)
def test_launch_container_1(self):
self.do_test_launch(self.executor_1, self.expected_processes_container_1)
def test_launch_container_7(self):
self.do_test_launch(self.executor_7, self.expected_processes_container_7)
def do_test_launch(self, executor, expected_processes):
executor.update_packing_plan(self.packing_plan_expected)
executor.launch()
monitored_processes = executor.processes_to_monitor
# convert to (pid, name, command)
found_processes = list([(process_info.pid, process_info.name, process_info.command_str) for process_info in executor.processes])
found_monitored = list([(pinfo[0], pinfo[1].name, pinfo[1].command_str) for pinfo in list(monitored_processes.items())])
found_processes.sort(key=lambda tuple: tuple[0])
found_monitored.sort(key=lambda tuple: tuple[0])
print("do_test_commands - found_processes: %s found_monitored: %s" \
% (found_processes, found_monitored))
self.assertEqual(found_processes, found_monitored)
print("do_test_commands - expected_processes: %s monitored_processes: %s" \
% (expected_processes, monitored_processes))
self.assert_processes(expected_processes, monitored_processes)
def test_change_instance_dist_container_1(self):
MockPOpen.set_next_pid(37)
self.executor_1.update_packing_plan(self.packing_plan_expected)
current_commands = self.executor_1.get_commands_to_run()
temp_dict = dict(
list(map((lambda process_info: (process_info.name, process_info.command.split(' '))),
self.expected_processes_container_1)))
current_json = json.dumps(current_commands, sort_keys=True, cls=CommandEncoder).split(' ')
temp_json = json.dumps(temp_dict, sort_keys=True).split(' ')
print("current_json: %s" % current_json)
print("temp_json: %s" % temp_json)
# better test error report
for (s1, s2) in zip(current_json, temp_json):
self.assertEqual(s1, s2)
# update instance distribution
new_packing_plan = self.build_packing_plan(
{1:[('word', '3', '0'), ('word', '2', '0'), ('exclaim1', '1', '0')]})
self.executor_1.update_packing_plan(new_packing_plan)
updated_commands = self.executor_1.get_commands_to_run()
# get the commands to kill, keep and start and verify
commands_to_kill, commands_to_keep, commands_to_start = \
self.executor_1.get_command_changes(current_commands, updated_commands)
self.assertEqual(['container_1_exclaim1_2', 'stmgr-1'], sorted(commands_to_kill.keys()))
self.assertEqual(
['container_1_exclaim1_1', 'container_1_word_3', 'heron-shell-1', 'metricsmgr-1'],
sorted(commands_to_keep.keys()))
self.assertEqual(['container_1_word_2', 'stmgr-1'], sorted(commands_to_start.keys()))
def assert_processes(self, expected_processes, found_processes):
self.assertEqual(len(expected_processes), len(found_processes))
for expected_process in expected_processes:
self.assert_process(expected_process, found_processes)
def assert_process(self, expected_process, found_processes):
pid = expected_process.pid
self.assertTrue(found_processes[pid])
self.assertEqual(expected_process.name, found_processes[pid].name)
self.assertEqual(expected_process.command, found_processes[pid].command_str)
self.assertEqual(1, found_processes[pid].attempts)