blob: 3c78ebed8ded1609d11cde6cfab0db1d2e75b74c [file] [log] [blame]
# Copyright 2016 - Twitter, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''staticlines.py: module for defining a simple static lines input'''
from heron.api.src.python import Spout, Stream
from ...streamlet import Streamlet, OperationType
# pylint: disable=access-member-before-definition
# pylint: disable=attribute-defined-outside-init
class StaticLinesSpout(Spout):
  """StaticLinesSpout: Generates a line from a set of static lines again and again

  Each ``next_tuple`` call emits one line on the ``output`` stream, cycling
  through ``self.words`` in order and wrapping back to the first line after
  the last one. Emitted, acked and failed tuples are counted on the instance
  (``emit_count``, ``ack_count``, ``fail_count``).
  """
  outputs = [Stream(fields=['_output_'], name='output')]

  # pylint: disable=unused-argument
  def initialize(self, config, context):
    """Implements StaticLines Spout's initialize method

    Sets up the fixed list of lines, the cursor into it, and the counters.

    :param config: component-specific configuration; only logged here
    :param context: not used by this spout
    """
    self.logger.info("Initializing StaticLinesSpout with the following")
    # Hand the argument to the logger instead of pre-formatting with `%`, so
    # the message is only rendered if the record is actually emitted.
    self.logger.info("Component-specific config: \n%s", config)
    # NOTE(review): "Humpy Dumpy" / "Moulberry" look like misspellings of the
    # nursery rhymes; kept verbatim because they are the emitted tuple values.
    self.words = ["Mary had a little lamb",
                  "Humpy Dumpy sat on a wall",
                  "Here we round the Moulberry bush"]
    self.index = 0  # position in self.words of the next line to emit
    self.emit_count = 0
    self.ack_count = 0
    self.fail_count = 0

  def _get_next_line(self):
    """Return the line at the cursor and advance the cursor, wrapping at the end."""
    retval = self.words[self.index]
    self.index = (self.index + 1) % len(self.words)
    return retval

  def next_tuple(self):
    """Emit the next static line on the 'output' stream and count the emit."""
    self.emit([self._get_next_line()], stream='output')
    self.emit_count += 1

  def ack(self, tup_id):
    """Count an acknowledged tuple and log its id at debug level."""
    self.ack_count += 1
    self.logger.debug("Acked tuple %s", tup_id)

  def fail(self, tup_id):
    """Count a failed tuple and log its id at debug level."""
    self.fail_count += 1
    self.logger.debug("Failed tuple %s", tup_id)
class StaticLinesStreamlet(Streamlet):
  """Input streamlet that emits lines from a fixed set of lines forever.

  Building it registers a single StaticLinesSpout with the topology builder.
  """
  # pylint: disable=no-self-use
  def __init__(self, stage_name=None, parallelism=None):
    """Create an input-type streamlet with an optional name and parallelism."""
    super(StaticLinesStreamlet, self).__init__(
        parallelism=parallelism,
        stage_name=stage_name,
        operation=OperationType.Input)

  @staticmethod
  def staticLinesGenerator(stage_name=None, parallelism=None):
    """Factory helper returning a new StaticLinesStreamlet."""
    streamlet = StaticLinesStreamlet(stage_name=stage_name,
                                     parallelism=parallelism)
    return streamlet

  def _build(self, bldr, stage_names):
    """Add this streamlet's spout to ``bldr`` and return ``bldr``.

    Parallelism defaults to 1 and must be at least 1. When no stage name was
    given, the first of "staticlines", "staticlines2", "staticlines3", ...
    that is not contained in ``stage_names`` is used.

    :raises RuntimeError: if the configured parallelism is below 1
    """
    if self._parallelism is None:
      self._parallelism = 1
    elif self._parallelism < 1:
      raise RuntimeError("StaticLines parallelism has to be >= 1")

    if self._stage_name is None:
      suffix = 1
      candidate = "staticlines"
      # Probe numbered variants until one is not already taken.
      while candidate in stage_names:
        suffix += 1
        candidate = "staticlines" + str(suffix)
      self._stage_name = candidate

    bldr.add_spout(self._stage_name, StaticLinesSpout, par=self._parallelism)
    return bldr