Support TableView (#251)
diff --git a/dependencies.yaml b/dependencies.yaml
index 9d9136e..9db4747 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -17,7 +17,7 @@
# under the License.
#
-pulsar-cpp: 3.7.0
+pulsar-cpp: 3.7.1
pybind11: 2.10.1
# The OpenSSL dependency is only used when building Python from source
openssl: 1.1.1q
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index f5b2b35..df7cad4 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -54,6 +54,7 @@
from pulsar.__about__ import __version__
from pulsar.exceptions import *
+from pulsar.tableview import TableView
from pulsar.functions.function import Function
from pulsar.functions.context import Context
@@ -1199,6 +1200,42 @@
self._consumers.append(c)
return c
+ def create_table_view(self, topic: str,
+ subscription_name: Optional[str] = None,
+ schema: schema.Schema = schema.BytesSchema()) -> TableView:
+ """
+ Create a table view on a particular topic
+
+ Parameters
+ ----------
+
+ topic: str
+ The name of the topic.
+ subscription_name: str, optional
+ The name of the subscription. If it's not specified, a random subscription name
+ will be used.
+ schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
+ Define the schema of this table view. If the schema is incompatible with the topic's
+ schema, this method will throw an exception. This schema is also used to deserialize
+ the value of messages in the table view.
+
+ Returns
+ -------
+ TableView
+ A table view instance.
+ """
+ _check_type(str, topic, 'topic')
+ _check_type_or_none(str, subscription_name, 'subscription_name')
+ _check_type(_schema.Schema, schema, 'schema')
+
+ tv_conf = _pulsar.TableViewConfiguration()
+ if subscription_name is not None:
+ tv_conf.subscription_name(subscription_name)
+ tv_conf.schema(schema.schema_info())
+ tv = self._client.create_table_view(topic, tv_conf)
+ self._table_view = TableView(tv, topic, subscription_name, schema)
+ return self._table_view
+
def get_topic_partitions(self, topic):
"""
Get the list of partitions for a given topic.
diff --git a/pulsar/tableview.py b/pulsar/tableview.py
new file mode 100644
index 0000000..702bb5b
--- /dev/null
+++ b/pulsar/tableview.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The TableView implementation.
+"""
+
+from typing import Any, Callable, Optional
+from pulsar.schema.schema import Schema
+import _pulsar
+
+class TableView():
+
+ def __init__(self, table_view: _pulsar.TableView, topic: str,
+ subscription: Optional[str], schema: Schema) -> None:
+ self._table_view = table_view
+ self._topic = topic
+ self._subscription = subscription
+ self._schema = schema
+
+ def get(self, key: str) -> Optional[Any]:
+ """
+ Return the value associated with the given key in the table view.
+
+ Parameters
+ ----------
+ key: str
+ The message key
+
+ Returns
+ -------
+ Optional[Any]
+ The value associated with the key, or None if the key does not exist.
+ """
+ pair = self._table_view.get(key)
+ if pair[0]:
+ return self._schema.decode(pair[1])
+ else:
+ return None
+
+ def for_each(self, callback: Callable[[str, Any], None]) -> None:
+ """
+ Iterate over all entries in the table view and call the callback function
+ with the key and value for each entry.
+
+ Parameters
+ ----------
+ callback: Callable[[str, Any], None]
+ The callback function to call for each entry.
+ """
+ self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v)))
+
+ def for_each_and_listen(self, callback: Callable[[str, Any], None]) -> None:
+ """
+ Iterate over all entries in the table view and call the callback function
+ with the key and value for each entry, then listen for changes. The callback
+ will be called when a new entry is added or an existing entry is updated.
+
+ Parameters
+ ----------
+ callback: Callable[[str, Any], None]
+ The callback function to call for each entry.
+ """
+ self._table_view.for_each_and_listen(lambda k, v: callback(k, self._schema.decode(v)))
+
+ def close(self) -> None:
+ """
+ Close the table view.
+ """
+ self._table_view.close()
+
+ def __len__(self) -> int:
+ """
+ Return the number of entries in the table view.
+ """
+ return self._table_view.size()
+
+ def __str__(self) -> str:
+ if self._subscription is None:
+ return f"TableView(topic={self._topic})"
+ else:
+ return f"TableView(topic={self._topic}, subscription={self._subscription})"
+
+ def __repr__(self) -> str:
+ return self.__str__()
diff --git a/src/client.cc b/src/client.cc
index b25c63a..72c824f 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -89,6 +89,12 @@
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("create_reader", &Client_createReader)
+ .def("create_table_view", [](Client& client, const std::string& topic,
+ const TableViewConfiguration& config) {
+ return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
+ client.createTableViewAsync(topic, config, callback);
+ });
+ })
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
diff --git a/src/pulsar.cc b/src/pulsar.cc
index 9bfeb59..6c42f8c 100644
--- a/src/pulsar.cc
+++ b/src/pulsar.cc
@@ -32,6 +32,7 @@
void export_authentication(Module& m);
void export_schema(Module& m);
void export_exceptions(Module& m);
+void export_table_view(Module& m);
PYBIND11_MODULE(_pulsar, m) {
export_exceptions(m);
@@ -44,4 +45,5 @@
export_enums(m);
export_authentication(m);
export_schema(m);
+ export_table_view(m);
}
diff --git a/src/table_view.cc b/src/table_view.cc
new file mode 100644
index 0000000..6252937
--- /dev/null
+++ b/src/table_view.cc
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pybind11/pybind11.h>
+#include <pulsar/TableView.h>
+#include <pulsar/Schema.h>
+#include <pulsar/TableViewConfiguration.h>
+#include <pybind11/stl.h>
+#include <pybind11/functional.h>
+#include <functional>
+#include <utility>
+#include "utils.h"
+
+namespace py = pybind11;
+using namespace pulsar;
+
+void export_table_view(py::module_& m) {
+ py::class_<TableViewConfiguration>(m, "TableViewConfiguration")
+ .def(py::init<>())
+ .def("subscription_name",
+ [](TableViewConfiguration& config, const std::string& name) { config.subscriptionName = name; })
+ .def("schema",
+ [](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; });
+
+ py::class_<TableView>(m, "TableView")
+ .def(py::init<>())
+ .def("get",
+ [](const TableView& view, const std::string& key) -> std::pair<bool, py::bytes> {
+ py::gil_scoped_release release;
+ std::string value;
+ bool available = view.getValue(key, value);
+ py::gil_scoped_acquire acquire;
+ if (available) {
+ return std::make_pair(true, py::bytes(std::move(value)));
+ } else {
+ return std::make_pair(false, py::bytes());
+ }
+ })
+ .def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
+ .def("for_each",
+ [](TableView& view, std::function<void(std::string, py::bytes)> callback) {
+ py::gil_scoped_release release;
+ view.forEach([callback](const std::string& key, const std::string& value) {
+ py::gil_scoped_acquire acquire;
+ callback(key, py::bytes(value));
+ });
+ })
+ .def("for_each_and_listen",
+ [](TableView& view, std::function<void(std::string, py::bytes)> callback) {
+ py::gil_scoped_release release;
+ view.forEachAndListen([callback](const std::string& key, const std::string& value) {
+ py::gil_scoped_acquire acquire;
+ callback(key, py::bytes(value));
+ });
+ })
+ .def("close", [](TableView& view) {
+ waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
+ });
+}
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 0d6fabf..8d7600d 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -28,5 +28,6 @@
python3 interrupted_test.py
python3 pulsar_test.py
python3 schema_test.py
+python3 table_view_test.py
python3 reader_test.py
python3 asyncio_test.py
diff --git a/tests/table_view_test.py b/tests/table_view_test.py
new file mode 100644
index 0000000..d3adcd4
--- /dev/null
+++ b/tests/table_view_test.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from typing import Callable
+from unittest import TestCase, main
+import time
+
+from pulsar import Client
+from pulsar.schema.schema import StringSchema
+
+class TableViewTest(TestCase):
+
+ def setUp(self):
+ self._client: Client = Client('pulsar://localhost:6650')
+
+ def tearDown(self):
+ self._client.close()
+
+ def test_get(self):
+ topic = f'table_view_test_get-{time.time()}'
+ table_view = self._client.create_table_view(topic)
+ self.assertEqual(len(table_view), 0)
+
+ producer = self._client.create_producer(topic)
+ producer.send(b'value-0', partition_key='key-0')
+ producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes
+
+ self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
+ self.assertEqual(table_view.get('key-0'), b'value-0')
+ self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0')
+
+ producer.send(b'value-1', partition_key='key-0')
+ self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1'))
+
+ producer.close()
+ table_view.close()
+
+ def test_for_each(self):
+ topic = f'table_view_test_for_each-{time.time()}'
+ table_view = self._client.create_table_view(topic)
+ producer = self._client.create_producer(topic)
+ producer.send(b'value-0', partition_key='key-0')
+ producer.send(b'value-1', partition_key='key-1')
+ self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
+
+ d = dict()
+ table_view.for_each(lambda key, value: d.__setitem__(key, value))
+ self.assertEqual(d, {
+ 'key-0': b'value-0',
+ 'key-1': b'value-1'
+ })
+
+ def listener(key: str, value: str):
+ if len(value) == 0:
+ d.pop(key)
+ else:
+ d[key] = value
+
+ d.clear()
+ table_view.for_each_and_listen(listener)
+ self.assertEqual(d, {
+ 'key-0': b'value-0',
+ 'key-1': b'value-1'
+ })
+
+ producer.send(b'value-0-new', partition_key='key-0')
+ producer.send(b'', partition_key='key-1')
+ producer.send(b'value-2', partition_key='key-2')
+ def assert_latest_values():
+ self.assertEqual(d, {
+ 'key-0': b'value-0-new',
+ 'key-2': b'value-2'
+ })
+ self._wait_for_assertion(assert_latest_values)
+
+ def test_schema(self):
+ topic = f'table_view_test_schema-{time.time()}'
+ table_view = self._client.create_table_view(topic, schema=StringSchema())
+ producer = self._client.create_producer(topic, schema=StringSchema())
+ producer.send('value', partition_key='key')
+
+ self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value'))
+ self.assertEqual(table_view.get('missed-key'), None)
+
+ entries = dict()
+ table_view.for_each(lambda key, value: entries.__setitem__(key, value))
+ self.assertEqual(entries, {'key': 'value'})
+
+ entries.clear()
+ table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value))
+ self.assertEqual(entries, {'key': 'value'})
+
+ producer.send('new-value', partition_key='key')
+ self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value'))
+
+ def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ try:
+ assertion()
+ return
+ except AssertionError:
+ time.sleep(0.1)
+ assertion()
+
+if __name__ == "__main__":
+ main()