Support TableView (#251)

diff --git a/dependencies.yaml b/dependencies.yaml
index 9d9136e..9db4747 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-pulsar-cpp: 3.7.0
+pulsar-cpp: 3.7.1
 pybind11: 2.10.1
 # The OpenSSL dependency is only used when building Python from source
 openssl: 1.1.1q
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index f5b2b35..df7cad4 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -54,6 +54,7 @@
 from pulsar.__about__ import __version__
 
 from pulsar.exceptions import *
+from pulsar.tableview import TableView
 
 from pulsar.functions.function import Function
 from pulsar.functions.context import Context
@@ -1199,6 +1200,42 @@
         self._consumers.append(c)
         return c
 
+    def create_table_view(self, topic: str,
+                          subscription_name: Optional[str] = None,
+                          schema: schema.Schema = schema.BytesSchema()) -> TableView:
+        """
+        Create a table view on a particular topic
+
+        Parameters
+        ----------
+
+        topic: str
+            The name of the topic.
+        subscription_name: str, optional
+            The name of the subscription. If it's not specified, a random subscription name
+            will be used.
+        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
+            Define the schema of this table view. If the schema is incompatible with the topic's
+            schema, this method will throw an exception. This schema is also used to deserialize
+            the value of messages in the table view.
+
+        Returns
+        -------
+        TableView
+            A table view instance.
+        """
+        _check_type(str, topic, 'topic')
+        _check_type_or_none(str, subscription_name, 'subscription_name')
+        _check_type(_schema.Schema, schema, 'schema')
+
+        tv_conf = _pulsar.TableViewConfiguration()
+        if subscription_name is not None:
+            tv_conf.subscription_name(subscription_name)
+        tv_conf.schema(schema.schema_info())
+        tv = self._client.create_table_view(topic, tv_conf)
+        self._table_view = TableView(tv, topic, subscription_name, schema)
+        return self._table_view
+
     def get_topic_partitions(self, topic):
         """
         Get the list of partitions for a given topic.
diff --git a/pulsar/tableview.py b/pulsar/tableview.py
new file mode 100644
index 0000000..702bb5b
--- /dev/null
+++ b/pulsar/tableview.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The TableView implementation.
+"""
+
+from typing import Any, Callable, Optional
+from pulsar.schema.schema import Schema
+import _pulsar
+
+class TableView():
+
+    def __init__(self, table_view: _pulsar.TableView, topic: str,
+                 subscription: Optional[str], schema: Schema) -> None:
+        self._table_view = table_view
+        self._topic = topic
+        self._subscription = subscription
+        self._schema = schema
+
+    def get(self, key: str) -> Optional[Any]:
+        """
+        Return the value associated with the given key in the table view.
+
+        Parameters
+        ----------
+        key: str
+            The message key
+
+        Returns
+        -------
+        Optional[Any]
+            The value associated with the key, or None if the key does not exist.
+        """
+        pair = self._table_view.get(key)
+        if pair[0]:
+            return self._schema.decode(pair[1])
+        else:
+            return None
+
+    def for_each(self, callback: Callable[[str, Any], None]) -> None:
+        """
+        Iterate over all entries in the table view and call the callback function
+        with the key and value for each entry.
+
+        Parameters
+        ----------
+        callback: Callable[[str, Any], None]
+            The callback function to call for each entry.
+        """
+        self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v)))
+
+    def for_each_and_listen(self, callback: Callable[[str, Any], None]) -> None:
+        """
+        Iterate over all entries in the table view and call the callback function
+        with the key and value for each entry, then listen for changes. The callback
+        will be called when a new entry is added or an existing entry is updated.
+
+        Parameters
+        ----------
+        callback: Callable[[str, Any], None]
+            The callback function to call for each entry.
+        """
+        self._table_view.for_each_and_listen(lambda k, v: callback(k, self._schema.decode(v)))
+
+    def close(self) -> None:
+        """
+        Close the table view.
+        """
+        self._table_view.close()
+
+    def __len__(self) -> int:
+        """
+        Return the number of entries in the table view.
+        """
+        return self._table_view.size()
+
+    def __str__(self) -> str:
+        if self._subscription is None:
+            return f"TableView(topic={self._topic})"
+        else:
+            return f"TableView(topic={self._topic}, subscription={self._subscription})"
+
+    def __repr__(self) -> str:
+        return self.__str__()
diff --git a/src/client.cc b/src/client.cc
index b25c63a..72c824f 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -89,6 +89,12 @@
         .def("subscribe_topics", &Client_subscribe_topics)
         .def("subscribe_pattern", &Client_subscribe_pattern)
         .def("create_reader", &Client_createReader)
+        .def("create_table_view", [](Client& client, const std::string& topic,
+                                     const TableViewConfiguration& config) {
+            return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
+                client.createTableViewAsync(topic, config, callback);
+            });
+        })
         .def("get_topic_partitions", &Client_getTopicPartitions)
         .def("get_schema_info", &Client_getSchemaInfo)
         .def("close", &Client_close)
diff --git a/src/pulsar.cc b/src/pulsar.cc
index 9bfeb59..6c42f8c 100644
--- a/src/pulsar.cc
+++ b/src/pulsar.cc
@@ -32,6 +32,7 @@
 void export_authentication(Module& m);
 void export_schema(Module& m);
 void export_exceptions(Module& m);
+void export_table_view(Module& m);
 
 PYBIND11_MODULE(_pulsar, m) {
     export_exceptions(m);
@@ -44,4 +45,5 @@
     export_enums(m);
     export_authentication(m);
     export_schema(m);
+    export_table_view(m);
 }
diff --git a/src/table_view.cc b/src/table_view.cc
new file mode 100644
index 0000000..6252937
--- /dev/null
+++ b/src/table_view.cc
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pybind11/pybind11.h>
+#include <pulsar/TableView.h>
+#include <pulsar/Schema.h>
+#include <pulsar/TableViewConfiguration.h>
+#include <pybind11/stl.h>
+#include <pybind11/functional.h>
+#include <functional>
+#include <utility>
+#include "utils.h"
+
+namespace py = pybind11;
+using namespace pulsar;
+
+void export_table_view(py::module_& m) {
+    py::class_<TableViewConfiguration>(m, "TableViewConfiguration")
+        .def(py::init<>())
+        .def("subscription_name",
+             [](TableViewConfiguration& config, const std::string& name) { config.subscriptionName = name; })
+        .def("schema",
+             [](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; });
+
+    py::class_<TableView>(m, "TableView")
+        .def(py::init<>())
+        .def("get",
+             [](const TableView& view, const std::string& key) -> std::pair<bool, py::bytes> {
+                 py::gil_scoped_release release;
+                 std::string value;
+                 bool available = view.getValue(key, value);
+                 py::gil_scoped_acquire acquire;
+                 if (available) {
+                     return std::make_pair(true, py::bytes(std::move(value)));
+                 } else {
+                     return std::make_pair(false, py::bytes());
+                 }
+             })
+        .def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
+        .def("for_each",
+             [](TableView& view, std::function<void(std::string, py::bytes)> callback) {
+                 py::gil_scoped_release release;
+                 view.forEach([callback](const std::string& key, const std::string& value) {
+                     py::gil_scoped_acquire acquire;
+                     callback(key, py::bytes(value));
+                 });
+             })
+        .def("for_each_and_listen",
+             [](TableView& view, std::function<void(std::string, py::bytes)> callback) {
+                 py::gil_scoped_release release;
+                 view.forEachAndListen([callback](const std::string& key, const std::string& value) {
+                     py::gil_scoped_acquire acquire;
+                     callback(key, py::bytes(value));
+                 });
+             })
+        .def("close", [](TableView& view) {
+            waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
+        });
+}
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 0d6fabf..8d7600d 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -28,5 +28,6 @@
 python3 interrupted_test.py
 python3 pulsar_test.py
 python3 schema_test.py
+python3 table_view_test.py
 python3 reader_test.py
 python3 asyncio_test.py
diff --git a/tests/table_view_test.py b/tests/table_view_test.py
new file mode 100644
index 0000000..d3adcd4
--- /dev/null
+++ b/tests/table_view_test.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from typing import Callable
+from unittest import TestCase, main
+import time
+
+from pulsar import Client
+from pulsar.schema.schema import StringSchema
+
+class TableViewTest(TestCase):
+
+    def setUp(self):
+        self._client: Client = Client('pulsar://localhost:6650')
+
+    def tearDown(self):
+        self._client.close()
+
+    def test_get(self):
+        topic = f'table_view_test_get-{time.time()}'
+        table_view = self._client.create_table_view(topic)
+        self.assertEqual(len(table_view), 0)
+
+        producer = self._client.create_producer(topic)
+        producer.send(b'value-0', partition_key='key-0')
+        producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes
+
+        self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
+        self.assertEqual(table_view.get('key-0'), b'value-0')
+        self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0')
+
+        producer.send(b'value-1', partition_key='key-0')
+        self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1'))
+
+        producer.close()
+        table_view.close()
+        
+    def test_for_each(self):
+        topic = f'table_view_test_for_each-{time.time()}'
+        table_view = self._client.create_table_view(topic)
+        producer = self._client.create_producer(topic)
+        producer.send(b'value-0', partition_key='key-0')
+        producer.send(b'value-1', partition_key='key-1')
+        self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
+
+        d = dict()
+        table_view.for_each(lambda key, value: d.__setitem__(key, value))
+        self.assertEqual(d, {
+            'key-0': b'value-0',
+            'key-1': b'value-1'
+        })
+
+        def listener(key: str, value: str):
+            if len(value) == 0:
+                d.pop(key)
+            else:
+                d[key] = value
+
+        d.clear()
+        table_view.for_each_and_listen(listener)
+        self.assertEqual(d, {
+            'key-0': b'value-0',
+            'key-1': b'value-1'
+        })
+
+        producer.send(b'value-0-new', partition_key='key-0')
+        producer.send(b'', partition_key='key-1')
+        producer.send(b'value-2', partition_key='key-2')
+        def assert_latest_values():
+            self.assertEqual(d, {
+                'key-0': b'value-0-new',
+                'key-2': b'value-2'
+            })
+        self._wait_for_assertion(assert_latest_values)
+
+    def test_schema(self):
+        topic = f'table_view_test_schema-{time.time()}'
+        table_view = self._client.create_table_view(topic, schema=StringSchema())
+        producer = self._client.create_producer(topic, schema=StringSchema())
+        producer.send('value', partition_key='key')
+
+        self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value'))
+        self.assertEqual(table_view.get('missed-key'), None)
+
+        entries = dict()
+        table_view.for_each(lambda key, value: entries.__setitem__(key, value))
+        self.assertEqual(entries, {'key': 'value'})
+
+        entries.clear()
+        table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value))
+        self.assertEqual(entries, {'key': 'value'})
+
+        producer.send('new-value', partition_key='key')
+        self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value'))
+
+    def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            try:
+                assertion()
+                return
+            except AssertionError:
+                time.sleep(0.1)
+        assertion()
+
+if __name__ == "__main__":
+    main()