blob: d3adcd4116cf61e0e64bec34b045f76c434985b5 [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from typing import Callable
from unittest import TestCase, main
import time
from pulsar import Client
from pulsar.schema.schema import StringSchema
class TableViewTest(TestCase):
def setUp(self):
self._client: Client = Client('pulsar://localhost:6650')
def tearDown(self):
self._client.close()
def test_get(self):
topic = f'table_view_test_get-{time.time()}'
table_view = self._client.create_table_view(topic)
self.assertEqual(len(table_view), 0)
producer = self._client.create_producer(topic)
producer.send(b'value-0', partition_key='key-0')
producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
self.assertEqual(table_view.get('key-0'), b'value-0')
self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0')
producer.send(b'value-1', partition_key='key-0')
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1'))
producer.close()
table_view.close()
def test_for_each(self):
topic = f'table_view_test_for_each-{time.time()}'
table_view = self._client.create_table_view(topic)
producer = self._client.create_producer(topic)
producer.send(b'value-0', partition_key='key-0')
producer.send(b'value-1', partition_key='key-1')
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
d = dict()
table_view.for_each(lambda key, value: d.__setitem__(key, value))
self.assertEqual(d, {
'key-0': b'value-0',
'key-1': b'value-1'
})
def listener(key: str, value: str):
if len(value) == 0:
d.pop(key)
else:
d[key] = value
d.clear()
table_view.for_each_and_listen(listener)
self.assertEqual(d, {
'key-0': b'value-0',
'key-1': b'value-1'
})
producer.send(b'value-0-new', partition_key='key-0')
producer.send(b'', partition_key='key-1')
producer.send(b'value-2', partition_key='key-2')
def assert_latest_values():
self.assertEqual(d, {
'key-0': b'value-0-new',
'key-2': b'value-2'
})
self._wait_for_assertion(assert_latest_values)
def test_schema(self):
topic = f'table_view_test_schema-{time.time()}'
table_view = self._client.create_table_view(topic, schema=StringSchema())
producer = self._client.create_producer(topic, schema=StringSchema())
producer.send('value', partition_key='key')
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value'))
self.assertEqual(table_view.get('missed-key'), None)
entries = dict()
table_view.for_each(lambda key, value: entries.__setitem__(key, value))
self.assertEqual(entries, {'key': 'value'})
entries.clear()
table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value))
self.assertEqual(entries, {'key': 'value'})
producer.send('new-value', partition_key='key')
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value'))
def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
start_time = time.time()
while time.time() - start_time < timeout:
try:
assertion()
return
except AssertionError:
time.sleep(0.1)
assertion()
if __name__ == "__main__":
main()