# blob: cc35146e8fc1b66c5071cace6e21a31ddc06f7b8 [file] [log] [blame]
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''heron executor unittest'''
import os
import socket
import unittest2 as unittest
from heron.executor.src.python.heron_executor import ProcessInfo
from heron.executor.src.python.heron_executor import HeronExecutor
from heron.proto.packing_plan_pb2 import PackingPlan
# pylint: disable=unused-argument
# pylint: disable=missing-docstring
def get_test_heron_internal_yaml():
  """Return the location of test_heron_internals.yaml inside the source tree.

  The tree root is obtained by dropping the last five '/'-separated components
  of this module's path. For example, when __file__ is
  /tmp/_bazel_heron/randgen_dir/heron/heron/executor/tests/python/heron_executor_unittest.py
  the root is /tmp/_bazel_heron/randgen_dir/heron.
  """
  path_components = __file__.split('/')
  root_dir = '/'.join(path_components[:-5])
  return os.path.join(root_dir, 'heron/config/src/yaml/conf/test/test_heron_internals.yaml')
# Path of the test heron_internals yaml, computed once at import time.
INTERNAL_CONF_PATH = get_test_heron_internal_yaml()
# Host name of the machine running the tests, as reported by socket.gethostname().
HOSTNAME = socket.gethostname()
class MockPOpen(object):
  """Stand-in for subprocess.Popen that carries nothing but a predictable pid."""

  # pid that the next constructed instance will receive
  next_pid = 0

  def __init__(self):
    # Take the current counter value, then advance the shared counter by one.
    assigned = MockPOpen.next_pid
    MockPOpen.next_pid = assigned + 1
    self.pid = assigned

  @staticmethod
  def set_next_pid(next_pid):
    """Restart pid numbering so that the next instance gets next_pid."""
    MockPOpen.next_pid = next_pid
class MockExecutor(HeronExecutor):
  """HeronExecutor double for unit tests: launches are recorded, nothing is really run."""

  def __init__(self, args):
    # Created before delegating to the parent constructor, as in the original ordering
    # (presumably so the list exists if the parent launches anything -- TODO confirm).
    self.processes = []
    super(MockExecutor, self).__init__(args, None)

  # pylint: disable=no-self-use
  def _load_logging_dir(self, heron_internals_config_file):
    """Return a fixed logging directory name; the config file argument is ignored."""
    return "fake_dir"

  def _run_process(self, name, cmd, env=None):
    """Record the requested launch with a fake process handle and return that handle."""
    fake_handle = MockPOpen()
    launch_record = ProcessInfo(fake_handle, name, cmd)
    self.processes.append(launch_record)
    return fake_handle

  def _get_jvm_version(self):
    """Return a fixed JVM version string."""
    return "1.8.y.x"
class HeronExecutorTest(unittest.TestCase):
  """Unit tests for HeronExecutor command generation and process bookkeeping.

  The expected_processes_container_* tables are built once, while the class body
  is executed. Each table restarts MockPOpen pid numbering at 37 (the same value
  setUp resets to), so an entry's position in its list fixes the pid it is
  expected to be found under.
  """

  shell_command_expected = 'heron_shell_binary --port=shell-port ' \
                           '--log_file_prefix=fake_dir/heron-shell.log'

  def build_packing_plan(self, instance_distribution):
    """Build a PackingPlan from a container -> instances mapping.

    Args:
      instance_distribution: dict mapping a container id to a list of
        (component_name, global_task_id, component_index) tuples. Ids and
        indices are passed through int(), so numeric strings are accepted.

    Returns:
      A PackingPlan with one container plan per key and one instance plan per tuple.
    """
    packing_plan = PackingPlan()
    for container_id, instances in instance_distribution.items():
      container_plan = packing_plan.container_plans.add()
      container_plan.id = int(container_id)
      for (component_name, global_task_id, component_index) in instances:
        instance_plan = container_plan.instance_plans.add()
        instance_plan.component_name = component_name
        instance_plan.task_id = int(global_task_id)
        instance_plan.component_index = int(component_index)
    return packing_plan

  # The three get_expected_* helpers below deliberately take no self: they are
  # plain functions called while the class body runs, to build the expected tables.
  # pylint: disable=no-self-argument
  def get_expected_metricsmgr_command(container_id):
    """Return the expected metrics manager command line for container_id."""
    return "heron_java_home/bin/java -Xmx1024M -XX:+PrintCommandLineFlags -verbosegc " \
           "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
           "-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
           "-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
           "-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+PrintCommandLineFlags " \
           "-Xloggc:log-files/gc.metricsmgr.log -Djava.net.preferIPv4Stack=true " \
           "-cp metricsmgr_classpath com.twitter.heron.metricsmgr.MetricsManager metricsmgr-%d " \
           "metricsmgr_port topname topid %s " \
           "metrics_sinks_config_file" % (container_id, INTERNAL_CONF_PATH)

  def get_expected_metricscachemgr_command():
    """Return the expected metrics cache manager command line."""
    return "heron_java_home/bin/java -Xmx1024M -XX:+PrintCommandLineFlags -verbosegc " \
           "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
           "-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
           "-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
           "-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+PrintCommandLineFlags " \
           "-Xloggc:log-files/gc.metricscache.log -Djava.net.preferIPv4Stack=true " \
           "-cp metricscachemgr_classpath com.twitter.heron.metricscachemgr.MetricsCacheManager " \
           "--metricscache_id metricscache-0 --master_port metricscachemgr_masterport " \
           "--stats_port metricscachemgr_statsport --topology_name topname --topology_id topid " \
           "--system_config_file %s --sink_config_file metrics_sinks_config_file " \
           "--cluster cluster --role role --environment environ --verbose" % (INTERNAL_CONF_PATH)

  def get_expected_instance_command(component_name, instance_id, container_id):
    """Return the expected instance command line for one component instance."""
    instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
    return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxMetaspaceSize=128M " \
           "-XX:MetaspaceSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
           "-XX:TargetSurvivorRatio=90 -XX:+PrintCommandLineFlags -verbosegc -XX:+PrintGCDetails " \
           "-XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
           "-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
           "-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
           "-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 " \
           "-Xloggc:log-files/gc.%s.log -XX:+HeapDumpOnOutOfMemoryError " \
           "-Djava.net.preferIPv4Stack=true -cp instance_classpath:classpath " \
           "com.twitter.heron.instance.HeronInstance topname topid %s %s %d 0 stmgr-%d " \
           "master_port metricsmgr_port %s" \
           % (instance_name, instance_name, component_name, instance_id,
              container_id, INTERNAL_CONF_PATH)

  # Each table restarts pid numbering so its first entry is expected under pid 37.
  MockPOpen.set_next_pid(37)
  expected_processes_container_0 = [
      ProcessInfo(MockPOpen(), 'heron-shell-0', shell_command_expected),
      ProcessInfo(MockPOpen(), 'metricsmgr-0', get_expected_metricsmgr_command(0)),
      ProcessInfo(MockPOpen(), 'heron-tmaster',
                  'tmaster_binary %s master_port '
                  'tmaster_controller_port tmaster_stats_port '
                  'topname topid zknode zkroot stmgr-1,stmgr-7 '
                  '%s metrics_sinks_config_file metricsmgr_port' % (HOSTNAME, INTERNAL_CONF_PATH)),
      ProcessInfo(MockPOpen(), 'heron-metricscache', get_expected_metricscachemgr_command()),
  ]

  MockPOpen.set_next_pid(37)
  expected_processes_container_1 = [
      ProcessInfo(MockPOpen(), 'stmgr-1',
                  'stmgr_binary topname topid topdefnfile zknode zkroot stmgr-1 '
                  'container_1_word_3,container_1_exclaim1_2,container_1_exclaim1_1 %s master_port '
                  'metricsmgr_port shell-port %s' % (HOSTNAME, INTERNAL_CONF_PATH)),
      ProcessInfo(MockPOpen(), 'container_1_word_3', get_expected_instance_command('word', 3, 1)),
      ProcessInfo(MockPOpen(), 'container_1_exclaim1_1',
                  get_expected_instance_command('exclaim1', 1, 1)),
      ProcessInfo(MockPOpen(), 'container_1_exclaim1_2',
                  get_expected_instance_command('exclaim1', 2, 1)),
      ProcessInfo(MockPOpen(), 'heron-shell-1', shell_command_expected),
      ProcessInfo(MockPOpen(), 'metricsmgr-1', get_expected_metricsmgr_command(1)),
  ]

  MockPOpen.set_next_pid(37)
  expected_processes_container_7 = [
      ProcessInfo(MockPOpen(), 'container_7_word_11', get_expected_instance_command('word', 11, 7)),
      ProcessInfo(MockPOpen(), 'container_7_exclaim1_210',
                  get_expected_instance_command('exclaim1', 210, 7)),
      ProcessInfo(MockPOpen(), 'stmgr-7',
                  'stmgr_binary topname topid topdefnfile zknode zkroot stmgr-7 '
                  'container_7_word_11,container_7_exclaim1_210 %s master_port '
                  'metricsmgr_port shell-port %s' % (HOSTNAME, INTERNAL_CONF_PATH)),
      ProcessInfo(MockPOpen(), 'metricsmgr-7', get_expected_metricsmgr_command(7)),
      ProcessInfo(MockPOpen(), 'heron-shell-7', shell_command_expected),
  ]

  def setUp(self):
    # Reset pid numbering so pids handed out during a test line up with the tables above.
    MockPOpen.set_next_pid(37)
    self.maxDiff = None
    self.executor_0 = MockExecutor(self.get_args(0))
    self.executor_1 = MockExecutor(self.get_args(1))
    self.executor_7 = MockExecutor(self.get_args(7))
    self.packing_plan_expected = self.build_packing_plan({
        1: [('word', '3', '0'), ('exclaim1', '2', '0'), ('exclaim1', '1', '0')],
        7: [('word', '11', '0'), ('exclaim1', '210', '0')],
    })

  # ./heron-executor <shardid> <topname> <topid> <topdefnfile>
  # <zknode> <zkroot> <tmaster_binary> <stmgr_binary>
  # <metricsmgr_classpath> <instance_jvm_opts_in_base64> <classpath>
  # <master_port> <tmaster_controller_port> <tmaster_stats_port> <heron_internals_config_file>
  # <component_rammap> <component_jvm_opts_in_base64> <pkg_type> <topology_bin_file>
  # <heron_java_home> <shell-port> <heron_shell_binary> <metricsmgr_port>
  # <cluster> <role> <environ> <instance_classpath> <metrics_sinks_config_file>
  # <scheduler_classpath> <scheduler_port> <python_instance_binary>
  # <metricscachemgr_classpath> <metricscachemgr_masterport> <metricscachemgr_statsport>
  @staticmethod
  def get_args(shard_id):
    """Return the executor argument list (argv style) for the given shard id.

    The template is split on any whitespace, so line breaks and indentation
    inside it only separate tokens and never become part of one.
    """
    return ("""
    ./heron-executor %d topname topid topdefnfile
    zknode zkroot tmaster_binary stmgr_binary
    metricsmgr_classpath "LVhYOitIZWFwRHVtcE9uT3V0T2ZNZW1vcnlFcnJvcg&equals;&equals;" classpath
    master_port tmaster_controller_port tmaster_stats_port
    %s exclaim1:536870912,word:536870912 "" jar topology_bin_file
    heron_java_home shell-port heron_shell_binary metricsmgr_port
    cluster role environ instance_classpath metrics_sinks_config_file
    scheduler_classpath scheduler_port python_instance_binary
    metricscachemgr_classpath metricscachemgr_masterport metricscachemgr_statsport
    """ % (shard_id, INTERNAL_CONF_PATH)).split()

  def test_update_packing_plan(self):
    # Updating the plan must store it and derive the stmgr / metricsmgr / shell id maps.
    self.executor_0.update_packing_plan(self.packing_plan_expected)

    self.assertEqual(self.packing_plan_expected, self.executor_0.packing_plan)
    self.assertEqual({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
    self.assertEqual(
        {0: "metricsmgr-0", 1: "metricsmgr-1", 7: "metricsmgr-7"}, self.executor_0.metricsmgr_ids)
    self.assertEqual(
        {0: "heron-shell-0", 1: "heron-shell-1", 7: "heron-shell-7"},
        self.executor_0.heron_shell_ids)

  def test_launch_container_0(self):
    self.do_test_launch(self.executor_0, self.expected_processes_container_0)

  def test_launch_container_1(self):
    self.do_test_launch(self.executor_1, self.expected_processes_container_1)

  def test_launch_container_7(self):
    self.do_test_launch(self.executor_7, self.expected_processes_container_7)

  def do_test_launch(self, executor, expected_processes):
    """Launch executor and check the recorded and monitored processes.

    Verifies that every process recorded by the mock is also monitored, and that
    the monitored processes match expected_processes by pid, name and command.
    """
    executor.update_packing_plan(self.packing_plan_expected)
    executor.launch()
    monitored_processes = executor.processes_to_monitor

    # convert to (pid, name, command), ordered by pid
    found_processes = sorted(
        ((process_info.pid, process_info.name, process_info.command_str)
         for process_info in executor.processes),
        key=lambda entry: entry[0])
    found_monitored = sorted(
        ((pid, process_info.name, process_info.command_str)
         for pid, process_info in monitored_processes.items()),
        key=lambda entry: entry[0])
    print("do_test_commands - found_processes: %s found_monitored: %s"
          % (found_processes, found_monitored))
    self.assertEqual(found_processes, found_monitored)

    print("do_test_commands - expected_processes: %s monitored_processes: %s"
          % (expected_processes, monitored_processes))
    self.assert_processes(expected_processes, monitored_processes)

  def test_change_instance_dist_container_1(self):
    # A changed instance distribution must kill/keep/start exactly the affected commands.
    MockPOpen.set_next_pid(37)
    self.executor_1.update_packing_plan(self.packing_plan_expected)
    current_commands = self.executor_1.get_commands_to_run()

    expected_commands = dict(
        (process_info.name, process_info.command.split(' '))
        for process_info in self.expected_processes_container_1)
    self.assertEqual(expected_commands, current_commands)

    # update instance distribution
    new_packing_plan = self.build_packing_plan(
        {1: [('word', '3', '0'), ('word', '2', '0'), ('exclaim1', '1', '0')]})
    self.executor_1.update_packing_plan(new_packing_plan)
    updated_commands = self.executor_1.get_commands_to_run()

    # get the commands to kill, keep and start and verify
    commands_to_kill, commands_to_keep, commands_to_start = \
        self.executor_1.get_command_changes(current_commands, updated_commands)

    self.assertEqual(['container_1_exclaim1_2', 'stmgr-1'], sorted(commands_to_kill.keys()))
    self.assertEqual(
        ['container_1_exclaim1_1', 'container_1_word_3', 'heron-shell-1', 'metricsmgr-1'],
        sorted(commands_to_keep.keys()))
    self.assertEqual(['container_1_word_2', 'stmgr-1'], sorted(commands_to_start.keys()))

  def assert_processes(self, expected_processes, found_processes):
    """Assert that found_processes (keyed by pid) holds exactly the expected processes."""
    self.assertEqual(len(expected_processes), len(found_processes))
    for expected_process in expected_processes:
      self.assert_process(expected_process, found_processes)

  def assert_process(self, expected_process, found_processes):
    """Assert that expected_process is present under its pid with matching name and command."""
    pid = expected_process.pid
    # Report a missing pid as a test failure instead of letting the lookup raise KeyError.
    self.assertIn(pid, found_processes)
    self.assertTrue(found_processes[pid])
    self.assertEqual(expected_process.name, found_processes[pid].name)
    self.assertEqual(expected_process.command, found_processes[pid].command_str)
    self.assertEqual(1, found_processes[pid].attempts)