#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=missing-docstring
# pylint: disable=protected-access
import unittest
from heronpy.api.component.component_spec import HeronComponentSpec, GlobalStreamId
from heronpy.api.stream import Grouping, Stream
class ComponentSpecTest(unittest.TestCase):
    """Unit tests for HeronComponentSpec: constructor checks and the sanitize helpers."""

    def test_sanitize_args(self):
        """Well-typed constructor arguments are accepted; badly typed ones assert."""
        # good args
        spec = HeronComponentSpec(
            name="string", python_class_path="string.path",
            is_spout=True,
            par=1
        )
        self.assertIsNotNone(spec)

        # a missing name (None) is accepted at construction time
        name_none_spec = HeronComponentSpec(name=None, python_class_path="string.path",
                                            is_spout=True, par=1)
        self.assertIsNotNone(name_none_spec)

        # bad name
        with self.assertRaises(AssertionError):
            HeronComponentSpec(123, "classpath", True, 1)
        with self.assertRaises(AssertionError):
            HeronComponentSpec(False, "classpath", True, 1)

        # bad classpath
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", {}, True, 1)
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", None, True, 1)

        # bad is_spout
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", "classpath", 1, 1)
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", "classpath", None, 1)

        # bad par
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", "classpath", True, "1")
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", "classpath", True, 1.35)
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", "classpath", True, -21)
        with self.assertRaises(AssertionError):
            HeronComponentSpec("name", "classpath", True, None)

    def test_sanitize_config(self):
        """_sanitize_config() type-checks its input and stringifies bool/int/float values."""
        # empty dict
        ret = HeronComponentSpec._sanitize_config({})
        self.assertEqual(ret, {})

        # non-dict given
        with self.assertRaises(TypeError):
            HeronComponentSpec._sanitize_config("{key: value}")
        with self.assertRaises(TypeError):
            HeronComponentSpec._sanitize_config(True)
        with self.assertRaises(TypeError):
            HeronComponentSpec._sanitize_config(None)

        # non-string key
        # NOTE: the key has to be hashable. A list key would make the dict literal
        # itself raise TypeError before _sanitize_config() is ever called, so the
        # assertion would pass without exercising the code under test.
        with self.assertRaises(TypeError):
            HeronComponentSpec._sanitize_config({('k', 'e', 'y'): "value"})
        with self.assertRaises(TypeError):
            HeronComponentSpec._sanitize_config({None: "value"})

        # convert boolean value
        ret = HeronComponentSpec._sanitize_config({"key": True})
        self.assertEqual(ret["key"], "true")
        ret = HeronComponentSpec._sanitize_config({"key": False})
        self.assertEqual(ret["key"], "false")

        # convert int and float
        ret = HeronComponentSpec._sanitize_config({"key": 10})
        self.assertEqual(ret["key"], "10")
        ret = HeronComponentSpec._sanitize_config({"key": -2400000})
        self.assertEqual(ret["key"], "-2400000")
        ret = HeronComponentSpec._sanitize_config({"key": 0.0000001})
        self.assertEqual(ret["key"], "1e-07")
        ret = HeronComponentSpec._sanitize_config({"key": -15.33333})
        self.assertEqual(ret["key"], "-15.33333")

        # non-string value -> should expect the same object
        ret = HeronComponentSpec._sanitize_config({"key": ['v', 'a', 'l', 'u', 'e']})
        self.assertEqual(ret["key"], ['v', 'a', 'l', 'u', 'e'])
        ret = HeronComponentSpec._sanitize_config({"key": None})
        self.assertIsNone(ret["key"])

    def test_sanitize_inputs(self):
        """_sanitize_inputs() maps dict/list inputs to a <GlobalStreamId -> Grouping> dict."""
        # Note that _sanitize_inputs() should only be called after HeronComponentSpec's
        # name attribute is set

        # invalid inputs given as argument (valid ones are either dict, list, tuple or None)
        invalid_spec = HeronComponentSpec("name", "classpath", True, 1, inputs="string")
        with self.assertRaises(TypeError):
            invalid_spec._sanitize_inputs()

        invalid_spec = HeronComponentSpec("name", "classpath", True, 1, inputs=100)
        with self.assertRaises(TypeError):
            invalid_spec._sanitize_inputs()

        # dict <HeronComponentSpec -> Grouping>
        from_spec = HeronComponentSpec("spout", "sp_clspath", True, 1)
        to_spec = HeronComponentSpec("bolt", "bl_clspath", False, 1,
                                     inputs={from_spec: Grouping.SHUFFLE})
        ret = to_spec._sanitize_inputs()
        self.assertEqual(ret, {GlobalStreamId("spout", "default"): Grouping.SHUFFLE})

        from_spec = HeronComponentSpec("spout", "sp_clspath", True, 1)
        from_spec.outputs = [Stream(name='another_stream')]
        to_spec = HeronComponentSpec("bolt", "bl_clspath", False, 1,
                                     inputs={from_spec['another_stream']: Grouping.ALL})
        ret = to_spec._sanitize_inputs()
        self.assertEqual(ret, {GlobalStreamId("spout", "another_stream"): Grouping.ALL})

        # HeronComponentSpec's name attribute not set
        from_spec = HeronComponentSpec(None, "sp_clspath", True, 1)
        to_spec = HeronComponentSpec("bolt", "bl_clspath", False, 1,
                                     inputs={from_spec: Grouping.ALL})
        with self.assertRaises(RuntimeError):
            to_spec._sanitize_inputs()

        # dict <GlobalStreamId -> Grouping>
        inputs_dict = {
            GlobalStreamId("some_spout", "some_stream"): Grouping.NONE,
            GlobalStreamId("another_spout", "default"): Grouping.fields(['word', 'count']),
        }
        spec = HeronComponentSpec("bolt", "classpath", False, 1, inputs=inputs_dict)
        ret = spec._sanitize_inputs()
        self.assertEqual(ret, inputs_dict)

        # list of HeronComponentSpec
        from_spec1 = HeronComponentSpec("spout1", "sp1_cls", True, 1)
        from_spec2 = HeronComponentSpec("spout2", "sp2_cls", True, 1)
        to_spec = HeronComponentSpec("bolt", "bl_cls", False, 1, inputs=[from_spec1, from_spec2])
        ret = to_spec._sanitize_inputs()
        self.assertEqual(ret, {GlobalStreamId("spout1", "default"): Grouping.SHUFFLE,
                               GlobalStreamId("spout2", "default"): Grouping.SHUFFLE})

        # HeronComponentSpec's name attribute not set
        from_spec = HeronComponentSpec(None, "sp_clspath", True, 1)
        to_spec = HeronComponentSpec("bolt", "bl_clspath", False, 1, inputs=[from_spec])
        with self.assertRaises(RuntimeError):
            to_spec._sanitize_inputs()

        # list of GlobalStreamId: every listed stream is expected to map to SHUFFLE
        inputs_list = [GlobalStreamId("spout1", "default"), GlobalStreamId("spout2", "some_stream")]
        spec = HeronComponentSpec("bolt", "bl_cls", False, 1, inputs=inputs_list)
        ret = spec._sanitize_inputs()
        self.assertEqual(ret, dict.fromkeys(inputs_list, Grouping.SHUFFLE))

        # list of neither GlobalStreamId nor HeronComponentSpec
        inputs_list = [None, 123, "string", [GlobalStreamId("sp", "default")]]
        spec = HeronComponentSpec("bolt", "bl_cls", False, 1, inputs=inputs_list)
        with self.assertRaises(ValueError):
            spec._sanitize_inputs()

    # pylint: disable=redefined-variable-type
    # pylint: disable=pointless-statement
    def test_sanitize_outputs(self):
        """_sanitize_outputs() groups output fields by stream id, or returns None."""
        # outputs is None (no argument to outputs)
        spec = HeronComponentSpec("spout", "class", True, 1)
        ret = spec._sanitize_outputs()
        self.assertIsNone(ret)

        # outputs neither list nor tuple
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = "string"
        with self.assertRaises(TypeError):
            spec._sanitize_outputs()

        # output list contains a non-string and non-Stream object
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = ["string", False, 123]
        with self.assertRaises(TypeError):
            spec._sanitize_outputs()

        # output list is all string
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = ["string", "hello", "heron"]
        ret = spec._sanitize_outputs()
        self.assertEqual(ret, {"default": ["string", "hello", "heron"]})

        # output list has mixed stream
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = ["string", "hello", Stream(fields=["abc", "def"], name="another_stream"),
                        Stream(fields=["another", "default"], name="default")]
        ret = spec._sanitize_outputs()
        self.assertEqual(ret, {"default": ["string", "hello", "another", "default"],
                               "another_stream": ["abc", "def"]})

    def test_get_out_streamids(self):
        """get_out_streamids() returns the set of declared output stream ids."""
        # outputs is none
        spec = HeronComponentSpec("spout", "class", True, 1)
        ret = spec.get_out_streamids()
        self.assertEqual(ret, set())

        # outputs neither list nor tuple
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = "string"
        with self.assertRaises(TypeError):
            spec.get_out_streamids()

        # outputs sane
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = ["string", "hello", Stream(fields=["abc", "def"], name="another_stream"),
                        Stream(fields=["another", "default"], name="default")]
        ret = spec.get_out_streamids()
        self.assertEqual(ret, {"default", "another_stream"})

    def test_get_item(self):
        """spec[stream_id] yields a GlobalStreamId; an undeclared stream id raises ValueError."""
        # HeronComponentSpec name set
        spec = HeronComponentSpec("spout", "class", True, 1)
        spec.outputs = ["string", "hello", Stream(fields=["abc", "def"], name="another_stream"),
                        Stream(fields=["another", "default"], name="default")]
        ret = spec['another_stream']
        self.assertEqual(ret, GlobalStreamId("spout", "another_stream"))

        # HeronComponentSpec name not set
        spec = HeronComponentSpec(None, "class", True, 1)
        spec.outputs = ["string", "hello", Stream(fields=["abc", "def"], name="another_stream"),
                        Stream(fields=["another", "default"], name="default")]
        ret = spec['default']
        self.assertEqual(ret, GlobalStreamId(spec, "default"))

        # stream id not registered
        spec = HeronComponentSpec(None, "class", True, 1)
        spec.outputs = ["string", "hello", Stream(fields=["abc", "def"], name="another_stream"),
                        Stream(fields=["another", "default"], name="default")]
        with self.assertRaises(ValueError):
            spec['non_existent_stream']
class GlobalStreamIdTest(unittest.TestCase):
    """Unit tests for GlobalStreamId construction and its component_id property."""

    def test_constructor(self):
        """A TypeError is expected for a wrongly typed component id or stream id."""
        # component id not string nor HeronComponentSpec
        self.assertRaises(TypeError, GlobalStreamId, componentId=123, streamId="default")

        # stream id not string
        self.assertRaises(TypeError, GlobalStreamId, componentId="component", streamId=12345)

    def test_component_id_property(self):
        """component_id is checked for a plain string, a named spec and an unnamed spec."""
        # component id is string
        from_string = GlobalStreamId(componentId="component", streamId="stream")
        self.assertEqual(from_string.component_id, "component")

        # component id is HeronComponentSpec with name
        named_spec = HeronComponentSpec("spout", "class", True, 1)
        from_named_spec = GlobalStreamId(named_spec, "stream")
        self.assertEqual(from_named_spec.component_id, "spout")

        # component id is HeronComponentSpec without name
        unnamed_spec = HeronComponentSpec(None, "class", True, 1)
        from_unnamed_spec = GlobalStreamId(unnamed_spec, "stream")
        # expecting "<No name available for HeronComponentSpec yet, uuid: %s>"
        self.assertIn(unnamed_spec.uuid, from_unnamed_spec.component_id)