Use latest protocol specification
diff --git a/example/rocketmq/ExampleProducer.cpp b/example/rocketmq/ExampleProducer.cpp
index 8dee436..6452509 100644
--- a/example/rocketmq/ExampleProducer.cpp
+++ b/example/rocketmq/ExampleProducer.cpp
@@ -66,12 +66,12 @@
   std::thread stats_thread(stats_lambda);
 
   std::string body = randomString(1024 * 4);
-  std::cout << "Message body: " << body << std::endl;
+  std::cout << "Message body size: " << body.length() << std::endl;
   message.setBody(body);
 
   try {
     producer.start();
-    for (int i = 0; i < 102400; ++i) {
+    for (int i = 0; i < 16; ++i) {
       SendResult sendResult = producer.send(message);
       std::cout << sendResult.getMessageQueue().simpleName() << ": " << sendResult.getMsgId() << std::endl;
       count++;
diff --git a/proto/apache/rocketmq/v1/admin.proto b/proto/apache/rocketmq/v1/admin.proto
index 8ed2037..554207b 100644
--- a/proto/apache/rocketmq/v1/admin.proto
+++ b/proto/apache/rocketmq/v1/admin.proto
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 syntax = "proto3";
 
 package apache.rocketmq.v1;
@@ -7,9 +22,7 @@
 option java_package = "apache.rocketmq.v1";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
-
-// Ali Cloud Service
-option java_outer_classname = "ACS";
+option java_outer_classname = "MQAdmin";
 
 message ChangeLogLevelRequest {
   enum Level {
diff --git a/proto/apache/rocketmq/v1/definition.proto b/proto/apache/rocketmq/v1/definition.proto
index 9ecd46a..c92aad3 100644
--- a/proto/apache/rocketmq/v1/definition.proto
+++ b/proto/apache/rocketmq/v1/definition.proto
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 syntax = "proto3";
 
 import "google/protobuf/timestamp.proto";
@@ -9,8 +24,6 @@
 option java_package = "apache.rocketmq.v1";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
-
-// Ali Cloud Service
 option java_outer_classname = "MQDomain";
 
 enum Permission {
@@ -46,7 +59,7 @@
   //
   // This field will be honored on a best effort basis.
   //
-  // If this parameter is 0, a default value of 5 is used.
+  // If this parameter is 0, a default value of 16 is used.
   int32 max_delivery_attempts = 1;
 
   reserved 2 to 64;
@@ -216,6 +229,7 @@
   repeated string keys = 2;
 
   // Message identifier, client-side generated, remains unique.
+  // If message_id is empty, the send message request will be aborted with status `INVALID_ARGUMENT`.
   string message_id = 3;
 
   // Message body digest
@@ -284,6 +298,9 @@
 message Message {
   Resource topic = 1;
   // User defined key-value pairs.
+  // If user_attribute contains any key reserved by RocketMQ,
+  // the send message request will be aborted with status `INVALID_ARGUMENT`. See the link below for the reserved keys:
+  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
   map<string, string> user_attribute = 2;
   SystemAttribute system_attribute = 3;
   bytes body = 4;
diff --git a/proto/apache/rocketmq/v1/service.proto b/proto/apache/rocketmq/v1/service.proto
index 5deb63c..6220f51 100644
--- a/proto/apache/rocketmq/v1/service.proto
+++ b/proto/apache/rocketmq/v1/service.proto
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 syntax = "proto3";
 
 import "google/protobuf/duration.proto";
@@ -13,8 +28,6 @@
 option java_package = "apache.rocketmq.v1";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
-
-// Ali Cloud Service
 option java_outer_classname = "MQService";
 
 message ResponseCommon {
@@ -28,10 +41,13 @@
   reserved 7 to 64;
 }
 
+// A QueryRouteRequest requests a set of Partitions of the specific topic with
+// necessary route infos.
 message QueryRouteRequest {
   Resource topic = 1;
 
-  // Service access point
+  // The service access points used to issue the QueryRouteRequest.
+  // The QueryRouteResponse will indicate the address of subsequent RPCs.
   Endpoints endpoints = 2;
 
   reserved 3 to 64;
@@ -248,20 +264,6 @@
   reserved 6 to 64;
 }
 
-message UpdateOffsetRequest {
-  Resource group = 1;
-  Partition partition = 2;
-  int64 partition_offset = 3;
-
-  reserved 4 to 64;
-}
-
-message UpdateOffsetResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
 message GenericPollingRequest {
   string client_id = 1;
   repeated Resource topics = 2;
@@ -336,10 +338,13 @@
 }
 
 message NotifyClientTerminationRequest {
-  Resource group = 1;
-  string client_id = 2;
+  oneof group {
+    Resource producer_group = 1;
+    Resource consumer_group = 2;
+  }
+  string client_id = 3;
 
-  reserved 3 to 64;
+  reserved 4 to 64;
 }
 
 message NotifyClientTerminationResponse {
@@ -348,37 +353,103 @@
   reserved 2 to 64;
 }
 
+// Any of the RPCs in MessagingService may return the errors below:
+//
+// If the request doesn't have valid authentication credentials, returns `UNAUTHENTICATED`.
+// If the caller doesn't have permission to execute the specified operation, returns `PERMISSION_DENIED`.
+// If the per-user rate quota has been exceeded, returns `RESOURCE_EXHAUSTED`.
+// If any unexpected server-side exception occurs, returns `INTERNAL`.
 service MessagingService {
-  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
 
-  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+  // Queries the route info of a topic from the specified endpoints; the server returns a set of partitions on success.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the specified endpoints are empty, returns `INVALID_ARGUMENT`.
+  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
+  }
 
-  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
+  // Producer or consumer sends HeartbeatRequest to server in order to report necessary
+  // client-side information, like subscription data of consumer. Returns `OK` on success.
+  //
+  // If the client language info is invalid, returns `INVALID_ARGUMENT`.
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
+  }
 
-  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+  // Checks the health status of the message server, returns `OK` if there are no network issues.
+  // Clients could use this RPC to detect the availability of the server, and adopt necessary isolation measures.
+  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {
+  }
 
-  rpc QueryAssignment(QueryAssignmentRequest)
-      returns (QueryAssignmentResponse) {}
+  // Sends one message to the specific partition of a topic, returns message id or transaction id with status `OK`.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
+  }
 
-  rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
+  // Queries the assigned partition route info of a topic for the current consumer;
+  // the returned assignment result is decided by the server-side load balancer.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the specified endpoints are empty, returns `INVALID_ARGUMENT`.
+  rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+  }
 
-  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+  // Receives messages from the server in batches, returns a set of messages on success.
+  // The received messages should be acked or nacked after being processed.
+  //
+  // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
+  // If the upstream store server hangs, returns `DEADLINE_EXCEEDED` in a timely manner.
+  // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+  // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
+  // may suffer from false empty responses.
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {
+  }
 
-  rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
+  // Acknowledges the message associated with the `receipt_handle` or `offset` in the
+  // `AckMessageRequest`, it means the message has been successfully processed.
+  // Returns `OK` if the message server removes the relevant message successfully.
+  //
+  // If the given receipt_handle is illegal or out of date, returns `INVALID_ARGUMENT`.
+  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
+  }
 
+  // Signals that the message has not been successfully processed. The message server should resend the message
+  // following the retry policy defined at server-side.
+  //
+  // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+  rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {
+  }
+
+  // Forwards one message to the dead letter queue if the DeadLetterPolicy is triggered by this message at client-side,
+  // returns `OK` on success.
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
-      returns (ForwardMessageToDeadLetterQueueResponse) {}
+      returns (ForwardMessageToDeadLetterQueueResponse) {
+  }
 
-  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+  // Commits or rolls back one transactional message.
+  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
+  }
 
-  rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
+  // Queries the offset of the specific partition, returns the offset with `OK` on success.
+  // The message server should maintain a numerical offset for each message in a partition.
+  rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {
+  }
 
-  rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
+  // Pulls messages from the specific partition, returns a set of messages with the next pull offset.
+  // The pulled messages can't be acked or nacked; the client is responsible for managing offsets for the consumer,
+  // typically by updating the consume offset in local memory or a third-party storage service.
+  //
+  // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
+  // If the upstream store server hangs, returns `DEADLINE_EXCEEDED` in a timely manner.
+  // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+  // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
+  // may suffer from false empty responses.
+  rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {
+  }
 
-  rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+  rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {
+  }
 
-  rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {}
-
-  rpc NotifyClientTermination(NotifyClientTerminationRequest)
-      returns (NotifyClientTerminationResponse) {}
+  rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+  }
 }
\ No newline at end of file
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 0dfc00b..24d56be 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -55,9 +55,12 @@
   tls_channel_credential_options_.watch_identity_key_cert_pairs();
   channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);
 
-  int max_message_size = 1024 * 1024 * 16;
-  channel_arguments_.SetMaxReceiveMessageSize(max_message_size);
-  channel_arguments_.SetMaxSendMessageSize(max_message_size);
+
+  // Use unlimited receive message size.
+  channel_arguments_.SetMaxReceiveMessageSize(-1);
+
+  int max_send_message_size = 1024 * 1024 * 16;
+  channel_arguments_.SetMaxSendMessageSize(max_send_message_size);
 
   /*
    * Keep-alive settings:
diff --git a/src/main/cpp/client/include/Client.h b/src/main/cpp/client/include/Client.h
index 9a14e8c..917115d 100644
--- a/src/main/cpp/client/include/Client.h
+++ b/src/main/cpp/client/include/Client.h
@@ -12,23 +12,27 @@
 public:
   ~Client() override = default;
 
-  virtual void endpointsInUse(absl::flat_hash_set<std::string>& endpoints) = 0;
+  virtual void endpointsInUse(absl::flat_hash_set<std::string> &endpoints) = 0;
 
   virtual void heartbeat() = 0;
 
   virtual bool active() = 0;
 
-  virtual void onRemoteEndpointRemoval(const std::vector<std::string>&) = 0;
+  virtual void onRemoteEndpointRemoval(const std::vector<std::string> &) = 0;
 
   /**
-   * For endpoints that are marked as inactive due to one or multiple business operation failure, this function is to
-   * initiate health-check RPCs; Once the health-check passes, they are conceptually add back to serve further business
-   * workload.
+   * For endpoints that are marked as inactive due to one or multiple business
+   * operation failures, this function is to initiate health-check RPCs; Once the
+   * health-check passes, they are conceptually added back to serve further
+   * business workload.
    */
   virtual void healthCheck() = 0;
 
-  virtual void schedule(const std::string& task_name, const std::function<void(void)>& task,
+  virtual void schedule(const std::string &task_name,
+                        const std::function<void(void)> &task,
                         std::chrono::milliseconds delay) = 0;
+
+  virtual void notifyClientTermination() = 0;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/mocks/include/ClientMock.h b/src/main/cpp/client/mocks/include/ClientMock.h
index dfd7a66..1765992 100644
--- a/src/main/cpp/client/mocks/include/ClientMock.h
+++ b/src/main/cpp/client/mocks/include/ClientMock.h
@@ -19,6 +19,8 @@
 
   MOCK_METHOD(void, schedule, (const std::string&, const std::function<void()>&, std::chrono::milliseconds),
               (override));
+
+  MOCK_METHOD(void, notifyClientTermination, (), (override));
 };
 
 ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index e868c4c..294de27 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -10,6 +10,7 @@
 #include <system_error>
 #include <utility>
 
+#include "RpcClient.h"
 #include "absl/strings/str_join.h"
 #include "absl/strings/str_split.h"
 #include "apache/rocketmq/v1/definition.pb.h"
@@ -69,7 +70,6 @@
     if (route_update_handle_) {
       client_manager_->getScheduler().cancel(route_update_handle_);
     }
-    notifyClientTermination();
     client_manager_.reset();
   } else {
     SPDLOG_ERROR("Try to shutdown ClientImpl, but its state is not as expected. Expecting: {}, Actual: {}",
@@ -476,15 +476,16 @@
 }
 
 void ClientImpl::notifyClientTermination() {
+  SPDLOG_WARN("Should NOT reach here. Subclass should have overridden this function.");
+  std::abort();
+}
+
+void ClientImpl::notifyClientTermination(const NotifyClientTerminationRequest& request) {
   absl::flat_hash_set<std::string> endpoints;
   endpointsInUse(endpoints);
 
   Metadata metadata;
   Signature::sign(this, metadata);
-  NotifyClientTerminationRequest request;
-  request.mutable_group()->set_resource_namespace(resource_namespace_);
-  request.mutable_group()->set_name(group_name_);
-  request.set_client_id(clientId());
 
   for (const auto& endpoint : endpoints) {
     client_manager_->notifyClientTermination(endpoint, metadata, request, absl::ToChronoMilliseconds(io_timeout_));
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 99a8c23..8a6ae27 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -5,6 +5,7 @@
 #include <system_error>
 #include <utility>
 
+#include "Client.h"
 #include "absl/strings/str_join.h"
 #include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
@@ -59,11 +60,21 @@
     return;
   }
 
+  notifyClientTermination();
+
   ClientImpl::shutdown();
   assert(State::STOPPED == state_.load());
   SPDLOG_INFO("Producer instance stopped");
 }
 
+void ProducerImpl::notifyClientTermination() {
+  NotifyClientTerminationRequest request;
+  request.mutable_producer_group()->set_resource_namespace(resource_namespace_);
+  request.mutable_producer_group()->set_name(group_name_);
+  request.set_client_id(clientId());
+  ClientImpl::notifyClientTermination(request);
+}
+
 bool ProducerImpl::isRunning() const {
   return State::STARTED == state_.load(std::memory_order_relaxed);
 }
diff --git a/src/main/cpp/rocketmq/PullConsumerImpl.cpp b/src/main/cpp/rocketmq/PullConsumerImpl.cpp
index 7bb747e..43f8203 100644
--- a/src/main/cpp/rocketmq/PullConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PullConsumerImpl.cpp
@@ -24,6 +24,8 @@
 void PullConsumerImpl::shutdown() {
   // Shutdown services started by current tier
 
+  notifyClientTermination();
+
   // Shutdown services that are started by the parent
   ClientImpl::shutdown();
   State expected = State::STOPPING;
@@ -160,4 +162,12 @@
   }
 }
 
+void PullConsumerImpl::notifyClientTermination() {
+  NotifyClientTerminationRequest request;
+  request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
+  request.mutable_consumer_group()->set_name(group_name_);
+  request.set_client_id(clientId());
+  ClientImpl::notifyClientTermination(request);
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 4386b84..a8cbc7b 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -630,4 +630,12 @@
   return std::move(resource_bundle);
 }
 
+void PushConsumerImpl::notifyClientTermination() {
+  NotifyClientTerminationRequest request;
+  request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
+  request.mutable_consumer_group()->set_name(group_name_);
+  request.set_client_id(clientId());
+  ClientImpl::notifyClientTermination(request);
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index 3de7452..998d357 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -3,8 +3,10 @@
 #include <atomic>
 #include <chrono>
 #include <cstdint>
+#include <cstdlib>
 #include <system_error>
 
+#include "RpcClient.h"
 #include "absl/strings/string_view.h"
 #include "apache/rocketmq/v1/definition.pb.h"
 
@@ -118,7 +120,9 @@
 
   void setAccessPoint(rmq::Endpoints* endpoints);
 
-  virtual void notifyClientTermination();
+  void notifyClientTermination() override;
+
+  void notifyClientTermination(const NotifyClientTerminationRequest& request);
 
 private:
   /**
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index 8d03e35..dea27a2 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -108,6 +108,8 @@
 
   void resolveOrphanedTransactionalMessage(const std::string& transaction_id, const MQMessageExt& message) override;
 
+  void notifyClientTermination() override;
+
 private:
   absl::flat_hash_map<std::string, TopicPublishInfoPtr> topic_publish_info_table_ GUARDED_BY(topic_publish_info_mtx_);
   absl::Mutex topic_publish_info_mtx_; // protects topic_publish_info_
diff --git a/src/main/cpp/rocketmq/include/PullConsumerImpl.h b/src/main/cpp/rocketmq/include/PullConsumerImpl.h
index 058743b..c009113 100644
--- a/src/main/cpp/rocketmq/include/PullConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PullConsumerImpl.h
@@ -36,6 +36,8 @@
     return shared_from_this();
   }
 
+  void notifyClientTermination() override;
+
   MessageModel message_model_{MessageModel::CLUSTERING};
 };
 
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 44e1b54..e695810 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -183,6 +183,8 @@
 
   ClientResourceBundle resourceBundle() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_) override;
 
+  void notifyClientTermination() override;
+
 private:
   absl::flat_hash_map<std::string, FilterExpression>
       topic_filter_expression_table_ GUARDED_BY(topic_filter_expression_table_mtx_);