title: Implementing Python Spouts

Python API docs

You can find API docs for the heronpy library here.

To create a spout for a Heron topology, you need to subclass the Spout class, which has the following methods.

class MySpout(Spout):
    def initialize(self, config, context): pass
    def next_tuple(self): pass
    def ack(self, tup_id): pass
    def fail(self, tup_id): pass
    def activate(self): pass
    def deactivate(self): pass
    def close(self): pass

Spout class methods

The Spout class provides a number of methods that you should implement when subclassing.

  • The initialize() method is called when the spout is first initialized and provides the spout with its execution environment. It is equivalent to the open() method of ISpout. Do not override the __init__() constructor of the Spout class to initialize custom variables, since it is used internally by HeronInstance; instead, use initialize() to set up any custom variables or connections to databases.

  • The next_tuple() method is used to fetch tuples from the input source. You can emit fetched tuples by calling self.emit(), as described below.

  • The ack() method is called when the HeronTuple with the tup_id emitted by this spout is successfully processed.

  • The fail() method is called when the HeronTuple with the tup_id emitted by this spout is not processed successfully.

  • The activate() method is called when the spout is asked to go back into the active state.

  • The deactivate() method is called when the spout is asked to enter the deactivated state.

  • The close() method is called when the spout is shut down. There is no guarantee that this method will be called, because of how the instance may be killed.
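
As a rough illustration of how ack() and fail() can work together with next_tuple(), here is a minimal sketch of a spout that remembers every tuple it emits and re-queues the ones that fail. It assumes acking is enabled for the topology, and the class name ReliableWordSpout, the integer tuple IDs, and the replay policy are hypothetical choices made for this example. The small Spout class at the top is only a stand-in that records calls so the sketch runs without Heron; in a real topology you would import Spout from heronpy instead.

```python
from collections import deque


# In a real topology, replace this stand-in with:
#   from heronpy.api.spout.spout import Spout
class Spout:
    """Hypothetical stand-in that only records emitted tuples."""

    def __init__(self):
        self.emitted = []

    def emit(self, tup, tup_id=None, stream="default",
             direct_task=None, need_task_ids=False):
        self.emitted.append((tup, tup_id))


class ReliableWordSpout(Spout):
    outputs = ['word']

    def initialize(self, config, context):
        # Custom state belongs here, not in __init__().
        self.queue = deque(["hello", "world"])
        self.pending = {}   # tup_id -> word still awaiting ack or fail
        self.next_id = 0

    def next_tuple(self):
        if not self.queue:
            return  # nothing to emit right now
        word = self.queue.popleft()
        tup_id = self.next_id
        self.next_id += 1
        self.pending[tup_id] = word
        # Passing tup_id lets ack() and fail() identify this tuple later.
        self.emit([word], tup_id=tup_id)

    def ack(self, tup_id):
        # Fully processed: forget the tuple.
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        # Not processed: put the word back so it is emitted again.
        word = self.pending.pop(tup_id, None)
        if word is not None:
            self.queue.append(word)
```

A replayed word gets a fresh tup_id in this sketch, so a late ack() or fail() for the old ID is simply ignored.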

BaseSpout class methods

The Spout class inherits from the BaseSpout class, which also provides methods you can use in your spouts.

class BaseSpout(BaseComponent):
    def log(self, message, level=None): ...
    def emit(self, tup, tup_id=None, stream="default", direct_task=None, need_task_ids=False): ...
    @classmethod
    def spec(cls, name=None, par=1, config=None): ...

  • The emit() method is used to emit a given tuple, which can be a list or tuple of any Python objects. Unlike in the Java implementation, there is no OutputCollector in the Python implementation.

  • The log() method is used to log an arbitrary message, and its output is redirected to the log file of the component. It accepts an optional argument that specifies the logging level. By default, the logging level is info.

    Warning: due to an internal issue, you should NOT write anything to sys.stdout or sys.stderr; instead, use this method for all logging.

  • To declare the output fields of this spout, define a class attribute named outputs as a list of str or Stream. Note that unlike Java, declareOutputFields does not exist in the Python implementation. You can also specify additional output fields through the optional_outputs argument of the spec() method. For further information, refer to this page.

  • You will use the spec() method to define a topology and specify the location of this spout within the topology, as well as to give component-specific configurations. For the usage of this method, refer to this page.
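
The following sketch shows how outputs, emit(), and log() might be combined in one spout. The NumberSpout class and its input data are hypothetical, and passing the level as the string "warn" is an assumption about the values log() accepts. The Spout class at the top is a stand-in that mirrors the emit() and log() signatures listed above and only records calls, so the example runs without Heron; in a real topology you would import Spout from heronpy instead.

```python
# In a real topology, replace this stand-in with:
#   from heronpy.api.spout.spout import Spout
class Spout:
    """Hypothetical stand-in that records emit() and log() calls."""

    def __init__(self):
        self.emitted = []   # (stream, tuple values) pairs
        self.logged = []    # (level, message) pairs

    def emit(self, tup, tup_id=None, stream="default",
             direct_task=None, need_task_ids=False):
        self.emitted.append((stream, list(tup)))

    def log(self, message, level=None):
        # The stand-in treats a missing level as "info".
        self.logged.append((level or "info", message))


class NumberSpout(Spout):
    # Two output fields, both on the default stream.
    outputs = ['number', 'square']

    def initialize(self, config, context):
        self.lines = iter(["2", "oops", "3"])

    def next_tuple(self):
        line = next(self.lines, None)
        if line is None:
            return  # input exhausted, emit nothing
        try:
            n = int(line)
        except ValueError:
            # Use log() rather than print(), per the warning above.
            self.log("Skipping unparseable input %r" % line, level="warn")
            return
        # A tuple works as well as a list; one value per declared field.
        self.emit((n, n * n))
```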

Example spout

The following is an example implementation of a spout in Python.

from itertools import cycle
from heronpy.api.spout.spout import Spout


class WordSpout(Spout):
    # Declare a single output field named 'word'.
    outputs = ['word']

    def initialize(self, config, context):
        # cycle() repeats the word list indefinitely.
        self.words = cycle(["hello", "world", "heron", "storm"])
        self.log("Initializing WordSpout...")

    def next_tuple(self):
        # Emit the next word as a one-field tuple.
        word = next(self.words)
        self.emit([word])