blob: b959d03dcaea143a14f2c090fae59de56061d477 [file] [log] [blame]
# Copyright 2016 - Parsely, Inc. (d/b/a Parse.ly)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''stream.py: module for defining Stream and Grouping for python topology'''
import collections
from heronpy.api.serializer import default_serializer
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.proto import topology_pb2
class Stream(object):
"""Heron output stream
It is compatible with StreamParse API.
"""
DEFAULT_STREAM_ID = "default"
def __init__(self, fields=None, name=DEFAULT_STREAM_ID, direct=False):
"""
:type fields: `list` or `tuple` of `str`
:param fields: field names for this stream
:type name: str
:param name: name of stream. Defaults to ``default``
:type direct: bool
:param direct: whether or not this stream is direct. Default is ``False``
"""
if fields is None:
fields = []
elif isinstance(fields, (list, tuple)):
fields = list(fields)
for field in fields:
if not isinstance(field, str):
raise TypeError("All field names must be strings, given: %s" % str(field))
else:
raise TypeError("Stream fields must be a list, tuple or None, given: %s" % str(fields))
# self.fields is always list
self.fields = fields
if name is None:
raise TypeError("Stream's name cannot be None")
elif isinstance(name, str):
self.stream_id = name
else:
raise TypeError("Stream name must be a string, given: %s" % str(name))
if isinstance(direct, bool):
self.direct = direct
if self.direct:
raise NotImplementedError("Direct stream is not supported yet.")
else:
raise TypeError("'direct' must be either True or False, given: %s" % str(direct))
class Grouping(object):
"""Helper class for defining Grouping for Python topology"""
SHUFFLE = topology_pb2.Grouping.Value("SHUFFLE")
ALL = topology_pb2.Grouping.Value("ALL")
LOWEST = topology_pb2.Grouping.Value("LOWEST")
NONE = topology_pb2.Grouping.Value("NONE")
DIRECT = topology_pb2.Grouping.Value("DIRECT")
# gtype should contain topology_pb2.Grouping.Value("FIELDS")
FIELDS = collections.namedtuple('FieldGrouping', 'gtype, fields')
# gtype should contain topology_pb2.Grouping.Value("CUSTOM")
CUSTOM = collections.namedtuple('CustomGrouping', 'gtype, python_serialized')
# StreamParse compatibility
GLOBAL = LOWEST
LOCAL_OR_SHUFFLE = SHUFFLE
@classmethod
def is_grouping_sane(cls, gtype):
"""Checks if a given gtype is sane"""
if gtype == cls.SHUFFLE or gtype == cls.ALL or gtype == cls.LOWEST or gtype == cls.NONE:
return True
elif isinstance(gtype, cls.FIELDS):
return gtype.gtype == topology_pb2.Grouping.Value("FIELDS") and \
gtype.fields is not None
elif isinstance(gtype, cls.CUSTOM):
return gtype.gtype == topology_pb2.Grouping.Value("CUSTOM") and \
gtype.python_serialized is not None
else:
#pylint: disable=fixme
#TODO: DIRECT are not supported yet
return False
@classmethod
def fields(cls, *fields):
"""Field grouping"""
if len(fields) == 1 and isinstance(fields[0], list):
fields = fields[0]
else:
fields = list(fields)
for i in fields:
if not isinstance(i, str):
raise TypeError("Non-string cannot be specified in fields")
if not fields:
raise ValueError("List cannot be empty for fields grouping")
return cls.FIELDS(gtype=topology_pb2.Grouping.Value("FIELDS"),
fields=fields)
@classmethod
def custom(cls, customgrouper):
"""Custom grouping from a given implementation of ICustomGrouping
:param customgrouper: The ICustomGrouping implemention to use
"""
if customgrouper is None:
raise TypeError("Argument to custom() must be ICustomGrouping instance or classpath")
if not isinstance(customgrouper, ICustomGrouping) and not isinstance(customgrouper, str):
raise TypeError("Argument to custom() must be ICustomGrouping instance or classpath")
serialized = default_serializer.serialize(customgrouper)
return cls.custom_serialized(serialized, is_java=False)
@classmethod
def custom_serialized(cls, serialized, is_java=True):
"""Custom grouping from a given serialized string
This class is created for compatibility with ``custom_serialized(cls, java_serialized)`` method
of StreamParse API, although its functionality is not yet implemented (Java-serialized).
Currently only custom grouping implemented in Python is supported, and ``custom()`` method
should be used to indicate its classpath, rather than directly to use this method.
In the future, users can directly specify Java-serialized object with ``is_java=True`` in order
to use a custom grouping implemented in Java for python topology.
:param serialized: serialized classpath to custom grouping class to use (if python)
:param is_java: indicate whether this is Java serialized, or python serialized
"""
if not isinstance(serialized, bytes):
raise TypeError("Argument to custom_serialized() must be "
"a serialized Python class as bytes, given: %s" % str(serialized))
if not is_java:
return cls.CUSTOM(gtype=topology_pb2.Grouping.Value("CUSTOM"),
python_serialized=serialized)
else:
raise NotImplementedError("Custom grouping implemented in Java for Python topology"
"is not yet supported.")
@classmethod
def custom_object(cls, java_class_name, arg_list):
"""Tuples will be assigned to tasks by the given Java class."""
raise NotImplementedError("custom_object() method is not yet implemented")