WIP: Implement more API using async gRPC API
diff --git a/src/main/cpp/client/SessionImpl.cpp b/src/main/cpp/client/SessionImpl.cpp
index 470c90d..da7e9d1 100644
--- a/src/main/cpp/client/SessionImpl.cpp
+++ b/src/main/cpp/client/SessionImpl.cpp
@@ -1,5 +1,7 @@
 #include "SessionImpl.h"
 
+#include <chrono>
+#include <grpcpp/client_context.h>
 #include <string>
 
 #include "absl/memory/memory.h"
@@ -14,9 +16,8 @@
   auto response = new rmq::QueryRouteResponse;
   auto client_context = new grpc::ClientContext;
 
-  for (const auto& entry : metadata) {
-    client_context->AddMetadata(entry.first, entry.second);
-  }
+  addMetadata(metadata, client_context);
+  setDeadline(io_timeout_, client_context);
 
   auto callback = [=](grpc::Status s) {
     auto reply = absl::WrapUnique(response);
@@ -27,4 +28,76 @@
   stub_->async()->QueryRoute(client_context, request, response, callback);
 }
 
+void SessionImpl::send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
+                       std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) {
+  auto response = new rmq::SendMessageResponse;
+  auto client_context = new grpc::ClientContext;
+
+  setDeadline(io_timeout_, client_context);
+  addMetadata(metadata, client_context);
+
+  auto callback = [=](grpc::Status s) {
+    auto reply = absl::WrapUnique(response);
+    auto ctx = absl::WrapUnique(client_context);
+    cb(s, *reply);
+  };
+
+  stub_->async()->SendMessage(client_context, request, response, callback);
+}
+
+void SessionImpl::queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
+                                  const rmq::QueryAssignmentRequest* request,
+                                  std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) {
+  auto response = new rmq::QueryAssignmentResponse;
+  auto client_context = new grpc::ClientContext;
+  addMetadata(metadata, client_context);
+  setDeadline(io_timeout_, client_context);
+  auto callback = [=](grpc::Status status) {
+    auto reply = absl::WrapUnique(response);
+    auto ctx = absl::WrapUnique(client_context);
+    cb(status, *reply);
+  };
+  stub_->async()->QueryAssignment(client_context, request, response, callback);
+}
+
+void SessionImpl::receive(absl::flat_hash_map<std::string, std::string> metadata,
+                          const rmq::ReceiveMessageRequest* request,
+                          std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) {
+  auto response = new rmq::ReceiveMessageResponse;
+  auto client_context = new grpc::ClientContext;
+  addMetadata(metadata, client_context);
+  setDeadline(io_timeout_, client_context);
+  auto callback = [=](grpc::Status status) {
+    auto reply = absl::WrapUnique(response);
+    auto ctx = absl::WrapUnique(client_context);
+    cb(status, *reply);
+  };
+  stub_->async()->ReceiveMessage(client_context, request, response, callback);
+}
+
+void SessionImpl::ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
+                      std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) {
+  auto response = new rmq::AckMessageResponse;
+  auto client_context = new grpc::ClientContext;
+  addMetadata(metadata, client_context);
+  setDeadline(io_timeout_, client_context);
+  auto callback = [=](grpc::Status status) {
+    auto reply = absl::WrapUnique(response);
+    auto ctx = absl::WrapUnique(client_context);
+    cb(status, *reply);
+  };
+  stub_->async()->AckMessage(client_context, request, response, callback);
+}
+
+void SessionImpl::addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata,
+                              grpc::ClientContext* client_context) {
+  for (const auto& entry : metadata) {
+    client_context->AddMetadata(entry.first, entry.second);
+  }
+}
+
+void SessionImpl::setDeadline(std::chrono::milliseconds timeout, grpc::ClientContext* client_context) {
+  client_context->set_deadline(std::chrono::system_clock::now() + io_timeout_);
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/Session.h b/src/main/cpp/client/include/Session.h
index 46780ea..4ad6de1 100644
--- a/src/main/cpp/client/include/Session.h
+++ b/src/main/cpp/client/include/Session.h
@@ -41,7 +41,21 @@
   virtual ~Session() = default;
 
   virtual void queryRoute(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryRouteRequest* request,
-                          std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> callback) PURE;
+                          std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> cb) PURE;
+
+  virtual void send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
+                    std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) PURE;
+
+  virtual void queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
+                               const rmq::QueryAssignmentRequest* request,
+                               std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) PURE;
+
+  virtual void receive(absl::flat_hash_map<std::string, std::string> metadata,
+                       const rmq::ReceiveMessageRequest* request,
+                       std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) PURE;
+
+  virtual void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
+                   std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) PURE;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/SessionImpl.h b/src/main/cpp/client/include/SessionImpl.h
index 4780178..f07f801 100644
--- a/src/main/cpp/client/include/SessionImpl.h
+++ b/src/main/cpp/client/include/SessionImpl.h
@@ -2,6 +2,10 @@
 
 #include "Session.h"
 #include <apache/rocketmq/v1/definition.pb.h>
+#include <apache/rocketmq/v1/service.pb.h>
+#include <chrono>
+#include <functional>
+#include <grpcpp/client_context.h>
 #include <memory>
 #include <string>
 
@@ -18,9 +22,26 @@
   void queryRoute(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryRouteRequest* request,
                   std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> callback) override;
 
+  void send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
+            std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) override;
+
+  void queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
+                       const rmq::QueryAssignmentRequest* request,
+                       std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) override;
+
+  void receive(absl::flat_hash_map<std::string, std::string> metadata, const rmq::ReceiveMessageRequest* request,
+               std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) override;
+
+  void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
+           std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) override;
+
 private:
   std::shared_ptr<grpc::Channel> channel_;
   std::unique_ptr<rmq::MessagingService::Stub> stub_;
+  std::chrono::seconds io_timeout_{3};
+
+  void addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata, grpc::ClientContext* client_context);
+  void setDeadline(std::chrono::milliseconds timeout, grpc::ClientContext* client_context);
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file