| # Copyright 2016 Twitter. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| '''topology.py: module for defining python topologies''' |
| import os |
| import uuid |
| |
| from heron.common.src.python.utils.misc import default_serializer |
| import heron.common.src.python.constants as constants |
| from heron.proto import topology_pb2 |
| |
| from .component import HeronComponentSpec |
| |
class TopologyType(type):
    """Metaclass to define a Heron topology in Python

    For every class other than the base ``Topology`` class, creating the class collects the
    ``HeronComponentSpec`` attributes of the class body, merges the optional ``config``
    attribute with ``DEFAULT_TOPOLOGY_CONFIG`` and builds the topology protobuf, which is
    stored in the class namespace by ``init_topology()``.
    """
    # Topology-wide defaults; entries of a class-level ``config`` dict override them.
    DEFAULT_TOPOLOGY_CONFIG = {
        constants.TOPOLOGY_DEBUG: "false",
        constants.TOPOLOGY_STMGRS: "1",
        constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS: "30",
        constants.TOPOLOGY_COMPONENT_PARALLELISM: "1",
        constants.TOPOLOGY_MAX_SPOUT_PENDING: "100",
        constants.TOPOLOGY_ENABLE_ACKING: "false",
        constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: "true",
    }

    def __new__(mcs, classname, bases, class_dict):
        """Creates the class and, unless it is the base ``Topology``, its topology protobuf

        :param classname: name of the class being created
        :param bases: base classes of the class being created
        :param class_dict: class body namespace; for non-base classes it is extended with the
                           topology config, the component protobufs and the list of specs
        :raises ValueError: if a component spec is invalid, or if a non-base class does not
                            define any spout
        """
        bolt_specs = {}
        spout_specs = {}
        # Copy HeronComponentSpec items out of class_dict
        specs = TopologyType.class_dict_to_specs(class_dict)
        for spec in specs.values():
            if spec.is_spout:
                TopologyType.add_spout_specs(spec, spout_specs)
            else:
                TopologyType.add_bolt_specs(spec, bolt_specs)
        if classname != 'Topology' and not spout_specs:
            raise ValueError("A Topology requires at least one Spout")

        topology_config = TopologyType.class_dict_to_topo_config(class_dict)

        if classname != 'Topology':
            class_dict['_topo_config'] = topology_config
            class_dict['_protobuf_bolts'] = bolt_specs
            class_dict['_protobuf_spouts'] = spout_specs
            class_dict['_heron_specs'] = list(specs.values())

            # create topology protobuf here
            TopologyType.init_topology(classname, class_dict)

        return type.__new__(mcs, classname, bases, class_dict)

    @classmethod
    def class_dict_to_specs(mcs, class_dict):
        """Takes a class ``__dict__`` and returns the ``HeronComponentSpec`` entries

        A spec without a name is given the name of the attribute it is assigned to.

        :return: dict mapping from spec name to ``HeronComponentSpec``
        :raises ValueError: if two specs end up with the same name
        """
        specs = {}

        for name, spec in class_dict.items():
            if not isinstance(spec, HeronComponentSpec):
                continue
            # Use the variable name as the specification name.
            if spec.name is None:
                spec.name = name
            if spec.name in specs:
                raise ValueError("Duplicate component name: %s" % spec.name)
            specs[spec.name] = spec
        return specs

    @classmethod
    def class_dict_to_topo_config(mcs, class_dict):
        """Takes a class ``__dict__`` and returns a map containing topology-wide configuration

        Returned dictionary is a sanitized dict <str -> (str|object)>

        This classmethod first inserts the default topology configuration, and then overrides
        it with the given topology-wide configuration, i.e. the ``config`` entry of
        ``class_dict`` when that entry is a dict.
        Note that this configuration will be overridden by a component-specific configuration
        at runtime.
        """
        topo_config = {}

        # add defaults
        topo_config.update(mcs.DEFAULT_TOPOLOGY_CONFIG)

        # Only the ``config`` attribute matters, so look it up directly instead of scanning
        # every attribute of the class body.
        custom_config = class_dict.get('config')
        if isinstance(custom_config, dict):
            topo_config.update(mcs._sanitize_config(custom_config))

        return topo_config

    @classmethod
    def add_spout_specs(mcs, spec, spout_specs):
        """Adds the protobuf of a spout ``spec`` to ``spout_specs``, keyed by the spec name

        :raises ValueError: if the spout does not declare any output
        """
        if not spec.outputs:
            raise ValueError("%s: %s requires at least one output, because it is a spout"
                             % (spec.python_class_path, spec.name))
        spout_specs[spec.name] = spec.get_protobuf()

    @classmethod
    def add_bolt_specs(mcs, spec, bolt_specs):
        """Adds the protobuf of a bolt ``spec`` to ``bolt_specs``, keyed by the spec name

        :raises ValueError: if the bolt does not declare any input
        """
        if not spec.inputs:
            raise ValueError("%s: %s requires at least one input, because it is a bolt"
                             % (spec.python_class_path, spec.name))
        bolt_specs[spec.name] = spec.get_protobuf()

    @classmethod
    def get_topology_config_protobuf(mcs, class_dict):
        """Builds a ``topology_pb2.Config`` message from ``class_dict['_topo_config']``

        ``str`` values are stored as ``STRING_VALUE`` entries; any other value is serialized
        with ``default_serializer`` and stored as a ``PYTHON_SERIALIZED_VALUE`` entry.
        """
        config = topology_pb2.Config()
        conf_dict = class_dict['_topo_config']

        for key, value in conf_dict.items():
            kvs = config.kvs.add()
            kvs.key = key
            if isinstance(value, str):
                kvs.value = value
                kvs.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
            else:
                # need to serialize
                kvs.serialized_value = default_serializer.serialize(value)
                kvs.type = topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE")

        return config

    @classmethod
    def init_topology(mcs, classname, class_dict):
        """Initializes a topology protobuf

        Reads the Heron options from the environment, builds the ``topology_pb2.Topology``
        message and stores it in ``class_dict`` together with ``topology_name``,
        ``topology_id``, ``topologydefn_tmpdir`` and ``heron_runtime_options``.
        The topology name defaults to ``classname`` and the initial state to ``RUNNING``.

        :raises RuntimeError: if ``HERON_OPTIONS`` is not set or does not specify
                              ``cmdline.topologydefn.tmpdirectory``
        """
        if classname == 'Topology':
            # Base class can't initialize protobuf
            return
        heron_options = TopologyType.get_heron_options_from_env()
        initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
        tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory")
        if tmp_directory is None:
            raise RuntimeError("Topology definition temp directory not specified")

        topology_name = heron_options.get("cmdline.topology.name", classname)
        # A random UUID suffix keeps ids distinct for topologies sharing a name
        topology_id = topology_name + str(uuid.uuid4())

        # create protobuf
        topology = topology_pb2.Topology()
        topology.id = topology_id
        topology.name = topology_name
        topology.state = topology_pb2.TopologyState.Value(initial_state)
        topology.topology_config.CopyFrom(TopologyType.get_topology_config_protobuf(class_dict))

        TopologyType.add_bolts_and_spouts(topology, class_dict)

        class_dict['topology_name'] = topology_name
        class_dict['topology_id'] = topology_id
        class_dict['protobuf_topology'] = topology
        class_dict['topologydefn_tmpdir'] = tmp_directory
        class_dict['heron_runtime_options'] = heron_options

    @staticmethod
    def get_heron_options_from_env():
        """Retrieves heron options from environment variable HERON_OPTIONS
        It has the following format:
        cmdline.topologydefn.tmpdirectory=/var/folders/tmpdir,cmdline.topology.initial.state=PAUSED

        In this case, the returned map will contain:
        {"cmdline.topologydefn.tmpdirectory": "/var/folders/tmpdir",
         "cmdline.topology.initial.state": "PAUSED"}

        Every ``%%%%`` sequence in the raw value is replaced by a single space before parsing.
        Each comma-separated entry is split at its first ``=`` only, so a value may itself
        contain ``=``. Entries without any ``=`` are ignored.

        Currently supporting the following options natively:

        - cmdline.topologydefn.tmpdirectory: directory to which this topology's defn file is written
        - cmdline.topology.initial.state: initial state of the topology
        - cmdline.topology.name: topology name on deployment

        :return: map mapping from key to value
        :raises RuntimeError: if the HERON_OPTIONS environment variable is not set
        """
        heron_options_raw = os.environ.get("HERON_OPTIONS")
        if heron_options_raw is None:
            raise RuntimeError("HERON_OPTIONS environment variable not found")

        options = {}
        for opt_raw in heron_options_raw.replace("%%%%", " ").split(','):
            # partition() splits at the first "=" only; a plain split("=") would break up
            # (and previously discard) options whose value contains "=".
            key, separator, value = opt_raw.partition("=")
            if separator:
                options[key] = value
        return options

    @classmethod
    def add_bolts_and_spouts(mcs, topology, class_dict):
        """Copies the spout and bolt protobufs stored in ``class_dict`` into ``topology``"""
        for spout in class_dict["_protobuf_spouts"].values():
            topology.spouts.add().CopyFrom(spout)
        for bolt in class_dict["_protobuf_bolts"].values():
            topology.bolts.add().CopyFrom(bolt)

    @staticmethod
    def _sanitize_config(custom_config):
        """Checks a given custom_config and returns a sanitized dict <str -> (str|object)>

        It checks if keys are all strings and sanitizes values of a given dictionary as follows:

        - If string, number or boolean is given as a value, it is converted to string.
          For string and number (int, float), it is converted to string by a built-in ``str()``
          method. For a boolean value, ``True`` is converted to "true" instead of "True", and
          ``False`` is converted to "false" instead of "False", in order to keep the consistency
          with Java configuration.

        - If neither of the above is given as a value, it is inserted into the sanitized dict
          as it is. These values will need to be serialized before adding to a protobuf message.

        :raises TypeError: if a key is not a string
        """
        sanitized = {}
        for key, value in custom_config.items():
            if not isinstance(key, str):
                raise TypeError("Key for topology-wide configuration must be string, given: %s: %s"
                                % (str(type(key)), str(key)))

            # bool has to be tested before int, because bool is a subclass of int
            if isinstance(value, bool):
                sanitized[key] = "true" if value else "false"
            elif isinstance(value, (str, int, float)):
                sanitized[key] = str(value)
            else:
                sanitized[key] = value

        return sanitized
| |
class Topology(object):
    """Abstract base class for defining a topology

    A custom topology is defined by subclassing this class; the usage is compatible with
    the StreamParse API.

    The subclass describes the components of the topology, and how they connect to each
    other, by holding ``HeronComponentSpec`` objects as class attributes.
    For more information, refer to ``spec()`` method of both ``Bolt`` and ``Spout`` class.

    A topology-wide configuration can be given as well, through a ``config`` class attribute
    that is a dict mapping from option names to their values.
    Note that topology-wide configurations are overridden by component-specific configurations
    that might be specified from ``spec()`` method of ``Bolt`` or ``Spout`` class.

    :Example: A sample WordCountTopology can be defined as follows:
    ::

      from pyheron import Topology
      from heron.examples.src.python import WordSpout, CountBolt

      class WordCount(Topology):
        config = {"topology.wide.config": "some value"}

        word_spout = WordSpout.spec(par=1)
        count_bolt = CountBolt.spec(par=1,
                                    inputs={word_spout: Grouping.fields('word')},
                                    config={"count_bolt.specific.config": "another value"})
    """
    # Python 2 style metaclass declaration: subclasses are created through TopologyType
    __metaclass__ = TopologyType

    # pylint: disable=no-member
    @classmethod
    def write(cls):
        """Writes the serialized topology protobuf to a ``<topology_name>.defn`` file

        The file is created in the directory held by ``cls.topologydefn_tmpdir``.
        This classmethod is meant to be used by heron-cli when submitting a topology.

        :raises ValueError: if called on the base ``Topology`` class itself
        """
        if cls.__name__ == 'Topology':
            raise ValueError("The base Topology class cannot be writable")

        defn_path = os.path.join(cls.topologydefn_tmpdir, "%s.defn" % cls.topology_name)
        with open(defn_path, 'wb') as defn_file:
            defn_file.write(cls.protobuf_topology.SerializeToString())
| |
class TopologyBuilder(object):
    """Builder for pyheron topology

    This class dynamically creates a subclass of Topology with given spouts and bolts and
    writes its definition file when ``build_and_submit()`` is called.

    :Example: A sample WordCountTopology can be defined as follows:
    ::

      import sys
      from pyheron import TopologyBuilder
      from heron.examples.spout import WordSpout
      from heron.examples.bolt import CountBolt

      if __name__ == '__main__':
        builder = TopologyBuilder(name=sys.argv[1])
        word_spout = builder.add_spout("word-spout", WordSpout, 2)

        builder.add_bolt("count-bolt", CountBolt, 2,
                         inputs={word_spout: Grouping.fields('word')},
                         config={"count_bolt.specific.config": "some value"})

        builder.build_and_submit()
    """
    def __init__(self, name):
        """Initialize this TopologyBuilder

        :type name: str
        :param name: topology name; ``"Topology"`` is not allowed because it is the name of
                     the base class
        :raises AssertionError: if ``name`` is not a ``str`` or equals ``"Topology"``
        """
        # Raised explicitly instead of via ``assert`` so that the validation is not stripped
        # when Python runs with -O; the exception type is unchanged for existing callers.
        if not isinstance(name, str) or name == "Topology":
            raise AssertionError("Topology name must be a str other than 'Topology', given: %r"
                                 % (name,))

        self.topology_name = name

        self._specs = []
        self._topology_config = {}

    def add_spec(self, *specs):
        """Add specs to the topology

        :type specs: HeronComponentSpec
        :param specs: specs to add to the topology
        :raises TypeError: if an argument is not a ``HeronComponentSpec``
        :raises ValueError: if a spec has no name
        """
        for spec in specs:
            if not isinstance(spec, HeronComponentSpec):
                raise TypeError("Argument to add_spec needs to be HeronComponentSpec, given: %s"
                                % str(spec))
            if spec.name is None:
                raise ValueError("TopologyBuilder cannot take a spec without name")
            self._specs.append(spec)

    def add_spout(self, name, spout_cls, par, config=None, optional_outputs=None):
        """Add a spout to the topology

        The spec is created with ``spout_cls.spec()`` from the given arguments.

        :return: the spec that was added, usable as a key of a bolt's ``inputs``
        """
        spout_spec = spout_cls.spec(name=name, par=par, config=config,
                                    optional_outputs=optional_outputs)
        self.add_spec(spout_spec)
        return spout_spec

    def add_bolt(self, name, bolt_cls, par, inputs, config=None, optional_outputs=None):
        """Add a bolt to the topology

        The spec is created with ``bolt_cls.spec()`` from the given arguments.

        :return: the spec that was added
        """
        bolt_spec = bolt_cls.spec(name=name, par=par, inputs=inputs, config=config,
                                  optional_outputs=optional_outputs)
        self.add_spec(bolt_spec)
        return bolt_spec

    def set_config(self, config):
        """Set topology-wide configuration to the topology

        :type config: dict
        :param config: topology-wide config
        :raises TypeError: if ``config`` is not a dict
        """
        if not isinstance(config, dict):
            raise TypeError("Argument to set_config needs to be dict, given: %s" % str(config))
        self._topology_config = config

    def _construct_topo_class_dict(self):
        """Builds the class namespace from which the topology class is created

        Every spec is stored under its name and the topology-wide configuration under
        ``config``.

        :raises ValueError: if two specs share a name, or if a spec is named ``config``
        """
        class_dict = {}

        # specs
        for spec in self._specs:
            name = spec.name
            if name == "config":
                # This key holds the topology-wide configuration below; accepting the spec
                # would silently drop the component from the topology.
                raise ValueError("Spec name is reserved for topology-wide config: %s" % name)
            if name in class_dict:
                raise ValueError("Duplicate spec names: %s" % name)
            class_dict[name] = spec

        # config
        class_dict["config"] = self._topology_config
        return class_dict

    def build_and_submit(self):
        """Builds the topology and submits to the destination

        Creates a ``Topology`` subclass named after ``self.topology_name`` through
        ``TopologyType`` and calls its ``write()`` classmethod.
        """
        class_dict = self._construct_topo_class_dict()
        topo_cls = TopologyType(self.topology_name, (Topology,), class_dict)
        topo_cls.write()