blob: 6881644734d6beb1d99e1c535879247615ff415f [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''integration test topology builder'''
import copy
import heronpy.api.api_constants as api_constants
from heronpy.api.stream import Stream, Grouping
from heronpy.api.topology import TopologyBuilder, Topology, TopologyType
from ..core import constants as integ_const
from .aggregator_bolt import AggregatorBolt
from .integration_test_spout import IntegrationTestSpout
from .integration_test_bolt import IntegrationTestBolt
class TestTopologyBuilder(TopologyBuilder):
  """Topology builder for integration tests.

  User spouts and bolts are never added directly. Each one is wrapped by
  IntegrationTestSpout or IntegrationTestBolt respectively, which receive the
  user component's python class path and output fields.

  ``create_topology`` then:

  * appends an AggregatorBolt;
  * adds all-grouped links on the integration-test control stream along every
    user-defined parent -> child edge;
  * connects the aggregator to every stream of each terminal user component,
    i.e. one with no downstream consumer.
  """
  TERMINAL_BOLT_NAME = '__integration_test_aggregator_bolt'
  TERMINAL_BOLT_CLASS = AggregatorBolt
  DEFAULT_CONFIG = {api_constants.TOPOLOGY_DEBUG: True,
                    api_constants.TOPOLOGY_RELIABILITY_MODE:
                        api_constants.TopologyReliabilityMode.ATLEAST_ONCE,
                    api_constants.TOPOLOGY_PROJECT_NAME: "heron-integration-test"}

  def __init__(self, name, http_server_url):
    """
    :type name: str
    :param name: topology name
    :type http_server_url: str
    :param http_server_url: base URL. ``"<http_server_url>/<topology name>"`` is
      later handed to the aggregator bolt under ``integ_const.HTTP_POST_URL_KEY``.
    """
    super(TestTopologyBuilder, self).__init__(name)
    self.output_location = "%s/%s" % (http_server_url, self.topology_name)
    # Pass a copy: set_config may keep a reference to the dict it is given, and
    # DEFAULT_CONFIG is a class attribute shared by every builder instance.
    self.set_config(dict(self.DEFAULT_CONFIG))
    # map <name -> spout's component spec>
    self.spouts = {}
    # map <name -> bolt's component spec>
    self.bolts = {}
    # map <name -> set of parents>
    self.prev = {}

  @staticmethod
  def _user_output_fields(component_cls, optional_outputs):
    """Return a NEW list of ``component_cls.outputs`` (if any) plus ``optional_outputs``.

    A fresh list is always built so that appending the optional outputs never
    mutates the ``outputs`` class attribute of the user's spout/bolt class.
    """
    declared = getattr(component_cls, 'outputs', None)
    outputs = list(declared) if declared is not None else []
    if optional_outputs is not None:
      outputs.extend(optional_outputs)
    return outputs

  def add_spout(self, name, spout_cls, par, config=None,
                optional_outputs=None, max_executions=None):
    """Add a user spout, wrapped by IntegrationTestSpout.

    :type name: str
    :param name: component name
    :param spout_cls: user spout class; ``spout_cls.spec(name)`` supplies its
      python class path, and its optional ``outputs`` attribute its output fields
    :param par: parallelism
    :type config: dict or None
    :param config: component config (copied; the caller's dict is not modified)
    :param optional_outputs: extra output fields appended to the spout's outputs
    :param max_executions: if given, stored in the config under
      ``integ_const.USER_MAX_EXECUTIONS``
    :returns: the IntegrationTestSpout component spec that was added
    """
    user_spec = spout_cls.spec(name)
    spout_config = {} if config is None else copy.copy(config)
    if max_executions is not None:
      spout_config[integ_const.USER_MAX_EXECUTIONS] = max_executions
    test_spec = IntegrationTestSpout.spec(
        name, par, spout_config,
        user_spout_classpath=user_spec.python_class_path,
        user_output_fields=self._user_output_fields(spout_cls, optional_outputs))
    self.add_spec(test_spec)
    self.spouts[name] = test_spec
    return test_spec

  def add_bolt(self, name, bolt_cls, par, inputs, config=None, optional_outputs=None):
    """Add a user bolt, wrapped by IntegrationTestBolt.

    Only dict-based ``inputs`` are supported (enforced with an assert).

    :type name: str
    :param name: component name
    :param bolt_cls: user bolt class; ``bolt_cls.spec(name)`` supplies its
      python class path, and its optional ``outputs`` attribute its output fields
    :param par: parallelism
    :type inputs: dict
    :param inputs: input streams -> grouping
    :type config: dict or None
    :param config: component config (copied, as in ``add_spout``; the caller's
      dict is not modified)
    :param optional_outputs: extra output fields appended to the bolt's outputs
    :returns: the IntegrationTestBolt component spec that was added
    """
    assert isinstance(inputs, dict)
    user_spec = bolt_cls.spec(name)
    bolt_config = {} if config is None else copy.copy(config)
    test_spec = IntegrationTestBolt.spec(
        name, par, inputs, bolt_config,
        user_bolt_classpath=user_spec.python_class_path,
        user_output_fields=self._user_output_fields(bolt_cls, optional_outputs))
    self.add_spec(test_spec)
    self.bolts[name] = test_spec
    return test_spec

  def _build_parent_map(self):
    """Fill ``self.prev`` with <bolt name -> set of parent component names>.

    Only user bolts with at least one input get an entry. The aggregator bolt
    is skipped, and spouts never appear as keys because they have no parents.
    """
    for name, bolt_spec in self.bolts.items():
      if name == self.TERMINAL_BOLT_NAME:
        continue
      for istream in bolt_spec.get_protobuf().inputs:
        self.prev.setdefault(name, set()).add(istream.stream.component_name)

  def _find_terminals(self):
    """Return the names of user components that have no downstream consumer.

    1. A bolt is terminal only if it has at least one upstream component (it
       is a key of ``self.prev``). Isolated bolts are therefore not linked to
       the aggregator.
    2. A terminal component must not appear in any parent set in
       ``self.prev.values()``, meaning nothing consumes its output.

    Spouts with no children are also terminal.
    """
    non_terminals = set().union(*self.prev.values())
    terminals = {bolt for bolt in self.prev if bolt not in non_terminals}
    terminals.update(spout for spout in self.spouts if spout not in non_terminals)
    return terminals

  def create_topology(self):
    """Create the integration-test topology class.

    Adds the aggregator bolt and wires the control stream and terminal streams
    into it, as described on the class. Calling this twice re-adds the
    aggregator spec.

    :returns: a new class built by ``TopologyType`` with ``Topology`` as its base
    """
    # Add the aggregator first with empty inputs; _add_all_grouping fills them
    # in below by mutating its spec's ``inputs`` dict.
    aggregator_config = {integ_const.HTTP_POST_URL_KEY: self.output_location}
    self.add_bolt(self.TERMINAL_BOLT_NAME, self.TERMINAL_BOLT_CLASS, 1,
                  inputs={}, config=aggregator_config)

    # Graph directed from children to parents, built from bolts only.
    self._build_parent_map()
    terminals = self._find_terminals()

    # Propagate the control stream along every user-defined edge. Names are
    # sorted because set iteration order of str varies between runs
    # (hash randomization); sorting keeps the generated topology reproducible.
    for child in sorted(self.prev):
      for parent in sorted(self.prev[child]):
        self._add_all_grouping(child, parent, integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)

    # Connect the aggregator bolt to every stream of each terminal component.
    # Outputs are a list of either str (a field on the default stream) or Stream.
    for terminal in sorted(terminals):
      if terminal in self.bolts:
        terminal_outputs = self.bolts[terminal].outputs
      else:
        terminal_outputs = self.spouts[terminal].outputs
      # dict.fromkeys de-duplicates while keeping first-seen order.
      stream_ids = dict.fromkeys(
          Stream.DEFAULT_STREAM_ID if isinstance(out, str) else out.stream_id
          for out in terminal_outputs)
      for stream_id in stream_ids:
        self._add_all_grouping(self.TERMINAL_BOLT_NAME, terminal, stream_id)

    class_dict = self._construct_topo_class_dict()
    return TopologyType(self.topology_name, (Topology,), class_dict)

  def _add_all_grouping(self, child, parent, stream_id):
    """Add an ALL grouping from ``parent``'s ``stream_id`` stream into ``child``.

    :type child: str
    :param child: child's component name (must be a registered bolt)
    :type parent: str
    :param parent: parent's component name (a registered bolt or spout)
    :type stream_id: str
    :param stream_id: stream id
    """
    # child has to be a bolt
    # child_inputs is dict mapping from <HeronComponentSpec|GlobalStreamId -> grouping>
    child_inputs = self.bolts[child].inputs
    if parent in self.bolts:
      parent_spec = self.bolts[parent]
    else:
      parent_spec = self.spouts[parent]
    if stream_id == Stream.DEFAULT_STREAM_ID:
      # The default stream is keyed by the component spec itself.
      child_inputs[parent_spec] = Grouping.ALL
    else:
      child_inputs[parent_spec[stream_id]] = Grouping.ALL