blob: d605a0c27046430b3d567d4de9d7484a4067919a [file] [log] [blame]
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''topology.py: module for defining python topologies'''
import os
import uuid
from heron.common.src.python.utils.misc import default_serializer
import heron.common.src.python.constants as constants
from heron.proto import topology_pb2
from .component import HeronComponentSpec
class TopologyType(type):
  """Metaclass to define a Heron topology in Python

  When a class using this metaclass is created, every ``HeronComponentSpec`` found in the
  class body is collected, spouts and bolts are converted to protobuf messages, the
  ``config`` class attribute (if it is a dict) is merged over ``DEFAULT_TOPOLOGY_CONFIG``,
  and a ``topology_pb2.Topology`` message is built and stored on the class. The class whose
  name is exactly ``'Topology'`` is treated as the abstract base and is not initialized.
  """
  # Defaults applied before the user-supplied ``config`` class attribute is merged in.
  DEFAULT_TOPOLOGY_CONFIG = {constants.TOPOLOGY_DEBUG: "false",
                             constants.TOPOLOGY_STMGRS: "1",
                             constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS: "30",
                             constants.TOPOLOGY_COMPONENT_PARALLELISM: "1",
                             constants.TOPOLOGY_MAX_SPOUT_PENDING: "100",
                             constants.TOPOLOGY_ENABLE_ACKING: "false",
                             constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: "true"}

  def __new__(mcs, classname, bases, class_dict):
    """Collects component specs and topology config, then creates the class.

    For every class other than the base ``Topology``, this injects ``_topo_config``,
    ``_protobuf_bolts``, ``_protobuf_spouts`` and ``_heron_specs`` into ``class_dict``.
    It then calls ``init_topology`` to build the topology protobuf.

    :raises ValueError: if a non-base topology defines no spout, or if a spec is invalid
    """
    bolt_specs = {}
    spout_specs = {}
    # Copy HeronComponentSpec items out of class_dict
    specs = mcs.class_dict_to_specs(class_dict)
    for spec in specs.itervalues():
      if spec.is_spout:
        mcs.add_spout_specs(spec, spout_specs)
      else:
        mcs.add_bolt_specs(spec, bolt_specs)
    if classname != 'Topology' and not spout_specs:
      raise ValueError("A Topology requires at least one Spout")
    topology_config = mcs.class_dict_to_topo_config(class_dict)
    if classname != 'Topology':
      class_dict['_topo_config'] = topology_config
      class_dict['_protobuf_bolts'] = bolt_specs
      class_dict['_protobuf_spouts'] = spout_specs
      class_dict['_heron_specs'] = list(specs.values())
      # create topology protobuf here (needs '_topo_config' and the protobuf specs above)
      mcs.init_topology(classname, class_dict)
    return type.__new__(mcs, classname, bases, class_dict)

  @classmethod
  def class_dict_to_specs(mcs, class_dict):
    """Takes a class ``__dict__`` and returns the ``HeronComponentSpec`` entries

    A spec without an explicit name is named after the class attribute holding it
    (the spec object is mutated in place).

    :return: dict mapping spec name -> ``HeronComponentSpec``
    :raises ValueError: if two specs end up with the same name
    """
    specs = {}
    for name, spec in class_dict.iteritems():
      if isinstance(spec, HeronComponentSpec):
        # Use the variable name as the specification name.
        if spec.name is None:
          spec.name = name
        if spec.name in specs:
          raise ValueError("Duplicate component name: %s" % spec.name)
        else:
          specs[spec.name] = spec
    return specs

  @classmethod
  def class_dict_to_topo_config(mcs, class_dict):
    """Takes a class ``__dict__`` and returns a map containing topology-wide configuration

    Returned dictionary is a sanitized dict <str -> (str|object)>.
    This classmethod first inserts the default topology configuration, and then overrides
    it with the ``config`` class attribute, if that attribute is a dict (any other type of
    ``config`` value is ignored).
    Note that this configuration will be overridden by a component-specific configuration at
    runtime.
    """
    topo_config = {}
    # add defaults
    topo_config.update(mcs.DEFAULT_TOPOLOGY_CONFIG)
    custom_config = class_dict.get('config')
    if isinstance(custom_config, dict):
      sanitized_dict = mcs._sanitize_config(custom_config)
      topo_config.update(sanitized_dict)
    return topo_config

  @classmethod
  def add_spout_specs(mcs, spec, spout_specs):
    """Adds the protobuf of spout ``spec`` to ``spout_specs`` under ``spec.name``.

    :raises ValueError: if the spout declares no outputs
    """
    if not spec.outputs:
      raise ValueError("%s: %s requires at least one output, because it is a spout"
                       % (spec.python_class_path, spec.name))
    spout_specs[spec.name] = spec.get_protobuf()

  @classmethod
  def add_bolt_specs(mcs, spec, bolt_specs):
    """Adds the protobuf of bolt ``spec`` to ``bolt_specs`` under ``spec.name``.

    :raises ValueError: if the bolt declares no inputs
    """
    if not spec.inputs:
      raise ValueError("%s: %s requires at least one input, because it is a bolt"
                       % (spec.python_class_path, spec.name))
    bolt_specs[spec.name] = spec.get_protobuf()

  @classmethod
  def get_topology_config_protobuf(mcs, class_dict):
    """Converts ``class_dict['_topo_config']`` into a ``topology_pb2.Config`` message.

    ``str`` values are stored as ``STRING_VALUE``. Every other value is serialized with
    ``default_serializer`` and stored as ``PYTHON_SERIALIZED_VALUE``.
    NOTE(review): under Python 2, ``unicode`` values are not ``str`` and therefore take
    the serialized path.
    """
    config = topology_pb2.Config()
    conf_dict = class_dict['_topo_config']
    for key, value in conf_dict.iteritems():
      if isinstance(value, str):
        kvs = config.kvs.add()
        kvs.key = key
        kvs.value = value
        kvs.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
      else:
        # need to serialize
        kvs = config.kvs.add()
        kvs.key = key
        kvs.serialized_value = default_serializer.serialize(value)
        kvs.type = topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE")
    return config

  @classmethod
  def init_topology(mcs, classname, class_dict):
    """Initializes a topology protobuf

    Reads ``HERON_OPTIONS`` (see ``get_heron_options_from_env``) and builds a
    ``topology_pb2.Topology``. The topology name defaults to ``classname`` and the initial
    state defaults to ``"RUNNING"``. The following keys are then stored into
    ``class_dict``: ``topology_name``, ``topology_id``, ``protobuf_topology``,
    ``topologydefn_tmpdir`` and ``heron_runtime_options``.
    Does nothing for the base class ``Topology``.

    :raises RuntimeError: if ``HERON_OPTIONS`` is missing or lacks
      ``cmdline.topologydefn.tmpdirectory``
    """
    if classname == 'Topology':
      # Base class can't initialize protobuf
      return
    heron_options = mcs.get_heron_options_from_env()
    initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
    tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory", None)
    if tmp_directory is None:
      raise RuntimeError("Topology definition temp directory not specified")
    topology_name = heron_options.get("cmdline.topology.name", classname)
    topology_id = topology_name + str(uuid.uuid4())
    # create protobuf
    topology = topology_pb2.Topology()
    topology.id = topology_id
    topology.name = topology_name
    topology.state = topology_pb2.TopologyState.Value(initial_state)
    topology.topology_config.CopyFrom(mcs.get_topology_config_protobuf(class_dict))
    mcs.add_bolts_and_spouts(topology, class_dict)
    class_dict['topology_name'] = topology_name
    class_dict['topology_id'] = topology_id
    class_dict['protobuf_topology'] = topology
    class_dict['topologydefn_tmpdir'] = tmp_directory
    class_dict['heron_runtime_options'] = heron_options

  @staticmethod
  def get_heron_options_from_env():
    """Retrieves heron options from environment variable HERON_OPTIONS

    It has the following format:
    cmdline.topologydefn.tmpdirectory=/var/folders/tmpdir,cmdline.topology.initial.state=PAUSED

    In this case, the returned map will contain:
    {"cmdline.topologydefn.tmpdirectory": "/var/folders/tmpdir",
     "cmdline.topology.initial.state": "PAUSED"}

    Each entry is split on its first "=" only, so values may themselves contain "=".
    Entries without any "=" are ignored. Occurrences of "%%%%" are replaced by a space
    before parsing.

    Currently supporting the following options natively:
    - cmdline.topologydefn.tmpdirectory: directory to which this topology's defn file is written
    - cmdline.topology.initial.state: initial state of the topology
    - cmdline.topology.name: topology name on deployment

    :return: map mapping from key to value
    :raises RuntimeError: if HERON_OPTIONS is not set
    """
    heron_options_raw = os.environ.get("HERON_OPTIONS", None)
    if heron_options_raw is None:
      raise RuntimeError("HERON_OPTIONS environment variable not found")
    ret = {}
    # "%%%%" stands in for a space inside the comma-separated list (presumably encoded
    # this way by the submitter -- confirm against heron-cli).
    heron_opt_list = heron_options_raw.replace("%%%%", " ").split(',')
    for opt_raw in heron_opt_list:
      # Split on the first "=" only. The previous split("=") with a length-2 check
      # silently dropped any option whose value contained "=".
      key, sep, value = opt_raw.partition("=")
      if sep:
        ret[key] = value
    return ret

  @classmethod
  def add_bolts_and_spouts(mcs, topology, class_dict):
    """Copies the spout and bolt protobufs from ``class_dict`` into ``topology``."""
    spouts = list(class_dict["_protobuf_spouts"].values())
    bolts = list(class_dict["_protobuf_bolts"].values())
    for spout in spouts:
      added = topology.spouts.add()
      added.CopyFrom(spout)
    for bolt in bolts:
      added = topology.bolts.add()
      added.CopyFrom(bolt)

  @staticmethod
  def _sanitize_config(custom_config):
    """Checks a given custom_config and returns a sanitized dict <str -> (str|object)>

    It checks that keys are all strings and sanitizes the values of the given dictionary as follows:
    - If a string, number or boolean is given as a value, it is converted to a string.
      For string and number (int, float), it is converted by the built-in ``str()`` function.
      For a boolean value, ``True`` is converted to "true" instead of "True", and ``False`` is
      converted to "false" instead of "False", in order to keep consistency with the
      Java configuration.
    - Any other value is inserted into the sanitized dict as it is.
      These values will need to be serialized before adding to a protobuf message.

    :raises TypeError: if any key is not a ``str``
    """
    sanitized = {}
    for key, value in custom_config.iteritems():
      if not isinstance(key, str):
        raise TypeError("Key for topology-wide configuration must be string, given: %s: %s"
                        % (str(type(key)), str(key)))
      # bool is checked before int because bool is a subclass of int.
      if isinstance(value, bool):
        sanitized[key] = "true" if value else "false"
      elif isinstance(value, (str, int, float)):
        sanitized[key] = str(value)
      else:
        sanitized[key] = value
    return sanitized
class Topology(object):
  """Topology is an abstract class for defining a topology

  Topology writers can define their custom topology by inheriting this class.
  The usage of this class is compatible with StreamParse API.

  Defining a topology is simple. Topology writers need to create a subclass, in which information
  about the components in their topology and how they connect to each other are specified
  by placing ``HeronComponentSpec`` objects as class attributes.
  For more information, refer to ``spec()`` method of both ``Bolt`` and ``Spout`` class.

  In addition, you can also set a topology-wide configuration, by adding a ``config`` class
  attribute to your topology class, that is a dict mapping from option names to their values.
  Note that topology-wide configurations are overridden by component-specific configurations
  that might be specified from ``spec()`` method of ``Bolt`` or ``Spout`` class.

  :Example: A sample WordCountTopology can be defined as follows:
  ::

    from pyheron import Topology
    from heron.examples.src.python import WordSpout, CountBolt

    class WordCount(Topology):
      config = {"topology.wide.config": "some value"}

      word_spout = WordSpout.spec(par=1)
      count_bolt = CountBolt.spec(par=1,
                                  inputs={word_spout: Grouping.fields('word')},
                                  config={"count_bolt.specific.config": "another value"})
  ::
  """
  # Python 2 metaclass hook: TopologyType builds the topology protobuf when a subclass is defined.
  __metaclass__ = TopologyType

  # pylint: disable=no-member
  @classmethod
  def write(cls):
    """Writes the serialized topology protobuf to ``<topologydefn_tmpdir>/<topology_name>.defn``

    This classmethod is meant to be used by heron-cli when submitting a topology.
    ``topology_name``, ``topologydefn_tmpdir`` and ``protobuf_topology`` are class
    attributes injected by ``TopologyType`` when the subclass is created.

    :raises ValueError: if called on the base ``Topology`` class itself
    """
    if cls.__name__ == 'Topology':
      raise ValueError("The base Topology class cannot be writable")
    filename = "%s.defn" % cls.topology_name
    path = os.path.join(cls.topologydefn_tmpdir, filename)
    # Protobuf serialization yields bytes, hence binary mode.
    with open(path, 'wb') as f:
      f.write(cls.protobuf_topology.SerializeToString())
class TopologyBuilder(object):
  """Builder for pyheron topology

  This class dynamically creates a subclass of Topology with given spouts and bolts and
  writes its definition file when ``build_and_submit()`` is called.

  :Example: A sample WordCountTopology can be defined as follows:
  ::

    import sys
    from pyheron import TopologyBuilder
    from heron.examples.spout import WordSpout
    from heron.examples.bolt import CountBolt

    if __name__ == '__main__':
      builder = TopologyBuilder(name=sys.argv[1])
      word_spout = builder.add_spout("word-spout", WordSpout, 2)
      builder.add_bolt("count-bolt", CountBolt, 2,
                       inputs={word_spout: Grouping.fields('word')},
                       config={"count_bolt.specific.config": "some value"})
      builder.build_and_submit()
  ::
  """
  def __init__(self, name):
    """Initialize this TopologyBuilder

    :type name: str
    :param name: topology name; also used as the name of the generated ``Topology`` subclass
    :raises TypeError: if ``name`` is not a ``str`` (including ``None``)
    :raises ValueError: if ``name`` is ``"Topology"``, which ``TopologyType`` treats as the
      abstract base class
    """
    # Explicit checks instead of ``assert`` so validation still runs under ``python -O``.
    if not isinstance(name, str):
      raise TypeError("Topology name must be a string, given: %s" % str(name))
    if name == "Topology":
      raise ValueError("Topology name cannot be 'Topology', which is reserved for the base class")
    self.topology_name = name
    self._specs = []
    self._topology_config = {}

  def add_spec(self, *specs):
    """Add specs to the topology

    :type specs: HeronComponentSpec
    :param specs: specs to add to the topology; each must have a non-None ``name``
    :raises TypeError: if an argument is not a ``HeronComponentSpec``
    :raises ValueError: if a spec has no name
    """
    for spec in specs:
      if not isinstance(spec, HeronComponentSpec):
        raise TypeError("Argument to add_spec needs to be HeronComponentSpec, given: %s"
                        % str(spec))
      if spec.name is None:
        raise ValueError("TopologyBuilder cannot take a spec without name")
      self._specs.append(spec)

  def add_spout(self, name, spout_cls, par, config=None, optional_outputs=None):
    """Add a spout to the topology

    Calls ``spout_cls.spec(...)`` with the given arguments and registers the result.

    :param name: component name
    :param spout_cls: class exposing a ``spec()`` classmethod (presumably a ``Spout`` subclass)
    :param par: parallelism
    :param config: optional component-specific configuration
    :param optional_outputs: forwarded unchanged to ``spout_cls.spec``
    :return: the created spec, usable as a key in a bolt's ``inputs``
    """
    spout_spec = spout_cls.spec(name=name, par=par, config=config,
                                optional_outputs=optional_outputs)
    self.add_spec(spout_spec)
    return spout_spec

  def add_bolt(self, name, bolt_cls, par, inputs, config=None, optional_outputs=None):
    """Add a bolt to the topology

    Calls ``bolt_cls.spec(...)`` with the given arguments and registers the result.

    :param name: component name
    :param bolt_cls: class exposing a ``spec()`` classmethod (presumably a ``Bolt`` subclass)
    :param par: parallelism
    :param inputs: input declaration forwarded to ``bolt_cls.spec``
    :param config: optional component-specific configuration
    :param optional_outputs: forwarded unchanged to ``bolt_cls.spec``
    :return: the created spec
    """
    bolt_spec = bolt_cls.spec(name=name, par=par, inputs=inputs, config=config,
                              optional_outputs=optional_outputs)
    self.add_spec(bolt_spec)
    return bolt_spec

  def set_config(self, config):
    """Set topology-wide configuration to the topology

    The dict is stored by reference (not copied).

    :type config: dict
    :param config: topology-wide config
    :raises TypeError: if ``config`` is not a dict
    """
    if not isinstance(config, dict):
      raise TypeError("Argument to set_config needs to be dict, given: %s" % str(config))
    self._topology_config = config

  def _construct_topo_class_dict(self):
    """Build the class ``__dict__`` passed to ``TopologyType``.

    Each spec is stored under its name, and the topology-wide config is stored under
    ``"config"``.

    :raises ValueError: on duplicate spec names, or a spec named ``"config"``
    """
    class_dict = {}
    # specs
    for spec in self._specs:
      name = spec.name
      # "config" is the key TopologyType reads topology-wide configuration from. A spec
      # stored under it would be overwritten below and silently disappear from the topology.
      if name == "config":
        raise ValueError("Spec name 'config' is reserved for topology-wide configuration")
      if name in class_dict:
        raise ValueError("Duplicate spec names: %s" % name)
      class_dict[name] = spec
    # config
    class_dict["config"] = self._topology_config
    return class_dict

  def build_and_submit(self):
    """Builds the topology class and writes its definition file.

    Creates a ``Topology`` subclass named ``self.topology_name`` through ``TopologyType``
    (which builds the topology protobuf) and calls its ``write()`` method to emit the
    ``.defn`` file.
    """
    class_dict = self._construct_topo_class_dict()
    topo_cls = TopologyType(self.topology_name, (Topology,), class_dict)
    topo_cls.write()