blob: cad88dce596c83f86313aaeb7db47363a99fffd7 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''streamlet.py: module for defining the basic concept of the heron python streamlet API'''
from abc import abstractmethod
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=too-many-instance-attributes, protected-access
class Streamlet(object):
    """A Streamlet is a (potentially unbounded) ordered collection of tuples.

    Streamlets originate from pub/sub systems (such as Pulsar/Kafka), from static data
    (such as CSV files or HDFS files), or from any other source. They are also created by
    transforming existing Streamlets using operations such as map/flat_map, etc.

    Every transformation method below creates a new Streamlet and registers it as a child
    of the streamlet(s) it was derived from. All of them except the sinks ``log`` and
    ``consume`` return the new Streamlet so that calls can be chained.
    """

    def __init__(self):
        """Initialize an unnamed, unbuilt streamlet with one partition and no children."""
        self._name = None
        self._num_partitions = 1
        # Downstream streamlets derived from this one; visited recursively by _build.
        self._children = []
        # Set to True once _build_this has reported success for this streamlet.
        self._built = False
        # Stream id of the first output declared by StreamletBoltBase.
        self._output = StreamletBoltBase.outputs[0].stream_id

    def set_name(self, name):
        """Sets the name of the Streamlet and returns self to allow chaining."""
        self._name = name
        return self

    def get_name(self):
        """Returns the name of the Streamlet (None until set_name is called)."""
        return self._name

    def set_num_partitions(self, num_partitions):
        """Sets the number of partitions of this Streamlet in place and returns self."""
        self._num_partitions = num_partitions
        return self

    def get_num_partitions(self):
        """Returns the number of partitions of this Streamlet."""
        return self._num_partitions

    def map(self, map_function):
        """Return a new Streamlet by applying map_function to each element of this Streamlet.
        """
        from heronpy.streamlet.impl.mapbolt import MapStreamlet
        map_streamlet = MapStreamlet(map_function, self)
        self._add_child(map_streamlet)
        return map_streamlet

    def flat_map(self, flatmap_function):
        """Return a new Streamlet by applying flatmap_function to each element of this
        Streamlet and flattening the result.
        """
        from heronpy.streamlet.impl.flatmapbolt import FlatMapStreamlet
        fm_streamlet = FlatMapStreamlet(flatmap_function, self)
        self._add_child(fm_streamlet)
        return fm_streamlet

    def filter(self, filter_function):
        """Return a new Streamlet containing only the elements that satisfy filter_function.
        """
        from heronpy.streamlet.impl.filterbolt import FilterStreamlet
        filter_streamlet = FilterStreamlet(filter_function, self)
        self._add_child(filter_streamlet)
        return filter_streamlet

    def repartition(self, num_partitions, repartition_function=None):
        """Return a new Streamlet containing all elements of this streamlet but having
        num_partitions partitions.

        Unlike set_num_partitions(n), which modifies this streamlet in place, repartition
        creates a new streamlet.
        If repartition_function is not None, it is used to decide which partition(s)
        (from 0 to num_partitions - 1) each element is routed to. It may also return a
        list of partitions to send an element to multiple partitions.
        """
        from heronpy.streamlet.impl.repartitionbolt import RepartitionStreamlet
        if repartition_function is None:
            # NOTE(review): this default takes a single argument; confirm it matches the
            # signature RepartitionStreamlet expects for repartition_function.
            repartition_function = lambda x: x
        repartition_streamlet = RepartitionStreamlet(num_partitions, repartition_function, self)
        self._add_child(repartition_streamlet)
        return repartition_streamlet

    def clone(self, num_clones):
        """Return a list of num_clones streamlets, each containing all elements
        of the current streamlet.

        Each clone is created with ``repartition`` using this streamlet's current number
        of partitions and the default repartition function.
        """
        return [self.repartition(self.get_num_partitions()) for _ in range(num_clones)]

    def reduce_by_window(self, window_config, reduce_function):
        """Return a new Streamlet in which the elements of this Streamlet are collected
        over a window defined by window_config and then reduced using reduce_function.

        reduce_function takes two elements at a time and reduces them to one element,
        which is used in the subsequent reductions.
        """
        from heronpy.streamlet.impl.reducebywindowbolt import ReduceByWindowStreamlet
        reduce_streamlet = ReduceByWindowStreamlet(window_config, reduce_function, self)
        self._add_child(reduce_streamlet)
        return reduce_streamlet

    #pylint: disable=protected-access
    def union(self, other_streamlet):
        """Returns a new Streamlet that consists of the elements of both this streamlet and
        other_streamlet. The result is registered as a child of both streamlets.
        """
        from heronpy.streamlet.impl.unionbolt import UnionStreamlet
        union_streamlet = UnionStreamlet(self, other_streamlet)
        self._add_child(union_streamlet)
        other_streamlet._add_child(union_streamlet)
        return union_streamlet

    def transform(self, transform_operator):
        """Returns a new Streamlet by applying transform_operator to each element of this
        streamlet. transform_operator is expected to be a TransformOperator.

        Before iteration over the Streamlet starts, the open function of transform_operator
        is called. This allows transform_operator to do any kind of initialization/loading.
        """
        from heronpy.streamlet.impl.transformbolt import TransformStreamlet
        transform_streamlet = TransformStreamlet(transform_operator, self)
        self._add_child(transform_streamlet)
        return transform_streamlet

    def log(self):
        """Logs all elements of this streamlet. This is a sink and returns None.
        """
        from heronpy.streamlet.impl.logbolt import LogStreamlet
        log_streamlet = LogStreamlet(self)
        self._add_child(log_streamlet)

    def consume(self, consume_function):
        """Calls consume_function for each element of this streamlet. This is a sink and
        returns None.
        """
        from heronpy.streamlet.impl.consumebolt import ConsumeStreamlet
        consume_streamlet = ConsumeStreamlet(consume_function, self)
        self._add_child(consume_streamlet)

    def join(self, join_streamlet, window_config, join_function):
        """Return a new Streamlet by inner-joining join_streamlet with this streamlet.

        The result is registered as a child of both streamlets.
        """
        from heronpy.streamlet.impl.joinbolt import JoinStreamlet, JoinBolt
        join_streamlet_result = JoinStreamlet(JoinBolt.INNER, window_config,
                                              join_function, self, join_streamlet)
        self._add_child(join_streamlet_result)
        join_streamlet._add_child(join_streamlet_result)
        return join_streamlet_result

    def outer_right_join(self, join_streamlet, window_config, join_function):
        """Return a new Streamlet by outer-right-joining join_streamlet with this streamlet.

        The result is registered as a child of both streamlets.
        """
        from heronpy.streamlet.impl.joinbolt import JoinStreamlet, JoinBolt
        join_streamlet_result = JoinStreamlet(JoinBolt.OUTER_RIGHT, window_config,
                                              join_function, self, join_streamlet)
        self._add_child(join_streamlet_result)
        join_streamlet._add_child(join_streamlet_result)
        return join_streamlet_result

    def outer_left_join(self, join_streamlet, window_config, join_function):
        """Return a new Streamlet by outer-left-joining join_streamlet with this streamlet.

        The result is registered as a child of both streamlets.
        """
        from heronpy.streamlet.impl.joinbolt import JoinStreamlet, JoinBolt
        join_streamlet_result = JoinStreamlet(JoinBolt.OUTER_LEFT, window_config,
                                              join_function, self, join_streamlet)
        self._add_child(join_streamlet_result)
        join_streamlet._add_child(join_streamlet_result)
        return join_streamlet_result

    def outer_join(self, join_streamlet, window_config, join_function):
        """Return a new Streamlet by full-outer-joining join_streamlet with this streamlet.

        The result is registered as a child of both streamlets.
        """
        from heronpy.streamlet.impl.joinbolt import JoinStreamlet, JoinBolt
        join_streamlet_result = JoinStreamlet(JoinBolt.OUTER, window_config,
                                              join_function, self, join_streamlet)
        self._add_child(join_streamlet_result)
        join_streamlet._add_child(join_streamlet_result)
        return join_streamlet_result

    def reduce_by_key_and_window(self, window_config, reduce_function):
        """Return a new Streamlet in which the (key, value) pairs of this Streamlet are
        collected over a window defined by window_config and then reduced using
        reduce_function.
        """
        from heronpy.streamlet.impl.reducebykeyandwindowbolt import ReduceByKeyAndWindowStreamlet
        reduce_streamlet = ReduceByKeyAndWindowStreamlet(window_config, reduce_function, self)
        self._add_child(reduce_streamlet)
        return reduce_streamlet

    ##################################################################
    ### Internal functions
    ##################################################################

    # pylint: disable=protected-access
    def _build(self, bldr, stage_names):
        """Add this streamlet to the builder via _build_this, then recurse into children.

        If _build_this returns a falsy value, this streamlet stays unbuilt and its children
        are not visited. A streamlet with several parents (see union and the join methods)
        can therefore be reached again later through another parent.

        :raises RuntimeError: if this streamlet has already been built
        """
        if self._built:
            raise RuntimeError("Logic error while building")
        if self._build_this(bldr, stage_names):
            self._built = True
            for child in self._children:
                child._build(bldr, stage_names)

    # NOTE: without an ABCMeta metaclass this decorator is not enforced at instantiation
    # time; the RuntimeError below is what signals a missing override.
    @abstractmethod
    def _build_this(self, builder, stage_names):
        """This is the method that's implemented by the operators.

        :type builder: TopologyBuilder
        :param builder: The operator adds in the current streamlet as a spout/bolt
        :return: truthy if this streamlet was added to the builder
        """
        raise RuntimeError("Streamlet's _build_this not implemented")

    def _add_child(self, child):
        """Register child as a downstream streamlet of this one."""
        self._children.append(child)

    #pylint: disable=no-self-use
    def _default_stage_name_calculator(self, prefix, existing_stage_names):
        """Return the first name of the form ``<prefix>-<N>`` (N = 1, 2, ...) that is not
        in existing_stage_names.

        Helper for operator subclasses; it is not called within this class itself.

        :param prefix: base name for the stage
        :param existing_stage_names: container of stage names that are already taken
        :return: the calculated, unused stage name
        """
        from itertools import count
        candidates = (prefix + "-" + str(index) for index in count(1))
        # count() is infinite, so next() always finds an unused name.
        return next(name for name in candidates if name not in existing_stage_names)

    def _all_built(self):
        """Return True if this streamlet and every streamlet reachable through its
        children have been built, False otherwise."""
        return self._built and all(child._all_built() for child in self._children)