blob: 3f6202fd5f5df94da3a5bc911642c5da0c5449b9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' topology_unittest.py '''
# pylint: disable=missing-docstring
import unittest2 as unittest
from heron.tools.tracker.src.python.topology import Topology
from mock_proto import MockProto
class TopologyTest(unittest.TestCase):
def setUp(self):
self.state_manager_name = "test_state_manager_name"
self.topology = Topology(MockProto.topology_name,
self.state_manager_name)
def test_set_physical_plan(self):
# Set it to None
self.topology.set_physical_plan(None)
self.assertIsNone(self.topology.id)
self.assertIsNone(self.topology.physical_plan)
physical_plan = MockProto().create_mock_simple_physical_plan()
self.topology.set_physical_plan(physical_plan)
self.assertEqual(MockProto.topology_id, self.topology.id)
self.assertEqual(physical_plan, self.topology.physical_plan)
def test_set_packing_plan(self):
# Set it to None
self.topology.set_packing_plan(None)
self.assertIsNone(self.topology.id)
self.assertIsNone(self.topology.packing_plan)
packing_plan = MockProto().create_mock_simple_packing_plan()
self.topology.set_packing_plan(packing_plan)
self.assertEqual(packing_plan, self.topology.packing_plan)
# testing with a packing plan with scheduled resources
self.topology.set_packing_plan(None)
self.assertIsNone(self.topology.id)
self.assertIsNone(self.topology.packing_plan)
packing_plan = MockProto().create_mock_simple_packing_plan2()
self.topology.set_packing_plan(packing_plan)
self.assertEqual(packing_plan, self.topology.packing_plan)
def test_set_execution_state(self):
# Set it to None
self.topology.set_execution_state(None)
self.assertIsNone(self.topology.execution_state)
self.assertIsNone(self.topology.cluster)
self.assertIsNone(self.topology.environ)
estate = MockProto().create_mock_execution_state()
self.topology.set_execution_state(estate)
self.assertEqual(estate, self.topology.execution_state)
self.assertEqual(MockProto.cluster, self.topology.cluster)
self.assertEqual(MockProto.environ, self.topology.environ)
def test_set_tmaster(self):
# Set it to None
self.topology.set_tmaster(None)
self.assertIsNone(self.topology.tmaster)
tmaster = MockProto().create_mock_tmaster()
self.topology.set_tmaster(tmaster)
self.assertEqual(tmaster, self.topology.tmaster)
def test_spouts(self):
# When pplan is not set
self.assertEqual(0, len(self.topology.spouts()))
# Set pplan now
pplan = MockProto().create_mock_simple_physical_plan()
self.topology.set_physical_plan(pplan)
spouts = self.topology.spouts()
self.assertEqual(1, len(spouts))
self.assertEqual("mock_spout", spouts[0].comp.name)
self.assertEqual(["mock_spout"], self.topology.spout_names())
def test_bolts(self):
# When pplan is not set
self.assertEqual(0, len(self.topology.bolts()))
# Set pplan
pplan = MockProto().create_mock_medium_physical_plan()
self.topology.set_physical_plan(pplan)
bolts = self.topology.bolts()
self.assertEqual(3, len(bolts))
self.assertEqual(["mock_bolt1", "mock_bolt2", "mock_bolt3"],
self.topology.bolt_names())
def test_num_instances(self):
# When pplan is not set
self.assertEqual(0, self.topology.num_instances())
pplan = MockProto().create_mock_medium_physical_plan(1, 2, 3, 4)
self.topology.set_physical_plan(pplan)
self.assertEqual(10, self.topology.num_instances())
def test_trigger_watches(self):
# Workaround
scope = {
"is_called": False
}
# pylint: disable=unused-argument, unused-variable
def callback(something):
scope["is_called"] = True
uid = self.topology.register_watch(callback)
self.assertTrue(scope["is_called"])
scope["is_called"] = False
self.assertFalse(scope["is_called"])
print(scope)
self.topology.set_physical_plan(None)
print(scope)
self.assertTrue(scope["is_called"])
print(scope)
scope["is_called"] = False
self.assertFalse(scope["is_called"])
self.topology.set_execution_state(None)
self.assertTrue(scope["is_called"])
scope["is_called"] = False
self.assertFalse(scope["is_called"])
self.topology.set_tmaster(None)
self.assertTrue(scope["is_called"])
def test_unregister_watch(self):
# Workaround
scope = {
"is_called": False
}
# pylint: disable=unused-argument
def callback(something):
scope["is_called"] = True
uid = self.topology.register_watch(callback)
scope["is_called"] = False
self.assertFalse(scope["is_called"])
self.topology.set_physical_plan(None)
self.assertTrue(scope["is_called"])
self.topology.unregister_watch(uid)
scope["is_called"] = False
self.assertFalse(scope["is_called"])
self.topology.set_physical_plan(None)
self.assertFalse(scope["is_called"])
def test_bad_watch(self):
# Workaround
scope = {
"is_called": False
}
# pylint: disable=unused-argument, unused-variable
def callback(something):
scope["is_called"] = True
raise Exception("Test Bad Trigger Exception")
uid = self.topology.register_watch(callback)
# is called the first time because of registeration
self.assertTrue(scope["is_called"])
# But should no longer be called
scope["is_called"] = False
self.assertFalse(scope["is_called"])
self.topology.set_physical_plan(None)
self.assertFalse(scope["is_called"])