WIP:
diff --git a/src/main/cpp/client/SessionImpl.cpp b/src/main/cpp/client/SessionImpl.cpp
index da7e9d1..f047b76 100644
--- a/src/main/cpp/client/SessionImpl.cpp
+++ b/src/main/cpp/client/SessionImpl.cpp
@@ -89,6 +89,95 @@
stub_->async()->AckMessage(client_context, request, response, callback);
}
+/**
+ * Starts a Heartbeat call through the stub's async() interface.
+ *
+ * @param metadata Entries handed to addMetadata() together with the call's ClientContext.
+ * @param request Request message; the pointer is forwarded to the stub as-is, not copied here.
+ * @param cb Invoked from the completion callback with the final status and the response.
+ */
+void SessionImpl::heartbeat(absl::flat_hash_map<std::string, std::string> metadata,
+                            const rmq::HeartbeatRequest* request,
+                            std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) {
+  // Heap-allocated so they survive past this function; the completion callback frees them.
+  auto* reply = new rmq::HeartbeatResponse;
+  auto* context = new grpc::ClientContext;
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+
+  auto on_done = [reply, context, cb](grpc::Status status) {
+    // Take ownership so both objects are released when this callback returns.
+    std::unique_ptr<rmq::HeartbeatResponse> reply_guard(reply);
+    std::unique_ptr<grpc::ClientContext> context_guard(context);
+    cb(status, *reply_guard);
+  };
+  stub_->async()->Heartbeat(context, request, reply, on_done);
+}
+
+/**
+ * Starts a HealthCheck call through the stub's async() interface.
+ *
+ * @param metadata Entries handed to addMetadata() together with the call's ClientContext.
+ * @param request Request message; the pointer is forwarded to the stub as-is, not copied here.
+ * @param cb Invoked from the completion callback with the final status and the response.
+ */
+void SessionImpl::healthCheck(absl::flat_hash_map<std::string, std::string> metadata,
+                              const rmq::HealthCheckRequest* request,
+                              std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) {
+  // Heap-allocated so they survive past this function; the completion callback frees them.
+  auto* reply = new rmq::HealthCheckResponse;
+  auto* context = new grpc::ClientContext;
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+
+  auto on_done = [reply, context, cb](grpc::Status status) {
+    // Take ownership so both objects are released when this callback returns.
+    std::unique_ptr<rmq::HealthCheckResponse> reply_guard(reply);
+    std::unique_ptr<grpc::ClientContext> context_guard(context);
+    cb(status, *reply_guard);
+  };
+  stub_->async()->HealthCheck(context, request, reply, on_done);
+}
+
+/**
+ * Starts an EndTransaction call through the stub's async() interface.
+ *
+ * @param metadata Entries handed to addMetadata() together with the call's ClientContext.
+ * @param request Request message; the pointer is forwarded to the stub as-is, not copied here.
+ * @param cb Invoked from the completion callback with the final status and the response.
+ */
+void SessionImpl::endTransaction(absl::flat_hash_map<std::string, std::string> metadata,
+                                 const rmq::EndTransactionRequest* request,
+                                 std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) {
+  // Heap-allocated so they survive past this function; the completion callback frees them.
+  auto* reply = new rmq::EndTransactionResponse;
+  auto* context = new grpc::ClientContext;
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+
+  auto on_done = [reply, context, cb](grpc::Status status) {
+    // Take ownership so both objects are released when this callback returns.
+    std::unique_ptr<rmq::EndTransactionResponse> reply_guard(reply);
+    std::unique_ptr<grpc::ClientContext> context_guard(context);
+    cb(status, *reply_guard);
+  };
+  stub_->async()->EndTransaction(context, request, reply, on_done);
+}
+
+/**
+ * Starts a QueryOffset call through the stub's async() interface.
+ *
+ * @param metadata Entries handed to addMetadata() together with the call's ClientContext.
+ * @param request Request message; the pointer is forwarded to the stub as-is, not copied here.
+ * @param cb Invoked from the completion callback with the final status and the response.
+ */
+void SessionImpl::queryOffset(absl::flat_hash_map<std::string, std::string> metadata,
+                              const rmq::QueryOffsetRequest* request,
+                              std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) {
+  // Heap-allocated so they survive past this function; the completion callback frees them.
+  auto* reply = new rmq::QueryOffsetResponse;
+  auto* context = new grpc::ClientContext;
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+
+  auto on_done = [reply, context, cb](grpc::Status status) {
+    // Take ownership so both objects are released when this callback returns.
+    std::unique_ptr<rmq::QueryOffsetResponse> reply_guard(reply);
+    std::unique_ptr<grpc::ClientContext> context_guard(context);
+    cb(status, *reply_guard);
+  };
+  stub_->async()->QueryOffset(context, request, reply, on_done);
+}
+
+/**
+ * Starts a PullMessage call through the stub's async() interface.
+ *
+ * @param metadata Entries handed to addMetadata() together with the call's ClientContext.
+ * @param request Request message; the pointer is forwarded to the stub as-is, not copied here.
+ * @param cb Invoked from the completion callback with the final status and the response.
+ */
+void SessionImpl::pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
+                       std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) {
+  // Heap-allocated so they survive past this function; the completion callback frees them.
+  auto* reply = new rmq::PullMessageResponse;
+  auto* context = new grpc::ClientContext;
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+
+  auto on_done = [reply, context, cb](grpc::Status status) {
+    // Take ownership so both objects are released when this callback returns.
+    std::unique_ptr<rmq::PullMessageResponse> reply_guard(reply);
+    std::unique_ptr<grpc::ClientContext> context_guard(context);
+    cb(status, *reply_guard);
+  };
+  stub_->async()->PullMessage(context, request, reply, on_done);
+}
+
+/**
+ * Starts a ForwardMessageToDeadLetterQueue call through the stub's async() interface.
+ *
+ * @param metadata Entries handed to addMetadata() together with the call's ClientContext.
+ * @param request Request message; the pointer is forwarded to the stub as-is, not copied here.
+ * @param cb Invoked from the completion callback with the final status and the response.
+ */
+void SessionImpl::forwardMessageToDeadLetterQueue(
+    absl::flat_hash_map<std::string, std::string> metadata, const rmq::ForwardMessageToDeadLetterQueueRequest* request,
+    std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) {
+  // Heap-allocated so they survive past this function; the completion callback frees them.
+  auto* reply = new rmq::ForwardMessageToDeadLetterQueueResponse;
+  auto* context = new grpc::ClientContext;
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+
+  auto on_done = [reply, context, cb](grpc::Status status) {
+    // Take ownership so both objects are released when this callback returns.
+    std::unique_ptr<rmq::ForwardMessageToDeadLetterQueueResponse> reply_guard(reply);
+    std::unique_ptr<grpc::ClientContext> context_guard(context);
+    cb(status, *reply_guard);
+  };
+  stub_->async()->ForwardMessageToDeadLetterQueue(context, request, reply, on_done);
+}
+
void SessionImpl::addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata,
grpc::ClientContext* client_context) {
for (const auto& entry : metadata) {
diff --git a/src/main/cpp/client/include/Session.h b/src/main/cpp/client/include/Session.h
index 4ad6de1..651f94d 100644
--- a/src/main/cpp/client/include/Session.h
+++ b/src/main/cpp/client/include/Session.h
@@ -56,6 +56,29 @@
virtual void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) PURE;
+
+ /**
+  * Sends a heartbeat request; the outcome is delivered through `cb`.
+  *
+  * @param metadata String key/value pairs supplied for this call.
+  * @param request Request message, passed by pointer.
+  *                NOTE(review): presumably must stay valid until `cb` runs - confirm with implementations.
+  * @param cb Callback taking the call's grpc::Status and the HeartbeatResponse.
+  */
+ virtual void heartbeat(absl::flat_hash_map<std::string, std::string> metadata, const rmq::HeartbeatRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) PURE;
+
+ /**
+  * Sends a health-check request; the outcome is delivered through `cb`.
+  *
+  * @param metadata String key/value pairs supplied for this call.
+  * @param request Request message, passed by pointer.
+  *                NOTE(review): presumably must stay valid until `cb` runs - confirm with implementations.
+  * @param cb Callback taking the call's grpc::Status and the HealthCheckResponse.
+  */
+ virtual void healthCheck(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::HealthCheckRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) PURE;
+
+ /**
+  * Sends an end-transaction request; the outcome is delivered through `cb`.
+  *
+  * @param metadata String key/value pairs supplied for this call.
+  * @param request Request message, passed by pointer.
+  *                NOTE(review): presumably must stay valid until `cb` runs - confirm with implementations.
+  * @param cb Callback taking the call's grpc::Status and the EndTransactionResponse.
+  */
+ virtual void endTransaction(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::EndTransactionRequest* request,
+ std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) PURE;
+
+ /**
+  * Sends a query-offset request; the outcome is delivered through `cb`.
+  *
+  * @param metadata String key/value pairs supplied for this call.
+  * @param request Request message, passed by pointer.
+  *                NOTE(review): presumably must stay valid until `cb` runs - confirm with implementations.
+  * @param cb Callback taking the call's grpc::Status and the QueryOffsetResponse.
+  */
+ virtual void queryOffset(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::QueryOffsetRequest* request,
+ std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) PURE;
+
+ /**
+  * Sends a pull-message request; the outcome is delivered through `cb`.
+  *
+  * @param metadata String key/value pairs supplied for this call.
+  * @param request Request message, passed by pointer.
+  *                NOTE(review): presumably must stay valid until `cb` runs - confirm with implementations.
+  * @param cb Callback taking the call's grpc::Status and the PullMessageResponse.
+  */
+ virtual void pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) PURE;
+
+ /**
+  * Sends a forward-message-to-dead-letter-queue request; the outcome is delivered through `cb`.
+  *
+  * @param metadata String key/value pairs supplied for this call.
+  * @param request Request message, passed by pointer.
+  *                NOTE(review): presumably must stay valid until `cb` runs - confirm with implementations.
+  * @param cb Callback taking the call's grpc::Status and the ForwardMessageToDeadLetterQueueResponse.
+  */
+ virtual void forwardMessageToDeadLetterQueue(
+ absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::ForwardMessageToDeadLetterQueueRequest* request,
+ std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) PURE;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/SessionImpl.h b/src/main/cpp/client/include/SessionImpl.h
index f07f801..9e84aff 100644
--- a/src/main/cpp/client/include/SessionImpl.h
+++ b/src/main/cpp/client/include/SessionImpl.h
@@ -35,6 +35,26 @@
void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) override;
+ /**
+  * Heartbeat call issued through the stub's async() interface.
+  * The call's ClientContext is prepared with addMetadata(metadata, ...) and
+  * setDeadline(io_timeout_, ...); `cb` receives the status and the response.
+  */
+ void heartbeat(absl::flat_hash_map<std::string, std::string> metadata, const rmq::HeartbeatRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) override;
+
+ /**
+  * HealthCheck call issued through the stub's async() interface.
+  * The call's ClientContext is prepared with addMetadata(metadata, ...) and
+  * setDeadline(io_timeout_, ...); `cb` receives the status and the response.
+  */
+ void healthCheck(absl::flat_hash_map<std::string, std::string> metadata, const rmq::HealthCheckRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) override;
+
+ /**
+  * EndTransaction call issued through the stub's async() interface.
+  * The call's ClientContext is prepared with addMetadata(metadata, ...) and
+  * setDeadline(io_timeout_, ...); `cb` receives the status and the response.
+  */
+ void endTransaction(absl::flat_hash_map<std::string, std::string> metadata, const rmq::EndTransactionRequest* request,
+ std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) override;
+
+ /**
+  * QueryOffset call issued through the stub's async() interface.
+  * The call's ClientContext is prepared with addMetadata(metadata, ...) and
+  * setDeadline(io_timeout_, ...); `cb` receives the status and the response.
+  */
+ void queryOffset(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryOffsetRequest* request,
+ std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) override;
+
+ /**
+  * PullMessage call issued through the stub's async() interface.
+  * The call's ClientContext is prepared with addMetadata(metadata, ...) and
+  * setDeadline(io_timeout_, ...); `cb` receives the status and the response.
+  */
+ void pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) override;
+
+ /**
+  * ForwardMessageToDeadLetterQueue call issued through the stub's async() interface.
+  * The call's ClientContext is prepared with addMetadata(metadata, ...) and
+  * setDeadline(io_timeout_, ...); `cb` receives the status and the response.
+  */
+ void forwardMessageToDeadLetterQueue(
+ absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::ForwardMessageToDeadLetterQueueRequest* request,
+ std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) override;
+
private:
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<rmq::MessagingService::Stub> stub_;