blob: ba73f40262987091d4709d54f5ac97cf47417fb2 [file] [log] [blame]
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=missing-docstring
import unittest
import heron.common.tests.python.utils.mock_generator as mock_generator
from heron.proto import tuple_pb2
class OutgoingTupleHelperTest(unittest.TestCase):
DEFAULT_STREAM_ID = "stream_id"
def setUp(self):
pass
def test_sample_success(self):
out_helper = mock_generator.MockOutgoingTupleHelper()
prim_data_tuple, size = mock_generator.make_data_tuple_from_list(mock_generator.prim_list)
# Check adding data tuples and try sending out
out_helper.add_data_tuple(self.DEFAULT_STREAM_ID, prim_data_tuple, size)
# check if init_new_data_tuple() was properly called
self.assertTrue(out_helper.called_init_new_data)
self.assertEqual(out_helper.current_data_tuple_set.stream.id, self.DEFAULT_STREAM_ID)
# check if it was properly added
self.assertEqual(out_helper.current_data_tuple_size_in_bytes, size)
# try sending out
out_helper.send_out_tuples()
self.assertEqual(out_helper.current_data_tuple_size_in_bytes, 0)
self.assertEqual(out_helper.total_data_emitted_in_bytes, size)
self.assertIsNone(out_helper.current_data_tuple_set)
sent_data_tuple_set = out_helper.out_stream.poll().data
self.assertEqual(sent_data_tuple_set.stream.id, self.DEFAULT_STREAM_ID)
dtuple = tuple_pb2.HeronDataTuple()
dtuple.ParseFromString(sent_data_tuple_set.tuples[0])
self.assertEqual(dtuple, prim_data_tuple)