| # Copyright 2016 Twitter. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # pylint: disable=missing-docstring |
| import unittest |
| |
| import heron.common.tests.python.utils.mock_generator as mock_generator |
| |
| from heron.proto import tuple_pb2 |
| |
| class OutgoingTupleHelperTest(unittest.TestCase): |
| DEFAULT_STREAM_ID = "stream_id" |
| def setUp(self): |
| pass |
| |
| def test_sample_success(self): |
| out_helper = mock_generator.MockOutgoingTupleHelper() |
| |
| prim_data_tuple, size = mock_generator.make_data_tuple_from_list(mock_generator.prim_list) |
| |
| # Check adding data tuples and try sending out |
| |
| out_helper.add_data_tuple(self.DEFAULT_STREAM_ID, prim_data_tuple, size) |
| # check if init_new_data_tuple() was properly called |
| self.assertTrue(out_helper.called_init_new_data) |
| self.assertEqual(out_helper.current_data_tuple_set.stream.id, self.DEFAULT_STREAM_ID) |
| # check if it was properly added |
| self.assertEqual(out_helper.current_data_tuple_size_in_bytes, size) |
| # try sending out |
| out_helper.send_out_tuples() |
| self.assertEqual(out_helper.current_data_tuple_size_in_bytes, 0) |
| self.assertEqual(out_helper.total_data_emitted_in_bytes, size) |
| self.assertIsNone(out_helper.current_data_tuple_set) |
| |
| sent_data_tuple_set = out_helper.out_stream.poll().data |
| self.assertEqual(sent_data_tuple_set.stream.id, self.DEFAULT_STREAM_ID) |
| dtuple = tuple_pb2.HeronDataTuple() |
| dtuple.ParseFromString(sent_data_tuple_set.tuples[0]) |
| self.assertEqual(dtuple, prim_data_tuple) |