blob: 81ff5d73697c1808e8bae7d617e56db04ce88cba [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''bolt.py: API for defining bolt in python'''
from abc import abstractmethod
from heronpy.api.spout.base_spout import BaseSpout
class Spout(BaseSpout):
"""API for defining a spout for Heron in Python
Topology writers need to inherit this ``Spout`` class to define their own custom spout, by
implementing ``initialize()``, ``next_tuple()``, ``ack()`` and ``fail()`` methods.
In addition, ``close()``, ``activate()`` and ``deactivate()`` are available to be implemented.
"""
@abstractmethod
def initialize(self, config, context):
"""Called when a task for this component is initialized within a worker on the cluster
It provides the spout with the environment in which the spout executes. Note that
you should NOT override ``__init__()`` for initialization of your spout, as it is
used internally by Heron Instance; instead, you should use this method to initialize
any custom instance variables or connections to data sources.
*Should be implemented by a subclass.*
:type config: dict
:param config: The Heron configuration for this bolt. This is the configuration provided to
the topology merged in with cluster configuration on this machine.
Note that types of string values in the config have been automatically converted,
meaning that number strings and boolean strings are converted to appropriate
types.
:type context: :class:`TopologyContext`
:param context: This object can be used to get information about this task's place within the
topology, including the task id and component id of this task, input and output
information, etc.
"""
pass
@abstractmethod
def close(self):
"""Called when this spout is going to be shutdown
There is no guarantee that close() will be called.
"""
pass
@abstractmethod
def next_tuple(self):
"""When this method is called, Heron is requesting that the Spout emit tuples
It is compatible with StreamParse API.
This method should be non-blocking, so if the Spout has no tuples to emit,
this method should return; next_tuple(), ack(), and fail() are all called in a tight
loop in a single thread in the spout task. WHen there are no tuples to emit, it is
courteous to have next_tuple sleep for a short amount of time (like a single millisecond)
so as not to waste too much CPU.
**Must be implemented by a subclass, otherwise NotImplementedError is raised.**
"""
raise NotImplementedError("Spout not implementing next_tuple() method")
@abstractmethod
def ack(self, tup_id):
"""Determine that the tuple emitted by this spout with the tup_id has been fully processed
It is compatible with StreamParse API.
Heron has determined that the tuple emitted by this spout with the tup_id identifier
has been fully processed. Typically, an implementation of this method will take that
message off the queue and prevent it from being replayed.
*Should be implemented by a subclass.*
:param tup_id: the ID of the HeronTuple that has been fully acknowledged.
"""
pass
@abstractmethod
def fail(self, tup_id):
"""Determine that the tuple emitted by this spout with the tup_id has failed to be processed
It is compatible with StreamParse API.
The tuple emitted by this spout with the tup_id identifier has failed to be
fully processed. Typically, an implementation of this method will put that
message back on the queue to be replayed at a later time.
*Should be implemented by a subclass.*
:param tup_id: the ID of the HeronTuple that has failed either due to a bolt calling ``fail()``
or timeout
"""
pass
@abstractmethod
def activate(self):
"""Called when a spout has been activated out of a deactivated mode
next_tuple() will be called on this spout soon. A spout can become activated
after having been deactivated when the topology is manipulated using the
`heron` client.
"""
pass
@abstractmethod
def deactivate(self):
"""Called when a spout has been deactivated
next_tuple() will not be called while a spout is deactivated.
The spout may or may not be reactivated in the future.
"""
pass