id: version-0.20.0-topology-development-topology-api-python
title: The Heron Topology API for Python
sidebar_label: The Heron Topology API for Python
original_id: topology-development-topology-api-python

The current version of heronpy is [{{% heronpyVersion %}}](https://pypi.python.org/pypi/heronpy/{{% heronpyVersion %}}).

Support for developing Heron topologies in Python is provided by a Python library called heronpy.

Python API docs

You can find API docs for the heronpy library here.

Setup

First, you need to install the heronpy library using pip, EasyInstall, or an analogous tool:

$ pip install heronpy
$ easy_install heronpy

Then you can include heronpy in your project files. Here's an example:

from heronpy.api.bolt.bolt import Bolt
from heronpy.api.spout.spout import Spout
from heronpy.api.topology import Topology

Writing topologies in Python

Heron topologies are networks of spouts that pull data into a topology and bolts that process that ingested data.

You can see how to create Python spouts in the Implementing Python Spouts guide and how to create Python bolts in the Implementing Python Bolts guide.

Once you've defined spouts and bolts for a topology, you can then compose the topology in one of two ways:

  • You can use the TopologyBuilder class inside of a main function.

    Here's an example:

    #!/usr/bin/env python
    from heronpy.api.topology import TopologyBuilder
    
    
    if __name__ == "__main__":
        builder = TopologyBuilder("MyTopology")
        # Add spouts and bolts
        builder.build_and_submit()
    
  • You can subclass the Topology class.

    Here's an example:

    from heronpy.api.stream import Grouping
    from heronpy.api.topology import Topology
    
    
    class MyTopology(Topology):
        my_spout = WordSpout.spec(par=2)
        my_bolt = CountBolt.spec(par=3, inputs={my_spout: Grouping.fields("word")})
    

Defining topologies using the TopologyBuilder class

If you create a Python topology using a TopologyBuilder, you need to instantiate a TopologyBuilder inside of a standard Python main function, like this:

from heronpy.api.topology import TopologyBuilder


if __name__ == "__main__":
    builder = TopologyBuilder("MyTopology")

Once you've created a TopologyBuilder object, you can add bolts using the add_bolt method and spouts using the add_spout method. Here's an example:

builder = TopologyBuilder("MyTopology")
builder.add_bolt("my_bolt", CountBolt, par=3)
builder.add_spout("my_spout", WordSpout, par=2)

Both the add_bolt and add_spout methods return the corresponding HeronComponentSpec object.

The add_bolt method takes four arguments and an optional config parameter:

| Argument | Data type | Description | Default |
| --- | --- | --- | --- |
| name | str | The unique identifier assigned to this bolt | |
| bolt_cls | class | The subclass of Bolt that defines this bolt | |
| par | int | The number of instances of this bolt in the topology | |
| inputs | dict or list | Either a dict mapping from HeronComponentSpec to Grouping or a list of HeronComponentSpecs, in which case the shuffle grouping is used | |
| config | dict | Specifies the configuration for this bolt | None |

The add_spout method takes three arguments and an optional config parameter:

| Argument | Data type | Description | Default |
| --- | --- | --- | --- |
| name | str | The unique identifier assigned to this spout | |
| spout_cls | class | The subclass of Spout that defines this spout | |
| par | int | The number of instances of this spout in the topology | |
| config | dict | Specifies the configuration for this spout | None |
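
The rule for inputs described in the tables above (a dict gives explicit groupings, a list falls back to the shuffle grouping) can be pictured with a small standalone sketch. It does not use heronpy: normalize_inputs and the string grouping names are hypothetical stand-ins chosen for illustration, not the library's implementation.

```python
def normalize_inputs(inputs, default_grouping="SHUFFLE"):
    """Return a dict mapping each upstream component to a grouping.

    Hypothetical helper, not part of heronpy: it only mirrors the rule
    that a list of inputs implies the shuffle grouping.
    """
    if inputs is None:
        return {}
    if isinstance(inputs, dict):
        # Explicit groupings are kept as given.
        return dict(inputs)
    if isinstance(inputs, (list, tuple)):
        # A bare list means "use the default (shuffle) grouping for each".
        return {component: default_grouping for component in inputs}
    raise TypeError("inputs must be a dict or a list")


explicit = normalize_inputs({"word_spout": "FIELDS(word)"})
implicit = normalize_inputs(["word_spout", "other_spout"])
```

Here explicit keeps the grouping it was given, while every entry in implicit is assigned the shuffle placeholder.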

Example

The following is an example implementation of a word count topology in Python that uses a TopologyBuilder.

from your_spout import WordSpout
from your_bolt import CountBolt

from heronpy.api.stream import Grouping
from heronpy.api.topology import TopologyBuilder


if __name__ == "__main__":
    builder = TopologyBuilder("WordCountTopology")
    # piece together the topology
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)
    count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2, inputs={word_spout: Grouping.fields("word")})
    # submit the topology
    builder.build_and_submit()

Note that arguments to the main method can be passed by providing them in the heron submit command.
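
As a sketch of how a main function might read such arguments, the snippet below parses them with the standard argparse module. The argument names (topology_name and --spout-par) are hypothetical, and the exact way heron submit forwards arguments is not shown here.

```python
import argparse


def parse_topology_args(argv):
    """Parse hypothetical topology arguments from a list of strings."""
    parser = argparse.ArgumentParser(description="Word count topology options")
    parser.add_argument("topology_name", help="name to give the topology")
    parser.add_argument("--spout-par", type=int, default=2,
                        help="number of spout instances (illustrative)")
    return parser.parse_args(argv)


# In a real main function you would pass sys.argv[1:] instead of a literal list.
args = parse_topology_args(["WordCountTopology", "--spout-par", "4"])
```

The parsed values (args.topology_name and args.spout_par) could then be handed to TopologyBuilder and add_spout in place of hard-coded literals.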

Topology-wide configuration

If you're building a Python topology using a TopologyBuilder, you can specify configuration for the topology using the set_config method. A topology's config is a dict in which the keys are constants from the api_constants module and the values are the settings for those parameters.

Here's an example:

from heronpy.api import api_constants
from heronpy.api.topology import TopologyBuilder


if __name__ == "__main__":
    topology_config = {
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    builder = TopologyBuilder("MyTopology")
    builder.set_config(topology_config)
    # Add bolts and spouts, etc.

Launching the topology

If you want to submit Python topologies to a Heron cluster, they need to be packaged as a PEX file. In order to produce PEX files, we recommend using a build tool like Pants or Bazel.

Suppose you defined your topology using the TopologyBuilder class and built a word_count.pex file for that topology in the ~/topology folder. You can submit the topology to a cluster called local like this:

$ heron submit local \
  ~/topology/word_count.pex \
  - # No class specified

Note the - in this submission command. If you define a topology using a TopologyBuilder, you do not need to tell Heron where your main method is located.

Example topologies buildable as PEXs

  • See this repo for an example of a Heron topology written in Python and deployable as a Pants-packaged PEX.
  • See this repo for an example of a Heron topology written in Python and deployable as a Bazel-packaged PEX.

Defining a topology by subclassing the Topology class

If you create a Python topology by subclassing the Topology class, you need to create a new topology class, like this:

from my_spout import WordSpout
from my_bolt import CountBolt

from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology


class MyTopology(Topology):
    my_spout = WordSpout.spec(par=2)
    my_bolt_inputs = {my_spout: Grouping.fields("word")}
    my_bolt = CountBolt.spec(par=3, inputs=my_bolt_inputs)

All you need to do is assign the HeronComponentSpec objects returned by the spec() method of your spout and bolt classes to class attributes of your topology class. You do not need to run a build method or anything like that; the Topology class will automatically detect which spouts and bolts are included in the topology.

If you use this method to define a new Python topology, you do not need to have a main function.
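
To give a feel for how a class can discover components placed in its attributes, here is a standalone sketch built on a metaclass. It is an illustration of the general technique only, not heronpy's implementation: ComponentSpec, TopologyMeta, and SketchTopology are hypothetical names.

```python
class ComponentSpec:
    """Hypothetical stand-in for a component spec; not the heronpy class."""

    def __init__(self, kind, par=1, name=None):
        self.kind = kind
        self.par = par
        self.name = name


class TopologyMeta(type):
    """Collect every ComponentSpec found among the class attributes."""

    def __new__(mcs, class_name, bases, namespace):
        specs = {}
        for attr_name, value in namespace.items():
            if isinstance(value, ComponentSpec):
                # Fall back to the attribute name when no name was given.
                if value.name is None:
                    value.name = attr_name
                specs[value.name] = value
        namespace["_specs"] = specs
        return super().__new__(mcs, class_name, bases, namespace)


class SketchTopology(metaclass=TopologyMeta):
    pass


class WordCountSketch(SketchTopology):
    word_spout = ComponentSpec("spout", par=2)
    count_bolt = ComponentSpec("bolt", par=3, name="counter")
```

After the class body runs, WordCountSketch._specs holds both components, keyed by the attribute name for word_spout and by the explicit name for the bolt, with no build step.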

For bolts, the spec method takes four optional arguments:

| Argument | Data type | Description | Default |
| --- | --- | --- | --- |
| name | str | The unique identifier assigned to this bolt or None if you want to use the variable name of the returned HeronComponentSpec as the unique identifier for this bolt | None |
| inputs | dict or list | Either a dict mapping from HeronComponentSpec to Grouping or a list of HeronComponentSpecs, in which case the shuffle grouping is used | None |
| par | int | The number of instances of this bolt in the topology | 1 |
| config | dict | Specifies the configuration for this bolt | None |

For spouts, the spec method takes three optional arguments:

| Argument | Data type | Description | Default |
| --- | --- | --- | --- |
| name | str | The unique identifier assigned to this spout or None if you want to use the variable name of the returned HeronComponentSpec as the unique identifier for this spout | None |
| par | int | The number of instances of this spout in the topology | 1 |
| config | dict | Specifies the configuration for this spout | None |

Example

Here's an example topology definition with one spout and one bolt:

from my_spout import WordSpout
from my_bolt import CountBolt

from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology


class WordCount(Topology):
    word_spout = WordSpout.spec(par=2)
    count_bolt = CountBolt.spec(par=2, inputs={word_spout: Grouping.fields("word")})
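
The fields grouping used above routes tuples that share the same value of the chosen field to the same bolt instance, which is what lets each CountBolt instance keep a consistent count per word. The standalone sketch below illustrates the idea with a stable hash; fields_grouping_target is a hypothetical helper, and Heron's own hashing may differ.

```python
import zlib


def fields_grouping_target(tup, field_index, num_instances):
    """Pick a bolt instance from the value of one field.

    Illustrative only: uses CRC32 so the result is stable across runs;
    Heron's own hashing may differ.
    """
    key = str(tup[field_index]).encode("utf-8")
    return zlib.crc32(key) % num_instances


words = [["hello"], ["world"], ["hello"], ["heron"], ["hello"]]
# Every occurrence of "hello" maps to the same instance index.
targets = [fields_grouping_target(t, 0, 2) for t in words]
```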

Launching

If you defined your topology by subclassing the Topology class, your main Python file should not contain a main method. You will, however, need to instruct Heron which class contains your topology definition.

Let's say that you've defined a topology by subclassing Topology and built a PEX stored in ~/topology/dist/word_count.pex. The class containing your topology definition is topology.word_count.WordCount. You can submit the topology to a cluster called local like this:

# The third argument specifies the topology class definition
$ heron submit local \
  ~/topology/dist/word_count.pex \
  topology.word_count.WordCount \
  WordCountTopology

Topology-wide configuration

If you're building a Python topology by subclassing Topology, you can specify configuration for the topology by setting the config class attribute. A topology's config is a dict in which the keys are constants from the api_constants module and the values are the settings for those parameters.

Here's an example:

from heronpy.api.topology import Topology
from heronpy.api import api_constants


class MyTopology(Topology):
    config = {
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    # Add bolts and spouts, etc.

Multiple streams

To specify that a component has multiple output streams, instead of using a list of strings for outputs, you can specify a list of Stream objects, in the following manner.

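from heronpy.api.spout.spout import Spout
from heronpy.api.stream import Stream


class MultiStreamSpout(Spout):
    # Each Stream names one output stream and lists its fields
    outputs = [
        Stream(fields=["normal", "fields"], name="default"),
        Stream(fields=["error_message"], name="error_stream"),
    ]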

To select one of these streams as the input for your bolt, you can simply use [] to specify the stream you want. Without any stream specified, the default stream will be used.

class MultiStreamTopology(Topology):
    spout = MultiStreamSpout.spec()
    error_bolt = ErrorBolt.spec(inputs={spout["error_stream"]: Grouping.LOWEST})
    consume_bolt = ConsumeBolt.spec(inputs={spout: Grouping.SHUFFLE})
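
The [] selection can be pictured as a lookup that pairs a component with one of its named streams, with a bare spec standing for the default stream. The standalone sketch below shows that idea only; SpecSketch and resolve_input are hypothetical names and do not reflect how HeronComponentSpec is actually implemented.

```python
class SpecSketch:
    """Hypothetical spec object; not the heronpy HeronComponentSpec."""

    def __init__(self, name, streams):
        self.name = name
        self.streams = set(streams)

    def __getitem__(self, stream_name):
        # spec["error_stream"] selects a named output stream.
        if stream_name not in self.streams:
            raise KeyError(f"{self.name} has no stream named {stream_name!r}")
        return (self.name, stream_name)


def resolve_input(key):
    """Return (component, stream), using "default" for a bare spec."""
    if isinstance(key, SpecSketch):
        return (key.name, "default")
    return key


spout = SpecSketch("spout", ["default", "error_stream"])
error_input = resolve_input(spout["error_stream"])
default_input = resolve_input(spout)
```

Asking for a stream that was never declared raises a KeyError in this sketch, which is one way a typo in a stream name could be caught early.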

Declaring output fields using the spec() method

In Python topologies, the output fields of your spouts and bolts are normally declared with an outputs class attribute, as there is no declareOutputFields() method. heronpy also enables you to declare output fields dynamically, as a list, using the optional_outputs argument of the spec() method.

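This is useful in a situation like the one below.

class IdentityBolt(Bolt):
    # Output fields are not declared statically here;
    # they are supplied through optional_outputs in spec()
    def process(self, tup):
        # Re-emit the incoming values unchanged
        self.emit(tup.values)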


class DynamicOutputField(Topology):
    spout = WordSpout.spec()
    bolt = IdentityBolt.spec(inputs={spout: Grouping.ALL}, optional_outputs=["word"])

You can also declare outputs in the add_spout() and add_bolt() methods of the TopologyBuilder in the same way.

Example topologies

There are a number of example topologies that you can peruse in the [examples/src/python]({{% githubMaster %}}/examples/src/python) directory of the [Heron repo]({{% githubMaster %}}):

| Topology | File | Description |
| --- | --- | --- |
| Word count | [word_count_topology.py]({{% githubMaster %}}/examples/src/python/word_count_topology.py) | The [WordSpout]({{% githubMaster %}}/examples/src/python/spout/word_spout.py) spout emits random words from a list, while the [CountBolt]({{% githubMaster %}}/examples/src/python/bolt/count_bolt.py) bolt counts the number of words that have been emitted. |
| Multiple streams | [multi_stream_topology.py]({{% githubMaster %}}/examples/src/python/multi_stream_topology.py) | The [MultiStreamSpout]({{% githubMaster %}}/examples/src/python/spout/multi_stream_spout.py) emits multiple streams to downstream bolts. |
| Half acking | [half_acking_topology.py]({{% githubMaster %}}/examples/src/python/half_acking_topology.py) | The [HalfAckBolt]({{% githubMaster %}}/examples/src/python/bolt/half_ack_bolt.py) acks only half of all received tuples. |
| Custom grouping | [custom_grouping_topology.py]({{% githubMaster %}}/examples/src/python/custom_grouping_topology.py) | The [SampleCustomGrouping]({{% githubMaster %}}/examples/src/python/custom_grouping_topology.py#L26) class provides a custom field grouping. |

You can build the respective PEXs for these topologies using the following commands:

$ bazel build examples/src/python:word_count
$ bazel build examples/src/python:multi_stream
$ bazel build examples/src/python:half_acking
$ bazel build examples/src/python:custom_grouping

All built PEXs will be stored in bazel-bin/examples/src/python. You can submit them to Heron like so:

$ heron submit local \
  bazel-bin/examples/src/python/word_count.pex - \
  WordCount
$ heron submit local \
  bazel-bin/examples/src/python/multi_stream.pex \
  heron.examples.src.python.multi_stream_topology.MultiStream
$ heron submit local \
  bazel-bin/examples/src/python/half_acking.pex - \
  HalfAcking
$ heron submit local \
  bazel-bin/examples/src/python/custom_grouping.pex \
  heron.examples.src.python.custom_grouping_topology.CustomGrouping

By default, the submit command also activates topologies. To disable this behavior, set the --deploy-deactivated flag.

Bolts

Bolts must implement the Bolt interface, which has the following methods.

class MyBolt(Bolt):
    def initialize(self, config, context): pass
    def process(self, tup): pass
  • The initialize() method is called when the bolt is first initialized and provides the bolt with the executing environment. It is equivalent to the prepare() method of the IBolt interface in Java. Note that you should not override the __init__() constructor of the Bolt class to initialize custom variables, since it is used internally by HeronInstance; instead, use initialize() to set up any custom variables or connections to databases.

  • The process() method is called to process a single input tup of HeronTuple type. This method is equivalent to the execute() method of the IBolt interface in Java. You can use the self.emit() method to emit the result, as described below.

In addition, the BaseBolt class provides you with the following methods.

class BaseBolt(BaseComponent):
    def emit(self, tup, stream="default", anchors=None, direct_task=None, need_task_ids=False): ...
    def ack(self, tup): ...
    def fail(self, tup): ...
    def log(self, message, level=None): ...
    @staticmethod
    def is_tick(tup): ...
    @classmethod
    def spec(cls, name=None, inputs=None, par=1, config=None): ...
  • The emit() method is used to emit a given tup, which can be a list or tuple of any Python objects. Unlike in the Java implementation, there is no OutputCollector in the Python implementation.

  • The ack() method is used to indicate that processing of a tuple has succeeded.

  • The fail() method is used to indicate that processing of a tuple has failed.

  • The is_tick() method returns whether a given tup of HeronTuple type is a tick tuple.

  • The log() method is used to log an arbitrary message, and its outputs are redirected to the log file of the component. It accepts an optional argument which specifies the logging level. By default, its logging level is info.

    Warning: due to an internal issue, you should NOT write anything to sys.stdout or sys.stderr; instead, use this method for all logging.

  • To declare the output fields of this bolt, set the outputs class attribute to a list of str or Stream objects. Note that, unlike in Java, declareOutputFields does not exist in the Python implementation. You can also optionally specify output fields through the optional_outputs argument of the spec() method.

  • You will use the spec() method to define a topology and specify the location of this bolt within the topology, as well as to give component-specific configurations.

The following is an example implementation of a bolt in Python.

from collections import Counter
from heronpy.api.bolt.bolt import Bolt


class CountBolt(Bolt):
    outputs = ["word", "count"]

    def initialize(self, config, context):
        self.counter = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counter[word] += 1
        self.emit([word, self.counter[word]])
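
The counting logic in process() can be exercised outside a Heron cluster by swapping the base class for a small stub that records emitted tuples. The sketch below does not import heronpy: StubBolt is a hypothetical stand-in for Bolt, and plain lists stand in for HeronTuple values.

```python
from collections import Counter


class StubBolt:
    """Hypothetical stand-in for the Bolt base class; records emits."""

    def __init__(self):
        self.emitted = []

    def emit(self, tup, stream="default"):
        self.emitted.append((stream, list(tup)))


class CountBoltSketch(StubBolt):
    outputs = ["word", "count"]

    def initialize(self, config, context):
        self.counter = Counter()

    def process(self, tup):
        word = tup[0]  # a plain list stands in for HeronTuple.values
        self.counter[word] += 1
        self.emit([word, self.counter[word]])


bolt = CountBoltSketch()
bolt.initialize(config={}, context=None)
for values in (["heron"], ["storm"], ["heron"]):
    bolt.process(values)
```

After the loop, bolt.emitted holds one entry per processed tuple with the running count for each word. As in the real API, the subclass sets up its state in initialize() rather than in __init__().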

Spouts

To create a spout for a Heron topology, you need to subclass the Spout class, which has the following methods.

class MySpout(Spout):
    def initialize(self, config, context): pass
    def next_tuple(self): pass
    def ack(self, tup_id): pass
    def fail(self, tup_id): pass
    def activate(self): pass
    def deactivate(self): pass
    def close(self): pass

Spout class methods

The Spout class provides a number of methods that you should implement when subclassing.

  • The initialize() method is called when the spout is first initialized and provides the spout with the executing environment. It is equivalent to the open() method of ISpout. Note that you should not override the __init__() constructor of the Spout class to initialize custom variables, since it is used internally by HeronInstance; instead, use initialize() to set up any custom variables or connections to databases.

  • The next_tuple() method is used to fetch tuples from the input source. You can emit fetched tuples by calling self.emit(), as described below.

  • The ack() method is called when the HeronTuple with the tup_id emitted by this spout is successfully processed.

  • The fail() method is called when the HeronTuple with the tup_id emitted by this spout is not processed successfully.

  • The activate() method is called when the spout is asked to go back into the active state.

  • The deactivate() method is called when the spout is asked to enter the deactivated state.

  • The close() method is called when the spout is shut down. There is no guarantee that this method will be called, because of how the instance is killed.

BaseSpout class methods

The Spout class inherits from the BaseSpout class, which provides additional methods that you can use in your spouts.

class BaseSpout(BaseComponent):
    def log(self, message, level=None): ...
    def emit(self, tup, tup_id=None, stream="default", direct_task=None, need_task_ids=False): ...
    @classmethod
    def spec(cls, name=None, par=1, config=None): ...
  • The emit() method is used to emit a given tuple, which can be a list or tuple of any Python objects. Unlike in the Java implementation, there is no OutputCollector in the Python implementation.

  • The log() method is used to log an arbitrary message, and its outputs are redirected to the log file of the component. It accepts an optional argument which specifies the logging level. By default, its logging level is info.

    Warning: due to an internal issue, you should NOT write anything to sys.stdout or sys.stderr; instead, use this method for all logging.

  • To declare the output fields of this spout, set the outputs class attribute to a list of str or Stream objects. Note that, unlike in Java, declareOutputFields does not exist in the Python implementation. You can also optionally specify output fields through the optional_outputs argument of the spec() method.

  • You will use the spec() method to define a topology and specify the location of this spout within the topology, as well as to give component-specific configurations.

Example spout

The following is an example implementation of a spout in Python.

from itertools import cycle
from heronpy.api.spout.spout import Spout


class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, config, context):
        self.words = cycle(["hello", "world", "heron", "storm"])
        self.log("Initializing WordSpout...")

    def next_tuple(self):
        word = next(self.words)
        self.emit([word])
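
The ack() and fail() callbacks described earlier become useful once tuples are emitted with a tup_id. The standalone sketch below shows one possible replay pattern: each emitted tuple is remembered until it is acked, and failed tuples are queued again. It does not import heronpy; ReplaySpoutSketch and its stub emit() are hypothetical, and a real spout would subclass Spout and rely on the inherited emit().

```python
from collections import deque


class ReplaySpoutSketch:
    """Hypothetical stand-in; a real spout would subclass Spout instead."""

    def initialize(self, config, context):
        self.queue = deque(["hello", "world", "heron"])
        self.pending = {}   # tup_id -> tuple awaiting ack
        self.next_id = 0
        self.emitted = []

    def emit(self, tup, tup_id=None):
        # Stub: the real emit() hands the tuple to Heron.
        self.emitted.append((tup_id, tup))

    def next_tuple(self):
        if not self.queue:
            return
        tup = [self.queue.popleft()]
        tup_id = self.next_id
        self.next_id += 1
        self.pending[tup_id] = tup
        self.emit(tup, tup_id=tup_id)

    def ack(self, tup_id):
        # Processing succeeded, so the tuple can be forgotten.
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        tup = self.pending.pop(tup_id, None)
        if tup is not None:
            self.queue.append(tup[0])  # re-queue for another attempt


spout = ReplaySpoutSketch()
spout.initialize(config={}, context=None)
spout.next_tuple()   # emits ["hello"] with id 0
spout.next_tuple()   # emits ["world"] with id 1
spout.ack(0)
spout.fail(1)        # "world" goes back on the queue
```

In this sketch a failed word is appended to the end of the queue, so it is retried after the words that are already waiting.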
