<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
---
id: topology-development-topology-api-python
title: The Heron Topology API for Python
sidebar_label: The Heron Topology API for Python
---
> The current version of `heronpy` is [{{% heronpyVersion %}}](https://pypi.python.org/pypi/heronpy/{{% heronpyVersion %}}).
Support for developing Heron topologies in Python is provided by a Python library called [`heronpy`](https://pypi.python.org/pypi/heronpy).
> #### Python API docs
> You can find API docs for the `heronpy` library [here](/api/python).
## Setup
First, you need to install the `heronpy` library using [pip](https://pip.pypa.io/en/stable/), [EasyInstall](https://wiki.python.org/moin/EasyInstall), or an analogous tool:
```shell
$ pip install heronpy
$ easy_install heronpy
```
Then you can include `heronpy` in your project files. Here's an example:
```python
from heronpy.api.bolt.bolt import Bolt
from heronpy.api.spout.spout import Spout
from heronpy.api.topology import Topology
```
## Writing topologies in Python
Heron [topologies](heron-topology-concepts) are networks of [spouts](heron-topology-concepts#spouts) that pull data into a topology and [bolts](heron-topology-concepts#bolts) that process that ingested data.
> You can see how to create Python spouts in the [Implementing Python Spouts](#spouts) guide and how to create Python bolts in the [Implementing Python Bolts](#bolts) guide.
Once you've defined spouts and bolts for a topology, you can then compose the topology in one of two ways:
* You can use the [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder) class inside of a main function.
Here's an example:
```python
#!/usr/bin/env python
from heronpy.api.topology import TopologyBuilder

if __name__ == "__main__":
    builder = TopologyBuilder("MyTopology")
    # Add spouts and bolts
    builder.build_and_submit()
```
* You can subclass the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class.
Here's an example:
```python
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology

# WordSpout and CountBolt are your own spout and bolt classes
class MyTopology(Topology):
    my_spout = WordSpout.spec(par=2)
    my_bolt = CountBolt.spec(par=3, inputs={my_spout: Grouping.fields("word")})
```
## Defining topologies using the [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder) class
If you create a Python topology using a [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder), you need to instantiate a `TopologyBuilder` inside of a standard Python main function, like this:
```python
from heronpy.api.topology import TopologyBuilder

if __name__ == "__main__":
    builder = TopologyBuilder("MyTopology")
```
Once you've created a `TopologyBuilder` object, you can add [bolts](#bolts) using the [`add_bolt`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.add_bolt) method and [spouts](#spouts) using the [`add_spout`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.add_spout) method. Here's an example:
```python
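builder = TopologyBuilder("MyTopology")
# Add the spout first so that its spec can be passed to the bolt as an input
my_spout = builder.add_spout("my_spout", WordSpout, par=2)
builder.add_bolt("my_bolt", CountBolt, par=3, inputs=[my_spout])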
```
Both the `add_bolt` and `add_spout` methods return the corresponding [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec) object.
The `add_bolt` method takes four arguments and an optional `config` parameter:

Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this bolt |
`bolt_cls` | class | The subclass of [`Bolt`](/api/python/bolt/bolt.m.html#heronpy.bolt.bolt.Bolt) that defines this bolt |
`par` | `int` | The number of instances of this bolt in the topology |
`inputs` | `dict` or `list` | Either a `dict` mapping from [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec) to [`Grouping`](/api/python/stream.m.html#heronpy.stream.Grouping) *or* a list of [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec)s, in which case the [`shuffle`](/api/python/stream.m.html#heronpy.stream.Grouping.SHUFFLE) grouping is used |
`config` | `dict` | Specifies the configuration for this bolt | `None`

The `add_spout` method takes three arguments and an optional `config` parameter:

Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this spout |
`spout_cls` | class | The subclass of [`Spout`](/api/python/spout/spout.m.html#heronpy.spout.spout.Spout) that defines this spout |
`par` | `int` | The number of instances of this spout in the topology |
`config` | `dict` | Specifies the configuration for this spout | `None`
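
The two accepted shapes of a bolt's `inputs` argument (a `dict` with explicit groupings, or a `list` that falls back to shuffle grouping) can be pictured with the following sketch. It does not use `heronpy`: `normalize_inputs` and the string placeholders for components and groupings are hypothetical names chosen for illustration, and `heronpy`'s internal handling may differ.

```python
SHUFFLE = "SHUFFLE"  # hypothetical placeholder standing in for Grouping.SHUFFLE


def normalize_inputs(inputs):
    """Return a dict mapping each upstream component to its grouping."""
    if isinstance(inputs, dict):
        # Explicit groupings are kept as given.
        return dict(inputs)
    if isinstance(inputs, (list, tuple)):
        # A plain list carries no groupings, so every entry gets shuffle.
        return {component: SHUFFLE for component in inputs}
    raise TypeError("inputs must be a dict or a list")


print(normalize_inputs({"word_spout": "FIELDS(word)"}))
# {'word_spout': 'FIELDS(word)'}
print(normalize_inputs(["word_spout", "other_spout"]))
# {'word_spout': 'SHUFFLE', 'other_spout': 'SHUFFLE'}
```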
### Example
The following is an example implementation of a word count topology in Python that uses [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder).
```python
from your_spout import WordSpout
from your_bolt import CountBolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import TopologyBuilder

if __name__ == "__main__":
    builder = TopologyBuilder("WordCountTopology")
    # piece together the topology
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)
    count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2, inputs={word_spout: Grouping.fields("word")})
    # submit the topology
    builder.build_and_submit()
```
Note that arguments to the main method can be passed by providing them in the
`heron submit` command.
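
As a minimal sketch of how a main function might read such an argument, the snippet below takes the topology name from the command line, assuming the extra arguments arrive in `sys.argv`. The helper name `topology_name_from_args` and the `MyTopology` fallback are assumptions made for illustration; they are not part of `heronpy`.

```python
import sys


def topology_name_from_args(argv, default="MyTopology"):
    """Return the first command-line argument, or a default name if none was given."""
    # argv[0] is the script (or PEX) path; user-supplied arguments start at argv[1].
    return argv[1] if len(argv) > 1 else default


if __name__ == "__main__":
    name = topology_name_from_args(sys.argv)
    # In a real topology file you would pass `name` to TopologyBuilder(name).
    print("Topology name:", name)
```

Keeping the parsing in a small function makes it possible to check the fallback behavior without submitting anything to a cluster.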
### Topology-wide configuration
If you're building a Python topology using a `TopologyBuilder`, you can specify configuration for the topology using the [`set_config`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.set_config) method. A topology's config is a `dict` in which the keys are constants from the [`api_constants`](/api/python/api_constants.m.html) module and the values are the settings for those parameters.
Here's an example:
```python
from heronpy.api import api_constants
from heronpy.api.topology import TopologyBuilder

if __name__ == "__main__":
    topology_config = {
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    builder = TopologyBuilder("MyTopology")
    builder.set_config(topology_config)
    # Add bolts and spouts, etc.
```
### Launching the topology
If you want to [submit](../../../operators/heron-cli#submitting-a-topology) Python topologies to a Heron cluster, they need to be packaged as a [PEX](https://pex.readthedocs.io/en/stable/whatispex.html) file. In order to produce PEX files, we recommend using a build tool like [Pants](http://www.pantsbuild.org/python_readme.html) or [Bazel](https://github.com/benley/bazel_rules_pex).
If you defined your topology using the [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder) class and built a `word_count.pex` file for that topology in the `~/topology` folder, you can submit the topology to a cluster called `local` like this:
```bash
$ heron submit local \
  ~/topology/word_count.pex \
  - # No class specified
```
Note the `-` in this submission command. If you define a topology using `TopologyBuilder`, you do not need to instruct Heron where your main method is located.
> #### Example topologies buildable as PEXs
> * See [this repo](https://github.com/streamlio/pants-dev-environment) for an example of a Heron topology written in Python and deployable as a Pants-packaged PEX.
> * See [this repo](https://github.com/streamlio/bazel-dev-environment) for an example of a Heron topology written in Python and deployable as a Bazel-packaged PEX.
## Defining a topology by subclassing the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class
If you create a Python topology by subclassing the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class, you need to create a new topology class, like this:
```python
from my_spout import WordSpout
from my_bolt import CountBolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology

class MyTopology(Topology):
    my_spout = WordSpout.spec(par=2)
    my_bolt_inputs = {my_spout: Grouping.fields("word")}
    my_bolt = CountBolt.spec(par=3, inputs=my_bolt_inputs)
```
All you need to do is assign the [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec)s returned by the `spec()` method of your spout and bolt classes to class attributes of your topology class. You do *not* need to run a `build` method or anything like that; the `Topology` class will automatically detect which spouts and bolts are included in the topology.
> If you use this method to define a new Python topology, you do *not* need to have a main function.
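
To make the idea of automatic detection concrete, here is a self-contained sketch of how a base class can collect component specs from class attributes by using a metaclass. It is an illustration only, not `heronpy`'s actual implementation: `ComponentSpec`, `TopologyMeta`, `SketchTopology`, and the `_specs` attribute are hypothetical names.

```python
class ComponentSpec:
    """Hypothetical stand-in for HeronComponentSpec."""

    def __init__(self, kind, par=1, name=None):
        self.kind = kind
        self.par = par
        self.name = name  # None means: use the attribute name


class TopologyMeta(type):
    """Collects every ComponentSpec found among a class's attributes."""

    def __new__(mcs, classname, bases, namespace):
        specs = {}
        for attr_name, value in namespace.items():
            if isinstance(value, ComponentSpec):
                # Fall back to the attribute name when no explicit name was given.
                if value.name is None:
                    value.name = attr_name
                specs[value.name] = value
        namespace["_specs"] = specs
        return super().__new__(mcs, classname, bases, namespace)


class SketchTopology(metaclass=TopologyMeta):
    pass


class WordCount(SketchTopology):
    word_spout = ComponentSpec("spout", par=2)
    count_bolt = ComponentSpec("bolt", par=3)


print(sorted(WordCount._specs))  # ['count_bolt', 'word_spout']
```

Because the collection happens when the class body is evaluated, no explicit `build` call or main function is needed in this sketch, which mirrors the behavior described above.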
For bolts, the [`spec`](/api/python/bolt/bolt.m.html#heronpy.bolt.bolt.Bolt.spec) method takes four optional arguments:

Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this bolt or `None` if you want to use the variable name of the returned `HeronComponentSpec` as the unique identifier for this bolt | `None`
`inputs` | `dict` or `list` | Either a `dict` mapping from [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec) to [`Grouping`](/api/python/stream.m.html#heronpy.stream.Grouping) *or* a list of [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec)s, in which case the [`shuffle`](/api/python/stream.m.html#heronpy.stream.Grouping.SHUFFLE) grouping is used | `None`
`par` | `int` | The number of instances of this bolt in the topology | `1`
`config` | `dict` | Specifies the configuration for this bolt | `None`

For spouts, the [`spec`](/api/python/spout/spout.m.html#heronpy.spout.spout.Spout.spec) method takes three optional arguments:

Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this spout or `None` if you want to use the variable name of the returned `HeronComponentSpec` as the unique identifier for this spout | `None`
`par` | `int` | The number of instances of this spout in the topology | `1`
`config` | `dict` | Specifies the configuration for this spout | `None`
### Example
Here's an example topology definition with one spout and one bolt:
```python
from my_spout import WordSpout
from my_bolt import CountBolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology

class WordCount(Topology):
    word_spout = WordSpout.spec(par=2)
    count_bolt = CountBolt.spec(par=2, inputs={word_spout: Grouping.fields("word")})
```
### Launching
If you defined your topology by subclassing the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class,
your main Python file should *not* contain a main method. You will, however, need to instruct Heron which class contains your topology definition.
Let's say that you've defined a topology by subclassing `Topology` and built a PEX stored in `~/topology/dist/word_count.pex`. The class containing your topology definition is `topology.word_count.WordCount`. You can submit the topology to a cluster called `local` like this:
```bash
# The third argument specifies the topology class definition
$ heron submit local \
  ~/topology/dist/word_count.pex \
  topology.word_count.WordCount \
  WordCountTopology
```
### Topology-wide configuration
If you're building a Python topology by subclassing `Topology`, you can specify configuration for the topology by setting a `config` class attribute. A topology's config is a `dict` in which the keys are constants from the [`api_constants`](/api/python/api_constants.m.html) module and the values are the settings for those parameters.
Here's an example:
```python
from heronpy.api.topology import Topology
from heronpy.api import api_constants

class MyTopology(Topology):
    config = {
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    # Add bolts and spouts, etc.
```
## Multiple streams
To specify that a component has multiple output streams, instead of using a list of
strings for `outputs`, you can specify a list of `Stream` objects, in the following manner.
```python
from heronpy.api.spout.spout import Spout
from heronpy.api.stream import Stream

class MultiStreamSpout(Spout):
    outputs = [
        Stream(fields=["normal", "fields"], name="default"),
        Stream(fields=["error_message"], name="error_stream"),
    ]
```
To select one of these streams as the input for your bolt, you can simply
use `[]` to specify the stream you want. Without any stream specified, the `default`
stream will be used.
```python
class MultiStreamTopology(Topology):
    spout = MultiStreamSpout.spec()
    error_bolt = ErrorBolt.spec(inputs={spout["error_stream"]: Grouping.LOWEST})
    consume_bolt = ConsumeBolt.spec(inputs={spout: Grouping.SHUFFLE})
```
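
The `[]` selection described above can be pictured with a small self-contained sketch in which indexing a spec returns a reference to one named stream, while a bare spec resolves to the `default` stream. `StreamRef`, `SpecSketch`, and `resolve` are hypothetical names used only for illustration; they are not `heronpy` classes, and the library's internals may differ.

```python
class StreamRef:
    """Hypothetical stand-in: identifies one output stream of a component."""

    def __init__(self, component, stream="default"):
        self.component = component
        self.stream = stream


class SpecSketch:
    """Hypothetical stand-in for a component spec with named output streams."""

    def __init__(self, name, streams):
        self.name = name
        self.streams = set(streams)

    def __getitem__(self, stream):
        # spec["error_stream"] selects a specific, declared stream.
        if stream not in self.streams:
            raise KeyError(f"{self.name} has no stream named {stream!r}")
        return StreamRef(self.name, stream)


def resolve(source):
    """Return (component, stream), using the default stream for a bare spec."""
    if isinstance(source, StreamRef):
        return (source.component, source.stream)
    return (source.name, "default")


spout = SpecSketch("spout", ["default", "error_stream"])
print(resolve(spout["error_stream"]))  # ('spout', 'error_stream')
print(resolve(spout))                  # ('spout', 'default')
```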
## Declaring output fields using the `spec()` method
In Python topologies, the output fields of your spouts and bolts
are normally declared with an `outputs` class attribute, as there is
no `declareOutputFields()` method. `heronpy` also enables you to declare output fields dynamically, as a list passed to the
`optional_outputs` argument of the `spec()` method.
This is useful when a component's output fields depend on how it is used in the topology, as in the following example.
```python
class IdentityBolt(Bolt):
    # No static `outputs` attribute here: the output fields are supplied
    # through `optional_outputs` in spec() below
    def process(self, tup):
        # Pass the incoming values through unchanged
        self.emit(tup.values)

class DynamicOutputField(Topology):
    spout = WordSpout.spec()
    bolt = IdentityBolt.spec(inputs={spout: Grouping.ALL}, optional_outputs=["word"])
```
You can also declare outputs in the `add_spout()` and `add_bolt()`
methods of the `TopologyBuilder` in the same way.
## Example topologies
There are a number of example topologies that you can peruse in the [`examples/src/python`]({{% githubMaster %}}/examples/src/python) directory of the [Heron repo]({{% githubMaster %}}):
Topology | File | Description
:--------|:-----|:-----------
Word count | [`word_count_topology.py`]({{% githubMaster %}}/examples/src/python/word_count_topology.py) | The [`WordSpout`]({{% githubMaster %}}/examples/src/python/spout/word_spout.py) spout emits random words from a list, while the [`CountBolt`]({{% githubMaster %}}/examples/src/python/bolt/count_bolt.py) bolt counts the number of words that have been emitted.
Multiple streams | [`multi_stream_topology.py`]({{% githubMaster %}}/examples/src/python/multi_stream_topology.py) | The [`MultiStreamSpout`]({{% githubMaster %}}/examples/src/python/spout/multi_stream_spout.py) emits multiple streams to downstream bolts.
Half acking | [`half_acking_topology.py`]({{% githubMaster %}}/examples/src/python/half_acking_topology.py) | The [`HalfAckBolt`]({{% githubMaster %}}/examples/src/python/bolt/half_ack_bolt.py) acks only half of all received tuples.
Custom grouping | [`custom_grouping_topology.py`]({{% githubMaster %}}/examples/src/python/custom_grouping_topology.py) | The [`SampleCustomGrouping`]({{% githubMaster %}}/examples/src/python/custom_grouping_topology.py#L26) class provides a custom field grouping.
You can build the respective PEXs for these topologies using the following commands:
```shell
$ bazel build examples/src/python:word_count
$ bazel build examples/src/python:multi_stream
$ bazel build examples/src/python:half_acking
$ bazel build examples/src/python:custom_grouping
```
All built PEXs will be stored in `bazel-bin/examples/src/python`. You can submit them to Heron like so:
```shell
$ heron submit local \
  bazel-bin/examples/src/python/word_count.pex - \
  WordCount
$ heron submit local \
  bazel-bin/examples/src/python/multi_stream.pex \
  heron.examples.src.python.multi_stream_topology.MultiStream
$ heron submit local \
  bazel-bin/examples/src/python/half_acking.pex - \
  HalfAcking
$ heron submit local \
  bazel-bin/examples/src/python/custom_grouping.pex \
  heron.examples.src.python.custom_grouping_topology.CustomGrouping
```
By default, the `submit` command also activates topologies. To disable this behavior, set the `--deploy-deactivated` flag.
## Bolts
Bolts must implement the `Bolt` interface, which has the following methods.
```python
class MyBolt(Bolt):
    def initialize(self, config, context): pass
    def process(self, tup): pass
```
* The `initialize()` method is called when the bolt is first initialized and
  provides the bolt with the executing environment. It is equivalent to the `prepare()`
  method of the [`IBolt`](/api/org/apache/heron/api/bolt/IBolt.html) interface in Java.
  Note that you should not override the `__init__()` constructor of the `Bolt` class
  to initialize custom variables, since it is used internally by HeronInstance; instead,
  use `initialize()` to set up any custom variables or connections to databases.
* The `process()` method is called to process a single input `tup` of type `HeronTuple`. This method
  is equivalent to the `execute()` method of the `IBolt` interface in Java. You can use
  the `self.emit()` method to emit the result, as described below.
In addition, the `BaseBolt` class provides you with the following methods.
```python
class BaseBolt(BaseComponent):
    def emit(self, tup, stream="default", anchors=None, direct_task=None, need_task_ids=False): ...
    def ack(self, tup): ...
    def fail(self, tup): ...
    def log(self, message, level=None): ...
    @staticmethod
    def is_tick(tup): ...
    @classmethod
    def spec(cls, name=None, inputs=None, par=1, config=None): ...
```
* The `emit()` method is used to emit a given `tup`, which can be a `list` or `tuple` of
  any Python objects. Unlike in the Java implementation, there is no `OutputCollector`
  in the Python implementation.
* The `ack()` method is used to indicate that processing of a tuple has succeeded.
* The `fail()` method is used to indicate that processing of a tuple has failed.
* The `is_tick()` method returns whether a given `tup` of type `HeronTuple` is a tick tuple.
* The `log()` method is used to log an arbitrary message, and its output is redirected
  to the log file of the component. It accepts an optional argument
  that specifies the logging level. By default, the logging level is `info`.
  **Warning:** due to an internal issue, you should **NOT** write anything to
  `sys.stdout` or `sys.stderr`; use this method for all logging instead.
* To declare the output fields of this bolt, you need to set
  a class attribute `outputs` to a list of `str` or `Stream`. Note that unlike in Java,
  `declareOutputFields` does not exist in the Python implementation. You can also
  specify output fields through the `optional_outputs` argument of the `spec()` method.
* You will use the `spec()` method to define a topology and specify the location
  of this bolt within the topology, as well as to provide component-specific configuration.
The following is an example implementation of a bolt in Python.
```python
from collections import Counter
from heronpy.api.bolt.bolt import Bolt

class CountBolt(Bolt):
    outputs = ["word", "count"]

    def initialize(self, config, context):
        self.counter = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counter[word] += 1
        self.emit([word, self.counter[word]])
```
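
To try the counting logic without a Heron cluster, one option is to run the same `initialize()` and `process()` bodies against a stand-in base class that simply records what is emitted. The sketch below does not import `heronpy`: `RecordingBolt` and `FakeTuple` are hypothetical helpers that model only the `emit()` call and the `values` field used above.

```python
from collections import Counter, namedtuple

# Hypothetical stand-in for HeronTuple: only the `values` field is modelled.
FakeTuple = namedtuple("FakeTuple", ["values"])


class RecordingBolt:
    """Hypothetical stand-in for heronpy's Bolt that records emitted tuples."""

    def __init__(self):
        self.emitted = []

    def emit(self, tup, **kwargs):
        self.emitted.append(list(tup))


class CountBolt(RecordingBolt):
    outputs = ["word", "count"]

    def initialize(self, config, context):
        self.counter = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counter[word] += 1
        self.emit([word, self.counter[word]])


bolt = CountBolt()
bolt.initialize(config={}, context=None)
for word in ["heron", "storm", "heron"]:
    bolt.process(FakeTuple(values=[word]))

print(bolt.emitted)  # [['heron', 1], ['storm', 1], ['heron', 2]]
```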
## Spouts
To create a spout for a Heron topology, you need to subclass the [`Spout`](/api/python/spout/spout.m.html#heronpy.spout.spout.Spout) class, which has the following methods.
```python
class MySpout(Spout):
    def initialize(self, config, context): pass
    def next_tuple(self): pass
    def ack(self, tup_id): pass
    def fail(self, tup_id): pass
    def activate(self): pass
    def deactivate(self): pass
    def close(self): pass
```
## `Spout` class methods
The [`Spout`](/api/python/spout/spout.m.html#heronpy.spout.spout.Spout) class provides a number of methods that you should implement when subclassing.
* The `initialize()` method is called when the spout is first initialized
  and provides the spout with the executing environment. It is equivalent to
  the `open()` method of [`ISpout`](/api/org/apache/heron/api/spout/ISpout.html).
  Note that you should not override the `__init__()` constructor of the `Spout` class
  to initialize custom variables, since it is used internally by HeronInstance; instead,
  use `initialize()` to set up any custom variables or connections to databases.
* The `next_tuple()` method is used to fetch tuples from the input source. You can
  emit fetched tuples by calling `self.emit()`, as described below.
* The `ack()` method is called when the `HeronTuple` with the given `tup_id` emitted
  by this spout has been processed successfully.
* The `fail()` method is called when the `HeronTuple` with the given `tup_id` emitted
  by this spout has not been processed successfully.
* The `activate()` method is called when the spout is asked to go back into the
  active state.
* The `deactivate()` method is called when the spout is asked to enter the deactivated
  state.
* The `close()` method is called when the spout is shut down. There is no
  guarantee that this method is called, because of how the instance is killed.
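
The interplay between `next_tuple()`, `ack()`, and `fail()` can be sketched as a spout that remembers each emitted tuple by its `tup_id` and re-queues it on failure. This is one possible pattern, not something prescribed by `heronpy`. The sketch runs without Heron: `RecordingSpout` is a hypothetical stand-in that models only `emit()` with a `tup_id`, and `ReplayingSpout` with its `queue` and `pending` attributes is likewise an illustrative name.

```python
from collections import deque


class RecordingSpout:
    """Hypothetical stand-in for heronpy's Spout that records emitted tuples."""

    def emit(self, tup, tup_id=None, **kwargs):
        self.emitted.append((list(tup), tup_id))


class ReplayingSpout(RecordingSpout):
    outputs = ["word"]

    def initialize(self, config, context):
        self.emitted = []
        self.queue = deque(["hello", "world"])  # words waiting to be emitted
        self.pending = {}                       # tup_id -> word awaiting an ack
        self.next_id = 0

    def next_tuple(self):
        if not self.queue:
            return
        word = self.queue.popleft()
        tup_id = self.next_id
        self.next_id += 1
        self.pending[tup_id] = word
        self.emit([word], tup_id=tup_id)

    def ack(self, tup_id):
        # Fully processed: forget the tuple.
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        # Not processed: put the word back so next_tuple() emits it again.
        word = self.pending.pop(tup_id, None)
        if word is not None:
            self.queue.append(word)


spout = ReplayingSpout()
spout.initialize(config={}, context=None)
spout.next_tuple()   # emits "hello" with tup_id 0
spout.next_tuple()   # emits "world" with tup_id 1
spout.ack(0)
spout.fail(1)
spout.next_tuple()   # re-emits "world" with a new tup_id 2
print(spout.emitted)  # [(['hello'], 0), (['world'], 1), (['world'], 2)]
```

All state is created in `initialize()` rather than `__init__()`, in line with the note above.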
## `BaseSpout` class methods
The `Spout` class inherits from the [`BaseSpout`](/api/python/spout/base_spout.m.html#heronpy.spout.base_spout.BaseSpout) class, which provides additional methods that you can use in your spouts.
```python
class BaseSpout(BaseComponent):
    def log(self, message, level=None): ...
    def emit(self, tup, tup_id=None, stream="default", direct_task=None, need_task_ids=False): ...
    @classmethod
    def spec(cls, name=None, par=1, config=None): ...
```
* The `emit()` method is used to emit a given tuple, which can be a `list` or `tuple` of any Python objects. Unlike in the Java implementation, there is no `OutputCollector` in the Python implementation.
* The `log()` method is used to log an arbitrary message, and its output is redirected to the log file of the component. It accepts an optional argument that specifies the logging level. By default, the logging level is `info`.
  **Warning:** due to an internal issue, you should **NOT** write anything to
  `sys.stdout` or `sys.stderr`; use this method for all logging instead.
* To declare the output fields of this spout, you need to set
  a class attribute `outputs` to a list of `str` or `Stream`. Note that unlike in Java,
  `declareOutputFields` does not exist in the Python implementation. You can also
  specify output fields through the `optional_outputs` argument of the `spec()` method.
* You will use the `spec()` method to define a topology and specify the location
  of this spout within the topology, as well as to provide component-specific configuration.
## Example spout
The following is an example implementation of a spout in Python.
```python
from itertools import cycle
from heronpy.api.spout.spout import Spout

class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, config, context):
        self.words = cycle(["hello", "world", "heron", "storm"])
        self.log("Initializing WordSpout...")

    def next_tuple(self):
        word = next(self.words)
        self.emit([word])
```
## Topologies Further
```shell
$ pip install heronpy
$ easy_install heronpy
```
Then you can include `heronpy` in your project files. Here's an example:
```python
from heronpy.api.bolt.bolt import Bolt
from heronpy.api.spout.spout import Spout
from heronpy.api.topology import Topology
```
## Writing topologies in Python
Heron [topologies](heron-topologies-concepts) are networks of [spouts](../spouts) that pull data into a topology and [bolts](../bolts) that process that ingested data.
> You can see how to create Python spouts in the [Implementing Python Spouts](../spouts) guide and how to create Python bolts in the [Implementing Python Bolts](../bolts) guide.
Once you've defined spouts and bolts for a topology, you can then compose the topology in one of two ways:
* You can use the [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder) class inside of a main function.
Here's an example:
```python
#!/usr/bin/env python
from heronpy.api.topology import TopologyBuilder
if __name__ == "__main__":
builder = TopologyBuilder("MyTopology")
# Add spouts and bolts
builder.build_and_submit()
```
* You can subclass the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class.
Here's an example:
```python
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology
class MyTopology(Topology):
my_spout = WordSpout.spec(par=2)
my_bolt = CountBolt.spec(par=3, inputs={spout: Grouping.fields("word")})
```
## Defining topologies using the [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder) class
If you create a Python topology using a [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder), you need to instantiate a `TopologyBuilder` inside of a standard Python main function, like this:
```python
from heronpy.api.topology import TopologyBuilder
if __name__ == "__main__":
builder = TopologyBuilder("MyTopology")
```
Once you've created a `TopologyBuilder` object, you can add [bolts](../bolts) using the [`add_bolt`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.add_bolt) method and [spouts](../spouts) using the [`add_spout`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.add_spout) method. Here's an example:
```python
builder = TopologyBuilder("MyTopology")
builder.add_bolt("my_bolt", CountBolt, par=3)
builder.add_spout("my_spout", WordSpout, par=2)
```
Both the `add_bolt` and `add_spout` methods return the corresponding [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec) object.
The `add_bolt` method takes four arguments and an optional `config` parameter:
Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this bolt | |
`bolt_cls` | class | The subclass of [`Bolt`](/api/python/bolt/bolt.m.html#heronpy.bolt.bolt.Bolt) that defines this bolt | |
`par` | `int` | The number of instances of this bolt in the topology | |
`config` | `dict` | Specifies the configuration for this spout | `None`
The `add_spout` method takes three arguments and an optional `config` parameter:
Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this spout | |
`spout_cls` | class | The subclass of [`Spout`](/api/python/spout/spout.m.html#heronpy.spout.spout.Spout) that defines this spout | |
`par` | `int` | The number of instances of this spout in the topology | |
`inputs` | `dict` or `list` | Either a `dict` mapping from [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec) to [`Grouping`](/api/python/stream.m.html#heronpy.stream.Grouping) *or* a list of [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec)s, in which case the [`shuffle`](/api/python/stream.m.html#heronpy.stream.Grouping.SHUFFLE) grouping is used
`config` | `dict` | Specifies the configuration for this spout | `None`
### Example
The following is an example implementation of a word count topology in Python that subclasses [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder).
```python
from your_spout import WordSpout
from your_bolt import CountBolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import TopologyBuilder
if __name__ == "__main__":
builder = TopologyBuilder("WordCountTopology")
# piece together the topology
word_spout = builder.add_spout("word_spout", WordSpout, par=2)
count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2, inputs={word_spout: Grouping.fields("word")})
# submit the toplogy
builder.build_and_submit()
```
Note that arguments to the main method can be passed by providing them in the
`heron submit` command.
### Topology-wide configuration
If you're building a Python topology using a `TopologyBuilder`, you can specify configuration for the topology using the [`set_config`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.set_config) method. A topology's config is a `dict` in which the keys are a series constants from the [`api_constants`](/api/python/api_constants.m.html) module and values are configuration values for those parameters.
Here's an example:
```python
from heronpy.api import api_constants
from heronpy.api.topology import TopologyBuilder
if __name__ == "__main__":
topology_config = {
api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
}
builder = TopologyBuilder("MyTopology")
builder.set_config(topology_config)
# Add bolts and spouts, etc.
```
### Launching the topology
If you want to [submit](../../../operators/heron-cli#submitting-a-topology) Python topologies to a Heron cluster, they need to be packaged as a [PEX](https://pex.readthedocs.io/en/stable/whatispex.html) file. In order to produce PEX files, we recommend using a build tool like [Pants](http://www.pantsbuild.org/python_readme.html) or [Bazel](https://github.com/benley/bazel_rules_pex).
If you defined your topology by subclassing the [`TopologyBuilder`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder) class and built a `word_count.pex` file for that topology in the `~/topology` folder. You can submit the topology to a cluster called `local` like this:
```bash
$ heron submit local \
~/topology/word_count.pex \
- # No class specified
```
Note the `-` in this submission command. If you define a topology by subclassing `TopologyBuilder` you do not need to instruct Heron where your main method is located.
> #### Example topologies buildable as PEXs
> * See [this repo](https://github.com/streamlio/pants-dev-environment) for an example of a Heron topology written in Python and deployable as a Pants-packaged PEX.
> * See [this repo](https://github.com/streamlio/bazel-dev-environment) for an example of a Heron topology written in Python and deployable as a Bazel-packaged PEX.
## Defining a topology by subclassing the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class
If you create a Python topology by subclassing the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class, you need to create a new topology class, like this:
```python
from my_spout import WordSpout
from my_bolt import CountBolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology
class MyTopology(Topology):
my_spout = WordSpout.spec(par=2)
my_bolt_inputs = {my_spout: Grouping.fields("word")}
my_bolt = CountBolt.spec(par=3, inputs=my_bolt_inputs)
```
All you need to do is place [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec)s as the class attributes
of your topology class, which are returned by the `spec()` method of
your spout or bolt class. You do *not* need to run a `build` method or anything like that; the `Topology` class will automatically detect which spouts and bolts are included in the topology.
> If you use this method to define a new Python topology, you do *not* need to have a main function.
For spouts, the [`spec`](/api/python/spout/spout.m.html#heronpy.spout.spout.Spout.spec) method takes three optional arguments:

Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this spout, or `None` if you want to use the variable name of the returned `HeronComponentSpec` as the unique identifier for this spout | `None`
`par` | `int` | The number of instances of this spout in the topology | `1`
`config` | `dict` | Specifies the configuration for this spout | `None`

For bolts, the [`spec`](/api/python/bolt/bolt.m.html#heronpy.bolt.bolt.Bolt.spec) method takes four optional arguments:

Argument | Data type | Description | Default
:--------|:----------|:------------|:-------
`name` | `str` | The unique identifier assigned to this bolt, or `None` if you want to use the variable name of the returned `HeronComponentSpec` as the unique identifier for this bolt | `None`
`inputs` | `dict` or `list` | Either a `dict` mapping from [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec) to [`Grouping`](/api/python/stream.m.html#heronpy.stream.Grouping) *or* a list of [`HeronComponentSpec`](/api/python/component/component_spec.m.html#heronpy.component.component_spec.HeronComponentSpec)s, in which case the [`shuffle`](/api/python/stream.m.html#heronpy.stream.Grouping.SHUFFLE) grouping is used
`par` | `int` | The number of instances of this bolt in the topology | `1`
`config` | `dict` | Specifies the configuration for this bolt | `None`
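The two accepted shapes of the `inputs` argument can be pictured as one normalization step: a `dict` already carries a grouping per upstream component, while a list gets the shuffle grouping filled in. The following standalone sketch illustrates that idea only. `normalize_inputs` and the `SHUFFLE` placeholder are hypothetical, plain strings stand in for component specs, and none of it is taken from the `heronpy` source.

```python
SHUFFLE = "SHUFFLE"  # placeholder for a shuffle grouping


def normalize_inputs(inputs):
    """Return a dict mapping each upstream component to a grouping."""
    if inputs is None:
        return {}
    if isinstance(inputs, dict):
        return dict(inputs)
    # A list or tuple carries no groupings, so default to shuffle
    return {upstream: SHUFFLE for upstream in inputs}


print(normalize_inputs(["word_spout"]))
print(normalize_inputs({"word_spout": "FIELDS(word)"}))
```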
### Example
Here's an example topology definition with one spout and one bolt:
```python
from my_spout import WordSpout
from my_bolt import CountBolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology

class WordCount(Topology):
    word_spout = WordSpout.spec(par=2)
    count_bolt = CountBolt.spec(par=2, inputs={word_spout: Grouping.fields("word")})
```
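The `Grouping.fields("word")` grouping in this example means that tuples carrying the same `word` value are routed to the same `CountBolt` instance, which is what keeps per-word counts correct when `par=2`. The standalone sketch below illustrates that routing idea with a stable hash. `pick_instance` is a hypothetical helper, and the hash function Heron actually uses is not specified here, so treat the concrete instance numbers as illustrative only.

```python
import zlib


def pick_instance(field_value, parallelism):
    """Map a field value to one of `parallelism` instance indexes."""
    # crc32 is stable across runs, unlike Python's built-in str hash
    digest = zlib.crc32(str(field_value).encode("utf-8"))
    return digest % parallelism


words = ["heron", "storm", "heron", "kafka", "heron"]
routes = [(word, pick_instance(word, 2)) for word in words]
for word, index in routes:
    print(f"{word} -> count_bolt instance {index}")
```

Every occurrence of `heron` lands on the same instance index, so a single bolt instance sees all tuples for that word.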
### Launching
If you defined your topology by subclassing the [`Topology`](/api/python/topology.m.html#heronpy.topology.Topology) class,
your main Python file should *not* contain a main method. You will, however, need to instruct Heron which class contains your topology definition.
Let's say that you've defined a topology by subclassing `Topology` and built a PEX stored in `~/topology/dist/word_count.pex`. The class containing your topology definition is `topology.word_count.WordCount`. You can submit the topology to a cluster called `local` like this:
```bash
# The third argument specifies the topology class definition
$ heron submit local \
  ~/topology/dist/word_count.pex \
  topology.word_count.WordCount \
  WordCountTopology
```
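A dotted path such as `topology.word_count.WordCount` names a module (`topology.word_count`) and a class inside it (`WordCount`). As a rough illustration of how such a path can be resolved in Python, the sketch below uses `importlib`. `resolve_class` is a hypothetical helper and not Heron's submission code, and a standard-library class stands in for the topology class so that the snippet runs on its own.

```python
import importlib


def resolve_class(dotted_path):
    """Split 'package.module.ClassName' and import the class."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# A standard-library class stands in for topology.word_count.WordCount
cls = resolve_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```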
### Topology-wide configuration
If you're building a Python topology by subclassing `Topology`, you can specify configuration for the topology by setting a `config` class attribute, the counterpart of the [`set_config`](/api/python/topology.m.html#heronpy.topology.TopologyBuilder.set_config) method on `TopologyBuilder`. A topology's config is a `dict` in which the keys are constants from the [`api_constants`](/api/python/api_constants.m.html) module and the values are the settings for those parameters.
Here's an example:
```python
from heronpy.api.topology import Topology
from heronpy.api import api_constants

class MyTopology(Topology):
    config = {
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    # Add bolts and spouts, etc.
```
## Multiple streams
To specify that a component has multiple output streams, instead of using a list of
strings for `outputs`, you can specify a list of `Stream` objects, in the following manner.
```python
from heronpy.api.spout.spout import Spout
from heronpy.api.stream import Stream

class MultiStreamSpout(Spout):
    outputs = [
        Stream(fields=["normal", "fields"], name="default"),
        Stream(fields=["error_message"], name="error_stream"),
    ]
```
To select one of these streams as the input for your bolt, you can simply
use `[]` to specify the stream you want. Without any stream specified, the `default`
stream will be used.
```python
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology

class MultiStreamTopology(Topology):
    spout = MultiStreamSpout.spec()
    error_bolt = ErrorBolt.spec(inputs={spout["error_stream"]: Grouping.LOWEST})
    consume_bolt = ConsumeBolt.spec(inputs={spout: Grouping.SHUFFLE})
```
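The `spout["error_stream"]` syntax relies on Python's item-access protocol. The standalone sketch below shows one way such a selection could work, with a bare spec falling back to the `default` stream. `StreamSpec` and `input_key` are hypothetical names invented for this illustration; it does not import `heronpy` and is not a description of its internals.

```python
DEFAULT_STREAM = "default"


class StreamSpec:
    """Hypothetical stand-in for a component spec (not heronpy's class)."""

    def __init__(self, name, streams):
        self.name = name
        self.streams = set(streams)

    def __getitem__(self, stream_id):
        # spec["error_stream"] selects one named output stream
        if stream_id not in self.streams:
            raise KeyError(f"{self.name} has no stream {stream_id!r}")
        return (self.name, stream_id)


def input_key(source):
    """Normalize a bare spec or a spec[stream] selection to (name, stream)."""
    if isinstance(source, StreamSpec):
        return (source.name, DEFAULT_STREAM)
    return source


spout = StreamSpec("spout", streams=["default", "error_stream"])
print(input_key(spout["error_stream"]))  # ('spout', 'error_stream')
print(input_key(spout))                  # ('spout', 'default')
```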
## Declaring output fields using the `spec()` method
In Python topologies, the output fields of your spouts and bolts
are normally declared by setting an `outputs` class attribute, as there is
no `declareOutputFields()` method. `heronpy` also enables you to declare output fields dynamically, as a list passed to the
`optional_outputs` argument of the `spec()` method.
This is useful in a situation like the one below, where the bolt's output fields are not known until the topology is defined.
```python
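from heronpy.api.bolt.bolt import Bolt
from heronpy.api.stream import Grouping
from heronpy.api.topology import Topology
from my_spout import WordSpout

class IdentityBolt(Bolt):
    # No static `outputs` attribute: the fields are supplied via spec()
    def process(self, tup):
        self.emit(tup.values)

class DynamicOutputField(Topology):
    spout = WordSpout.spec()
    bolt = IdentityBolt.spec(inputs={spout: Grouping.ALL}, optional_outputs=["word"])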
```
You can also declare outputs in the `add_spout()` and `add_bolt()`
methods of `TopologyBuilder` in the same way.
## Example topologies
There are a number of example topologies that you can peruse in the [`examples/src/python`]({{% githubMaster %}}/examples/src/python) directory of the [Heron repo]({{% githubMaster %}}):
Topology | File | Description
:--------|:-----|:-----------
Word count | [`word_count_topology.py`]({{% githubMaster %}}/examples/src/python/word_count_topology.py) | The [`WordSpout`]({{% githubMaster %}}/examples/src/python/spout/word_spout.py) spout emits random words from a list, while the [`CountBolt`]({{% githubMaster %}}/examples/src/python/bolt/count_bolt.py) bolt counts the number of words that have been emitted.
Multiple streams | [`multi_stream_topology.py`]({{% githubMaster %}}/examples/src/python/multi_stream_topology.py) | The [`MultiStreamSpout`]({{% githubMaster %}}/examples/src/python/spout/multi_stream_spout.py) emits multiple streams to downstream bolts.
Half acking | [`half_acking_topology.py`]({{% githubMaster %}}/examples/src/python/half_acking_topology.py) | The [`HalfAckBolt`]({{% githubMaster %}}/examples/src/python/bolt/half_ack_bolt.py) acks only half of all received tuples.
Custom grouping | [`custom_grouping_topology.py`]({{% githubMaster %}}/examples/src/python/custom_grouping_topology.py) | The [`SampleCustomGrouping`]({{% githubMaster %}}/examples/src/python/custom_grouping_topology.py#L26) class provides a custom field grouping.
You can build the respective PEXs for these topologies using the following commands:
```shell
$ bazel build examples/src/python:word_count
$ bazel build examples/src/python:multi_stream
$ bazel build examples/src/python:half_acking
$ bazel build examples/src/python:custom_grouping
```
All built PEXs will be stored in `bazel-bin/examples/src/python`. You can submit them to Heron like so:
```shell
$ heron submit local \
bazel-bin/examples/src/python/word_count.pex - \
WordCount
$ heron submit local \
bazel-bin/examples/src/python/multi_stream.pex \
heron.examples.src.python.multi_stream_topology.MultiStream
$ heron submit local \
bazel-bin/examples/src/python/half_acking.pex - \
HalfAcking
$ heron submit local \
bazel-bin/examples/src/python/custom_grouping.pex \
heron.examples.src.python.custom_grouping_topology.CustomGrouping
```
By default, the `submit` command also activates topologies. To disable this behavior, set the `--deploy-deactivated` flag.