| # Copyright 2016 Twitter. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| '''component_spec.py''' |
| import uuid |
| |
| from heronpy.api.serializer import default_serializer |
| from heronpy.api.api_constants import TOPOLOGY_COMPONENT_PARALLELISM |
| from heronpy.proto import topology_pb2 |
| |
| from heronpy.api.stream import Stream, Grouping |
| |
| # pylint: disable=too-many-instance-attributes |
class HeronComponentSpec(object):
    """Class to specify the information and location of components in a topology

    This class is generated by the ``spec()`` method of Spout and Bolt class and
    specifies how this component is located in the topology and how it is connected to
    the other components. This class also retains the Python class path of the component,
    so pex_loader can load the component appropriately.
    """
    def __init__(self, name, python_class_path, is_spout, par,
                 inputs=None, outputs=None, config=None):
        """
        :type name: str or None
        :param name: name of this component; may be None when the spec is created
        :type python_class_path: str
        :param python_class_path: Python class path of the component
        :type is_spout: bool
        :param is_spout: True if this component is a spout, False if it is a bolt
        :type par: int
        :param par: parallelism of this component, must be positive
        :param inputs: None, a dict ``<HeronComponentSpec|GlobalStreamId -> Grouping>``, or a
                       list/tuple of HeronComponentSpec or GlobalStreamId; validated lazily
        :param outputs: None, or a list/tuple of field names (str) and/or Stream objects;
                        validated lazily
        :param config: None, or a dict of component-specific configuration; validated lazily
        :raises AssertionError: if ``name``, ``python_class_path``, ``is_spout`` or ``par``
                                has an invalid type or value
        """
        self._sanitize_args(name, python_class_path, is_spout, par)

        self.name = name
        self.python_class_path = python_class_path
        self.is_spout = is_spout
        self.parallelism = par

        # inputs, outputs, config will be sanitized later
        self.inputs = inputs
        self.outputs = outputs
        self.custom_config = config

        # This is used for identification, especially when name is not specified by argument
        # Note that ``self.name`` might not be available until it is set by TopologyType metaclass
        # so this is necessary for identification purposes. Used mainly by GlobalStreamId.
        self.uuid = str(uuid.uuid4())

    @staticmethod
    def _sanitize_args(name, py_class_path, is_spout, par):
        """Validates the mandatory constructor arguments

        AssertionError is raised explicitly (rather than via ``assert`` statements) so that
        the validation is not stripped when Python runs with the ``-O`` flag, while callers
        still observe the same exception type.

        :raises AssertionError: if any argument has an unexpected type or value
        """
        # name can be None at the time this spec is initialized
        if name is not None and not isinstance(name, str):
            raise AssertionError("name must be a string or None, given: %s" % str(type(name)))
        if not isinstance(py_class_path, str):
            raise AssertionError("python_class_path must be a string, given: %s"
                                 % str(type(py_class_path)))
        if not isinstance(is_spout, bool):
            raise AssertionError("is_spout must be a bool, given: %s" % str(type(is_spout)))
        if not (isinstance(par, int) and par > 0):
            raise AssertionError("par must be a positive int, given: %s" % str(par))

    def get_protobuf(self):
        """Returns protobuf message (Spout or Bolt) of this component"""
        if self.is_spout:
            return self._get_spout()
        return self._get_bolt()

    def _get_spout(self):
        """Returns Spout protobuf message"""
        spout = topology_pb2.Spout()
        spout.comp.CopyFrom(self._get_base_component())

        # Add output streams
        self._add_out_streams(spout)
        return spout

    def _get_bolt(self):
        """Returns Bolt protobuf message"""
        bolt = topology_pb2.Bolt()
        bolt.comp.CopyFrom(self._get_base_component())

        # Add streams
        self._add_in_streams(bolt)
        self._add_out_streams(bolt)
        return bolt

    def _get_base_component(self):
        """Returns Component protobuf message"""
        comp = topology_pb2.Component()
        comp.name = self.name
        comp.spec = topology_pb2.ComponentObjectSpec.Value("PYTHON_CLASS_NAME")
        comp.class_name = self.python_class_path
        comp.config.CopyFrom(self._get_comp_config())
        return comp

    def _get_comp_config(self):
        """Returns component-specific Config protobuf message

        It first adds ``topology.component.parallelism``; the entries of the user-defined
        component-specific configuration, specified by spec(), are added after it.
        String values are stored as ``STRING_VALUE``; any other value is serialized with
        the default serializer and stored as ``PYTHON_SERIALIZED_VALUE``.

        :raises TypeError: if the custom configuration is not a dict or has a non-string key
        """
        proto_config = topology_pb2.Config()

        # first add parallelism
        parallelism_kv = proto_config.kvs.add()
        parallelism_kv.key = TOPOLOGY_COMPONENT_PARALLELISM
        parallelism_kv.value = str(self.parallelism)
        parallelism_kv.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")

        # iterate through self.custom_config
        if self.custom_config is not None:
            sanitized = self._sanitize_config(self.custom_config)
            for conf_key, conf_value in sanitized.items():
                kvs = proto_config.kvs.add()
                kvs.key = conf_key
                if isinstance(conf_value, str):
                    kvs.value = conf_value
                    kvs.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
                else:
                    # need to serialize
                    kvs.serialized_value = default_serializer.serialize(conf_value)
                    kvs.type = topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE")

        return proto_config

    @staticmethod
    def _sanitize_config(custom_config):
        """Checks whether ``custom_config`` is sane and returns a sanitized dict <str -> (str|object)>

        It checks if keys are all strings and sanitizes values of a given dictionary as follows:

        - If string, number or boolean is given as a value, it is converted to string.
          For string and number (int, float), it is converted to string by a built-in ``str()`` method.
          For a boolean value, ``True`` is converted to "true" instead of "True", and ``False`` is
          converted to "false" instead of "False", in order to keep the consistency with
          Java configuration.

        - If neither of the above is given as a value, it is inserted into the sanitized dict as it is.
          These values will need to be serialized before adding to a protobuf message.

        :raises TypeError: if ``custom_config`` is not a dict, or any of its keys is not a string
        """
        if not isinstance(custom_config, dict):
            raise TypeError("Component-specific configuration must be given as a dict type, given: %s"
                            % str(type(custom_config)))
        sanitized = {}
        for key, value in custom_config.items():
            if not isinstance(key, str):
                raise TypeError("Key for component-specific configuration must be string, given: %s:%s"
                                % (str(type(key)), str(key)))

            # bool must be tested before int, because bool is a subclass of int
            if isinstance(value, bool):
                sanitized[key] = "true" if value else "false"
            elif isinstance(value, (str, int, float)):
                sanitized[key] = str(value)
            else:
                sanitized[key] = value

        return sanitized

    def _add_in_streams(self, bolt):
        """Adds inputs to a given protobuf Bolt message"""
        if self.inputs is None:
            return
        # sanitize inputs and get a map <GlobalStreamId -> Grouping>
        input_dict = self._sanitize_inputs()

        for global_streamid, gtype in input_dict.items():
            in_stream = bolt.inputs.add()
            in_stream.stream.CopyFrom(self._get_stream_id(global_streamid.component_id,
                                                          global_streamid.stream_id))
            if isinstance(gtype, Grouping.FIELDS):
                # it's a field grouping
                in_stream.gtype = gtype.gtype
                in_stream.grouping_fields.CopyFrom(self._get_stream_schema(gtype.fields))
            elif isinstance(gtype, Grouping.CUSTOM):
                # it's a custom grouping
                in_stream.gtype = gtype.gtype
                in_stream.custom_grouping_object = gtype.python_serialized
                in_stream.type = topology_pb2.CustomGroupingObjectType.Value("PYTHON_OBJECT")
            else:
                in_stream.gtype = gtype

    @staticmethod
    def _as_global_streamid(source):
        """Returns a GlobalStreamId for ``source``, or None if ``source`` is of an unsupported type

        A HeronComponentSpec is mapped to its default stream; a GlobalStreamId is returned as is.

        :raises RuntimeError: if ``source`` is a HeronComponentSpec whose name is not set
        """
        if isinstance(source, HeronComponentSpec):
            if source.name is None:
                # should not happen as TopologyType metaclass sets name attribute
                # before calling this method
                raise RuntimeError("In _sanitize_inputs(): HeronComponentSpec doesn't have a name")
            # use default streamid
            return GlobalStreamId(source.name, Stream.DEFAULT_STREAM_ID)
        if isinstance(source, GlobalStreamId):
            return source
        return None

    # pylint: disable=too-many-branches
    def _sanitize_inputs(self):
        """Sanitizes input fields and returns a map <GlobalStreamId -> Grouping>

        Returns None when no inputs are set.

        :raises ValueError: if a grouping is not supported, or an input (or dict key) is neither
                            a HeronComponentSpec nor a GlobalStreamId
        :raises RuntimeError: if a HeronComponentSpec given as an input has no name
        :raises TypeError: if inputs is not a dict, list, tuple or None
        """
        if self.inputs is None:
            return None

        ret = {}
        if isinstance(self.inputs, dict):
            # inputs are dictionary, must be either <HeronComponentSpec -> Grouping> or
            # <GlobalStreamId -> Grouping>
            for key, grouping in self.inputs.items():
                if not Grouping.is_grouping_sane(grouping):
                    raise ValueError('A given grouping is not supported')
                global_streamid = self._as_global_streamid(key)
                if global_streamid is None:
                    raise ValueError("%s is not supported as a key to inputs" % str(key))
                ret[global_streamid] = grouping
        elif isinstance(self.inputs, (list, tuple)):
            # inputs are lists, must be either a list of HeronComponentSpec or GlobalStreamId
            # will use SHUFFLE grouping
            for input_obj in self.inputs:
                global_streamid = self._as_global_streamid(input_obj)
                if global_streamid is None:
                    raise ValueError("%s is not supported as an input" % str(input_obj))
                ret[global_streamid] = Grouping.SHUFFLE
        else:
            raise TypeError("Inputs must be a list, dict, or None, given: %s" % str(self.inputs))

        return ret

    def _add_out_streams(self, spbl):
        """Adds outputs to a given protobuf Bolt or Spout message"""
        if self.outputs is None:
            return

        # sanitize outputs and get a map <stream_id -> out fields>
        output_map = self._sanitize_outputs()

        for stream_id, out_fields in output_map.items():
            out_stream = spbl.outputs.add()
            out_stream.stream.CopyFrom(self._get_stream_id(self.name, stream_id))
            out_stream.schema.CopyFrom(self._get_stream_schema(out_fields))

    def _sanitize_outputs(self):
        """Sanitizes output fields and returns a map <stream_id -> list of output fields>

        Plain strings are collected as fields of the default stream. Returns None when no
        outputs are set.

        :raises TypeError: if outputs is not a list/tuple, or contains an element that is
                           neither a string nor a Stream
        """
        if self.outputs is None:
            return None

        if not isinstance(self.outputs, (list, tuple)):
            raise TypeError("Argument to outputs must be either list or tuple, given: %s"
                            % str(type(self.outputs)))

        ret = {}
        for output in self.outputs:
            if not isinstance(output, (str, Stream)):
                raise TypeError("Outputs must be a list of strings or Streams, given: %s" % str(output))

            if isinstance(output, str):
                # it's a default stream
                ret.setdefault(Stream.DEFAULT_STREAM_ID, []).append(output)
            elif output.stream_id == Stream.DEFAULT_STREAM_ID and Stream.DEFAULT_STREAM_ID in ret:
                # some default stream fields are already in there
                ret[Stream.DEFAULT_STREAM_ID].extend(output.fields)
            else:
                # Store a copy: later append()/extend() calls on this entry must not
                # mutate the field list owned by the caller's Stream object.
                ret[output.stream_id] = list(output.fields)
        return ret

    def get_out_streamids(self):
        """Returns a set of output stream ids registered for this component

        :raises TypeError: if outputs is not a list/tuple, or contains an element that is
                           neither a string nor a Stream
        """
        if self.outputs is None:
            return set()

        if not isinstance(self.outputs, (list, tuple)):
            raise TypeError("Argument to outputs must be either list or tuple, given: %s"
                            % str(type(self.outputs)))
        stream_ids = set()
        for output in self.outputs:
            if not isinstance(output, (str, Stream)):
                raise TypeError("Outputs must be a list of strings or Streams, given: %s" % str(output))
            # a plain string is a field of the default stream
            stream_ids.add(Stream.DEFAULT_STREAM_ID if isinstance(output, str) else output.stream_id)
        return stream_ids

    def __getitem__(self, stream_id):
        """Get GlobalStreamId for a given stream_id

        :raises ValueError: if ``stream_id`` is not one of this component's output stream ids
        """
        if stream_id not in self.get_out_streamids():
            raise ValueError("A given stream id does not exist on this component: %s" % stream_id)

        # fall back to this spec itself while the name is not set (None or empty)
        component_id = self.name or self
        return GlobalStreamId(componentId=component_id, streamId=stream_id)

    @staticmethod
    def _get_stream_id(comp_name, stream_id):
        """Returns a StreamId protobuf message"""
        proto_stream_id = topology_pb2.StreamId()
        proto_stream_id.id = stream_id
        proto_stream_id.component_name = comp_name
        return proto_stream_id

    @staticmethod
    def _get_stream_schema(fields):
        """Returns a StreamSchema protobuf message with one ``OBJECT`` typed key per field"""
        stream_schema = topology_pb2.StreamSchema()
        for field in fields:
            key = stream_schema.keys.add()
            key.key = field
            key.type = topology_pb2.Type.Value("OBJECT")

        return stream_schema
| |
class GlobalStreamId(object):
    """Pairs a stream id with the component that emits it

    The constructor signature is compatible with StreamParse's GlobalStreamId class, although
    the object itself is completely different, as Heron does not use Thrift.
    It is mainly used for declaring input fields when defining a topology, and internally
    in HeronComponentSpec.

    Topology writers never have to create an instance of this class by themselves,
    as it is created automatically.
    """
    def __init__(self, componentId, streamId):
        """
        :type componentId: str or HeronComponentSpec
        :param componentId: component id from which the tuple is emitted, or HeronComponentSpec object.
        :type streamId: str
        :param streamId: stream id through which the tuple is transmitted
        :raises TypeError: if either argument is of an unsupported type
        """
        component_ok = isinstance(componentId, (str, HeronComponentSpec))
        if not component_ok:
            raise TypeError('GlobalStreamId: componentId must be either string or HeronComponentSpec')
        stream_ok = isinstance(streamId, str)
        if not stream_ok:
            raise TypeError('GlobalStreamId: streamId must be string type')

        self.stream_id = streamId
        self._component_id = componentId

    @property
    def component_id(self):
        """Returns component_id of this GlobalStreamId

        If a HeronComponentSpec was given as componentId and its name is not yet available
        (i.e. when ``name`` argument was not given in ``spec()`` method in Bolt or Spout),
        a placeholder message containing the spec's uuid is returned instead. That placeholder
        exists only so that __eq__(), __str__(), and __hash__() stay safe to call; it is not
        meant to be read explicitly before TopologyType class finally sets the name attribute
        of HeronComponentSpec.

        :raises ValueError: if the stored component id is neither a HeronComponentSpec nor a string
        """
        owner = self._component_id
        if isinstance(owner, HeronComponentSpec):
            if owner.name is not None:
                return owner.name
            # The name attribute of a HeronComponentSpec might not be available until the
            # TopologyType metaclass finally sets it. Returning a placeholder keeps __eq__(),
            # __hash__() and __str__() safe, as raising an Exception is not appropriate here.
            return "<No name available for HeronComponentSpec yet, uuid: %s>" % owner.uuid
        if isinstance(owner, str):
            return owner
        raise ValueError("Component Id for this GlobalStreamId is not properly set: <%s:%s>"
                         % (str(type(owner)), str(owner)))

    def __eq__(self, other):
        """Two ids are equal when both their component ids and their stream ids match"""
        if not hasattr(other, 'component_id'):
            return False
        if not self.component_id == other.component_id:
            return False
        if not hasattr(other, 'stream_id'):
            return False
        return self.stream_id == other.stream_id

    def __hash__(self):
        """Hashes the ``component_id:stream_id`` string form"""
        return hash(str(self))

    def __str__(self):
        """Returns ``component_id:stream_id``"""
        comp_part = self.component_id
        stream_part = self.stream_id
        return "%s:%s" % (comp_part, stream_part)