blob: f047b767ce335b798858e9b1b74f527766393f1d [file] [log] [blame]
#include "SessionImpl.h"
#include <chrono>
#include <grpcpp/client_context.h>
#include <string>
#include "absl/memory/memory.h"
#include "apache/rocketmq/v1/service.pb.h"
#include "grpcpp/client_context.h"
ROCKETMQ_NAMESPACE_BEGIN
void SessionImpl::queryRoute(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::QueryRouteRequest* request,
std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> cb) {
auto response = new rmq::QueryRouteResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status s) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(s, *response);
};
stub_->async()->QueryRoute(client_context, request, response, callback);
}
void SessionImpl::send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) {
auto response = new rmq::SendMessageResponse;
auto client_context = new grpc::ClientContext;
setDeadline(io_timeout_, client_context);
addMetadata(metadata, client_context);
auto callback = [=](grpc::Status s) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(s, *reply);
};
stub_->async()->SendMessage(client_context, request, response, callback);
}
void SessionImpl::queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::QueryAssignmentRequest* request,
std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) {
auto response = new rmq::QueryAssignmentResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->QueryAssignment(client_context, request, response, callback);
}
void SessionImpl::receive(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::ReceiveMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) {
auto response = new rmq::ReceiveMessageResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->ReceiveMessage(client_context, request, response, callback);
}
void SessionImpl::ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) {
auto response = new rmq::AckMessageResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->AckMessage(client_context, request, response, callback);
}
void SessionImpl::heartbeat(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::HeartbeatRequest* request,
std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) {
auto response = new rmq::HeartbeatResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->Heartbeat(client_context, request, response, callback);
}
void SessionImpl::healthCheck(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::HealthCheckRequest* request,
std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) {
auto response = new rmq::HealthCheckResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->HealthCheck(client_context, request, response, callback);
}
void SessionImpl::endTransaction(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::EndTransactionRequest* request,
std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) {
auto response = new rmq::EndTransactionResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->EndTransaction(client_context, request, response, callback);
}
void SessionImpl::queryOffset(absl::flat_hash_map<std::string, std::string> metadata,
const rmq::QueryOffsetRequest* request,
std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) {
auto response = new rmq::QueryOffsetResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->QueryOffset(client_context, request, response, callback);
}
void SessionImpl::pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) {
auto response = new rmq::PullMessageResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->PullMessage(client_context, request, response, callback);
}
void SessionImpl::forwardMessageToDeadLetterQueue(
absl::flat_hash_map<std::string, std::string> metadata, const rmq::ForwardMessageToDeadLetterQueueRequest* request,
std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) {
auto response = new rmq::ForwardMessageToDeadLetterQueueResponse;
auto client_context = new grpc::ClientContext;
addMetadata(metadata, client_context);
setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status status) {
auto reply = absl::WrapUnique(response);
auto ctx = absl::WrapUnique(client_context);
cb(status, *reply);
};
stub_->async()->ForwardMessageToDeadLetterQueue(client_context, request, response, callback);
}
void SessionImpl::addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata,
grpc::ClientContext* client_context) {
for (const auto& entry : metadata) {
client_context->AddMetadata(entry.first, entry.second);
}
}
void SessionImpl::setDeadline(std::chrono::milliseconds timeout, grpc::ClientContext* client_context) {
client_context->set_deadline(std::chrono::system_clock::now() + io_timeout_);
}
ROCKETMQ_NAMESPACE_END