blob: bf134d7b48ad0cef3857d758554ab2ae3119a076 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''component_spec.py'''
import uuid
from heronpy.api.serializer import default_serializer
from heronpy.api.api_constants import TOPOLOGY_COMPONENT_PARALLELISM
from heronpy.proto import topology_pb2
from heronpy.api.stream import Stream, Grouping
# pylint: disable=too-many-instance-attributes
class HeronComponentSpec(object):
"""Class to specify the information and location of components in a topology
This class is generated by the ``spec()`` method of Spout and Bolt class and
specifies how this component is located in the topology and how it is connected to
the other components. This class also retains the Python class path of the component,
so pex_loader can load the component appropriately.
"""
def __init__(self, name, python_class_path, is_spout, par,
inputs=None, outputs=None, config=None):
self._sanitize_args(name, python_class_path, is_spout, par)
self.name = name
self.python_class_path = python_class_path
self.is_spout = is_spout
self.parallelism = par
# inputs, outputs, config will be sanitized later
self.inputs = inputs
self.outputs = outputs
self.custom_config = config
# This is used for identification, especially when name is not specified by argument
# Note that ``self.name`` might not be available until it is set by TopologyType metaclass
# so this is necessary for identification purposes. Used mainly by GlobalStreamId.
self.uuid = str(uuid.uuid4())
@staticmethod
def _sanitize_args(name, py_class_path, is_spout, par):
# name can be None at the time this spec is initialized
assert name is None or isinstance(name, str)
assert isinstance(py_class_path, str)
assert isinstance(is_spout, bool)
assert isinstance(par, int) and par > 0
def get_protobuf(self):
"""Returns protobuf message (Spout or Bolt) of this component"""
if self.is_spout:
return self._get_spout()
else:
return self._get_bolt()
def _get_spout(self):
"""Returns Spout protobuf message"""
spout = topology_pb2.Spout()
spout.comp.CopyFrom(self._get_base_component())
# Add output streams
self._add_out_streams(spout)
return spout
def _get_bolt(self):
"""Returns Bolt protobuf message"""
bolt = topology_pb2.Bolt()
bolt.comp.CopyFrom(self._get_base_component())
# Add streams
self._add_in_streams(bolt)
self._add_out_streams(bolt)
return bolt
def _get_base_component(self):
"""Returns Component protobuf message"""
comp = topology_pb2.Component()
comp.name = self.name
comp.spec = topology_pb2.ComponentObjectSpec.Value("PYTHON_CLASS_NAME")
comp.class_name = self.python_class_path
comp.config.CopyFrom(self._get_comp_config())
return comp
def _get_comp_config(self):
"""Returns component-specific Config protobuf message
It first adds ``topology.component.parallelism``, and is overriden by
a user-defined component-specific configuration, specified by spec().
"""
proto_config = topology_pb2.Config()
# first add parallelism
key = proto_config.kvs.add()
key.key = TOPOLOGY_COMPONENT_PARALLELISM
key.value = str(self.parallelism)
key.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
# iterate through self.custom_config
if self.custom_config is not None:
sanitized = self._sanitize_config(self.custom_config)
for key, value in list(sanitized.items()):
if isinstance(value, str):
kvs = proto_config.kvs.add()
kvs.key = key
kvs.value = value
kvs.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
else:
# need to serialize
kvs = proto_config.kvs.add()
kvs.key = key
kvs.serialized_value = default_serializer.serialize(value)
kvs.type = topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE")
return proto_config
@staticmethod
def _sanitize_config(custom_config):
"""Checks whether ``custom_config`` is sane and returns a sanitized dict <str -> (str|object)>
It checks if keys are all strings and sanitizes values of a given dictionary as follows:
- If string, number or boolean is given as a value, it is converted to string.
For string and number (int, float), it is converted to string by a built-in ``str()`` method.
For a boolean value, ``True`` is converted to "true" instead of "True", and ``False`` is
converted to "false" instead of "False", in order to keep the consistency with
Java configuration.
- If neither of the above is given as a value, it is inserted into the sanitized dict as it is.
These values will need to be serialized before adding to a protobuf message.
"""
if not isinstance(custom_config, dict):
raise TypeError("Component-specific configuration must be given as a dict type, given: %s"
% str(type(custom_config)))
sanitized = {}
for key, value in list(custom_config.items()):
if not isinstance(key, str):
raise TypeError("Key for component-specific configuration must be string, given: %s:%s"
% (str(type(key)), str(key)))
if isinstance(value, bool):
sanitized[key] = "true" if value else "false"
elif isinstance(value, (str, int, float)):
sanitized[key] = str(value)
else:
sanitized[key] = value
return sanitized
def _add_in_streams(self, bolt):
"""Adds inputs to a given protobuf Bolt message"""
if self.inputs is None:
return
# sanitize inputs and get a map <GlobalStreamId -> Grouping>
input_dict = self._sanitize_inputs()
for global_streamid, gtype in list(input_dict.items()):
in_stream = bolt.inputs.add()
in_stream.stream.CopyFrom(self._get_stream_id(global_streamid.component_id,
global_streamid.stream_id))
if isinstance(gtype, Grouping.FIELDS):
# it's a field grouping
in_stream.gtype = gtype.gtype
in_stream.grouping_fields.CopyFrom(self._get_stream_schema(gtype.fields))
elif isinstance(gtype, Grouping.CUSTOM):
# it's a custom grouping
in_stream.gtype = gtype.gtype
in_stream.custom_grouping_object = gtype.python_serialized
in_stream.type = topology_pb2.CustomGroupingObjectType.Value("PYTHON_OBJECT")
else:
in_stream.gtype = gtype
# pylint: disable=too-many-branches
def _sanitize_inputs(self):
"""Sanitizes input fields and returns a map <GlobalStreamId -> Grouping>"""
ret = {}
if self.inputs is None:
return
if isinstance(self.inputs, dict):
# inputs are dictionary, must be either <HeronComponentSpec -> Grouping> or
# <GlobalStreamId -> Grouping>
for key, grouping in list(self.inputs.items()):
if not Grouping.is_grouping_sane(grouping):
raise ValueError('A given grouping is not supported')
if isinstance(key, HeronComponentSpec):
# use default streamid
if key.name is None:
# should not happen as TopologyType metaclass sets name attribute
# before calling this method
raise RuntimeError("In _sanitize_inputs(): HeronComponentSpec doesn't have a name")
global_streamid = GlobalStreamId(key.name, Stream.DEFAULT_STREAM_ID)
ret[global_streamid] = grouping
elif isinstance(key, GlobalStreamId):
ret[key] = grouping
else:
raise ValueError("%s is not supported as a key to inputs" % str(key))
elif isinstance(self.inputs, (list, tuple)):
# inputs are lists, must be either a list of HeronComponentSpec or GlobalStreamId
# will use SHUFFLE grouping
for input_obj in self.inputs:
if isinstance(input_obj, HeronComponentSpec):
if input_obj.name is None:
# should not happen as TopologyType metaclass sets name attribute
# before calling this method
raise RuntimeError("In _sanitize_inputs(): HeronComponentSpec doesn't have a name")
global_streamid = GlobalStreamId(input_obj.name, Stream.DEFAULT_STREAM_ID)
ret[global_streamid] = Grouping.SHUFFLE
elif isinstance(input_obj, GlobalStreamId):
ret[input_obj] = Grouping.SHUFFLE
else:
raise ValueError("%s is not supported as an input" % str(input_obj))
else:
raise TypeError("Inputs must be a list, dict, or None, given: %s" % str(self.inputs))
return ret
def _add_out_streams(self, spbl):
"""Adds outputs to a given protobuf Bolt or Spout message"""
if self.outputs is None:
return
# sanitize outputs and get a map <stream_id -> out fields>
output_map = self._sanitize_outputs()
for stream_id, out_fields in list(output_map.items()):
out_stream = spbl.outputs.add()
out_stream.stream.CopyFrom(self._get_stream_id(self.name, stream_id))
out_stream.schema.CopyFrom(self._get_stream_schema(out_fields))
def _sanitize_outputs(self):
"""Sanitizes output fields and returns a map <stream_id -> list of output fields>"""
ret = {}
if self.outputs is None:
return
if not isinstance(self.outputs, (list, tuple)):
raise TypeError("Argument to outputs must be either list or tuple, given: %s"
% str(type(self.outputs)))
for output in self.outputs:
if not isinstance(output, (str, Stream)):
raise TypeError("Outputs must be a list of strings or Streams, given: %s" % str(output))
if isinstance(output, str):
# it's a default stream
if Stream.DEFAULT_STREAM_ID not in ret:
ret[Stream.DEFAULT_STREAM_ID] = list()
ret[Stream.DEFAULT_STREAM_ID].append(output)
else:
# output is a Stream object
if output.stream_id == Stream.DEFAULT_STREAM_ID and Stream.DEFAULT_STREAM_ID in ret:
# some default stream fields are already in there
ret[Stream.DEFAULT_STREAM_ID].extend(output.fields)
else:
ret[output.stream_id] = output.fields
return ret
def get_out_streamids(self):
"""Returns a set of output stream ids registered for this component"""
if self.outputs is None:
return set()
if not isinstance(self.outputs, (list, tuple)):
raise TypeError("Argument to outputs must be either list or tuple, given: %s"
% str(type(self.outputs)))
ret_lst = []
for output in self.outputs:
if not isinstance(output, (str, Stream)):
raise TypeError("Outputs must be a list of strings or Streams, given: %s" % str(output))
ret_lst.append(Stream.DEFAULT_STREAM_ID if isinstance(output, str) else output.stream_id)
return set(ret_lst)
def __getitem__(self, stream_id):
"""Get GlobalStreamId for a given stream_id"""
if stream_id not in self.get_out_streamids():
raise ValueError("A given stream id does not exist on this component: %s" % stream_id)
component_id = self.name or self
return GlobalStreamId(componentId=component_id, streamId=stream_id)
@staticmethod
def _get_stream_id(comp_name, stream_id):
"""Returns a StreamId protobuf message"""
proto_stream_id = topology_pb2.StreamId()
proto_stream_id.id = stream_id
proto_stream_id.component_name = comp_name
return proto_stream_id
@staticmethod
def _get_stream_schema(fields):
"""Returns a StreamSchema protobuf message"""
stream_schema = topology_pb2.StreamSchema()
for field in fields:
key = stream_schema.keys.add()
key.key = field
key.type = topology_pb2.Type.Value("OBJECT")
return stream_schema
class GlobalStreamId(object):
"""Wrapper class to define stream_id and its component name
Constructor method is compatible with StreamParse's GlobalStreamId class, although
the object itself is completely different, as Heron does not use Thrift.
This is mainly used for declaring input fields when defining a topology, and internally
in HeronComponentSpec.
Note that topology writers never have to create an instance of this class by themselves,
as it is created automatically.
"""
def __init__(self, componentId, streamId):
"""
:type componentId: str or HeronComponentSpec
:param componentId: component id from which the tuple is emitted, or HeronComponentSpec object.
:type streamId: str
:param streamId: stream id through which the tuple is transmitted
"""
if not isinstance(componentId, (str, HeronComponentSpec)):
raise TypeError('GlobalStreamId: componentId must be either string or HeronComponentSpec')
if not isinstance(streamId, str):
raise TypeError('GlobalStreamId: streamId must be string type')
self._component_id = componentId
self.stream_id = streamId
@property
def component_id(self):
"""Returns component_id of this GlobalStreamId
Note that if HeronComponentSpec is specified as componentId and its name is not yet
available (i.e. when ``name`` argument was not given in ``spec()`` method in Bolt or Spout),
this property returns a message with uuid. However, this is provided only for safety
with __eq__(), __str__(), and __hash__() methods, and not meant to be called explicitly
before TopologyType class finally sets the name attribute of HeronComponentSpec.
"""
if isinstance(self._component_id, HeronComponentSpec):
if self._component_id.name is None:
# HeronComponentSpec instance's name attribute might not be available until
# TopologyType metaclass finally sets it. This statement is to support __eq__(),
# __hash__() and __str__() methods with safety, as raising Exception is not
# appropriate this case.
return "<No name available for HeronComponentSpec yet, uuid: %s>" % self._component_id.uuid
return self._component_id.name
elif isinstance(self._component_id, str):
return self._component_id
else:
raise ValueError("Component Id for this GlobalStreamId is not properly set: <%s:%s>"
% (str(type(self._component_id)), str(self._component_id)))
def __eq__(self, other):
return hasattr(other, 'component_id') and self.component_id == other.component_id \
and hasattr(other, 'stream_id') and self.stream_id == other.stream_id
def __hash__(self):
return hash(self.__str__())
def __str__(self):
return "%s:%s" % (self.component_id, self.stream_id)