Add cpp_httplib-based http-client (#19)
Use httplib based HTTP client
diff --git a/bazel/rocketmq_deps.bzl b/bazel/rocketmq_deps.bzl
index a179559..acee2ac 100644
--- a/bazel/rocketmq_deps.bzl
+++ b/bazel/rocketmq_deps.bzl
@@ -119,11 +119,10 @@
],
)
- # Curl library
maybe(
http_archive,
- name = "com_github_curl",
- build_file = "@org_apache_rocketmq//third_party:curl.BUILD",
- strip_prefix = "curl-master",
- urls = ["https://github.com/curl/curl/archive/master.zip"],
+ name = "com_github_yhirose_cpp_httplib",
+ build_file = "@org_apache_rocketmq//third_party:cpp_httplib.BUILD",
+ strip_prefix = "cpp-httplib-0.9.4",
+ urls = ["https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.9.4.tar.gz"],
)
\ No newline at end of file
diff --git a/src/main/cpp/base/BUILD.bazel b/src/main/cpp/base/BUILD.bazel
index 88aecd0..1d99854 100644
--- a/src/main/cpp/base/BUILD.bazel
+++ b/src/main/cpp/base/BUILD.bazel
@@ -3,67 +3,27 @@
package(default_visibility = ["//visibility:public"])
cc_library(
- name = "http_client_interface",
- hdrs = [
- "include/HttpClient.h",
- ],
- strip_include_prefix = "//src/main/cpp/base/include",
- deps = [
- "//api:rocketmq_interface",
- "@com_google_absl//absl/container:flat_hash_map",
- ],
-)
-
-cc_library(
- name = "ghttp_client",
- hdrs = [
- "include/GHttpClient.h",
- ],
- srcs = [
- "GHttpClient.cpp",
- ],
- deps = [
- ":http_client_interface",
- "//src/main/cpp/log:log_library",
- "@com_github_fmtlib_fmt//:fmtlib",
- "@com_github_grpc_grpc//:grpc",
- ],
- strip_include_prefix = "//src/main/cpp/base/include",
-)
-
-cc_library(
- name = "top_addressing",
- hdrs = [
- "include/TopAddressing.h",
- "include/HostInfo.h"
- ],
- srcs = [
- "TopAddressing.cpp",
- "HostInfo.cpp",
- ],
- strip_include_prefix = "//src/main/cpp/base/include",
- deps = [
- ":ghttp_client",
- ],
-)
-
-cc_library(
name = "base_library",
hdrs = glob(["include/*.h"]),
srcs = glob(["*.cpp"]),
strip_include_prefix = "//src/main/cpp/base/include",
deps = [
"//src/main/cpp/log:log_library",
+ "@com_github_fmtlib_fmt//:fmtlib",
+ "@com_google_absl//absl/base",
+ "@com_google_absl//absl/container:flat_hash_map",
+ "@com_google_absl//absl/container:flat_hash_set",
+ "@com_google_absl//absl/synchronization",
"@com_google_absl//absl/random",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/time",
"@com_github_gabime_spdlog//:spdlog",
"@com_github_grpc_grpc//:grpc",
"@com_googlesource_code_re2//:re2",
- "@zlib//:zlib",
"@boringssl//:crypto",
"@boringssl//:ssl",
"//external:madler_zlib",
+ "@com_github_yhirose_cpp_httplib//:cpp_httplib",
"@asio//:asio",
],
)
\ No newline at end of file
diff --git a/src/main/cpp/base/GHttpClient.cpp b/src/main/cpp/base/GHttpClient.cpp
deleted file mode 100644
index e4e1c87..0000000
--- a/src/main/cpp/base/GHttpClient.cpp
+++ /dev/null
@@ -1,164 +0,0 @@
-#include "GHttpClient.h"
-#include "absl/strings/str_join.h"
-#include "fmt/format.h"
-#include <atomic>
-#include <chrono>
-#include <cstddef>
-#include <cstring>
-#include <functional>
-#include <string>
-#include <thread>
-
-#include "LoggerImpl.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-GHttpClient::GHttpClient() : shutdown_(false) {
- grpc_core::ExecCtx exec_ctx;
- grpc_httpcli_context_init(&http_context_);
- grpc_pollset* pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(pollset, &http_mtx_);
- http_polling_entity_ = grpc_polling_entity_create_from_pollset(pollset);
- GRPC_CLOSURE_INIT(&destroy_, &GHttpClient::destroyPollingEntity, &http_polling_entity_, grpc_schedule_on_exec_ctx);
-}
-
-GHttpClient::~GHttpClient() {
- SPDLOG_INFO("GHttpClient::~GHttpClient() starts");
- shutdown();
- grpc_core::ExecCtx exec_ctx;
- grpc_httpcli_context_destroy(&http_context_);
- grpc_pollset_shutdown(grpc_polling_entity_pollset(&http_polling_entity_), &destroy_);
- SPDLOG_INFO("GHttpClient::~GHttpClient() completed");
-}
-
-const int64_t GHttpClient::POLL_INTERVAL = 1000;
-
-const int GHttpClient::STATUS_OK = 200;
-
-void GHttpClient::poll() {
- grpc_core::ExecCtx exec_ctx;
- while (!shutdown_) {
- gpr_mu_lock(http_mtx_);
- grpc_error_handle error = grpc_pollset_work(grpc_polling_entity_pollset(&http_polling_entity_), &worker_,
- grpc_core::ExecCtx::Get()->Now() + POLL_INTERVAL);
- gpr_mu_unlock(http_mtx_);
- if (error) {
- SPDLOG_WARN("grpc_pollset_work failed");
- } else {
- SPDLOG_TRACE("grpc_pollset_work returned. grpc_pollset_size: {}", grpc_pollset_size());
- }
- submit0();
- }
- SPDLOG_INFO("GHttpClient::poll completed");
-}
-
-void GHttpClient::start() {
- SPDLOG_INFO("GHttpClient::start()");
- if (!shutdown_) {
- loop_ = std::thread(std::bind(&GHttpClient::poll, this));
- SPDLOG_INFO("GHttpClient starts to poll");
- }
-}
-
-void GHttpClient::shutdown() {
- bool expected = false;
- if (shutdown_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
- SPDLOG_INFO("GHttpClient::shutdown()");
- if (loop_.joinable()) {
- loop_.join();
- SPDLOG_INFO("GHttpClient#loop thread quit OK");
- }
- }
-}
-
-void GHttpClient::get(
- HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>& cb) {
- auto http_invocation_context = new HttpInvocationContext();
- std::string http_host = fmt::format("{}:{}", host, port);
- http_invocation_context->host = http_host;
- http_invocation_context->path = path;
- http_invocation_context->request.host = const_cast<char*>(http_invocation_context->host.c_str());
- http_invocation_context->request.http.path = const_cast<char*>(http_invocation_context->path.c_str());
- http_invocation_context->request.handshaker = &grpc_httpcli_plaintext;
- http_invocation_context->callback = cb;
-
- {
- absl::MutexLock lk(&pending_requests_mtx_);
- pending_requests_.emplace_back(http_invocation_context);
- SPDLOG_TRACE("Add HTTP request to pending list");
- }
-
- {
- grpc_core::ExecCtx exec_ctx;
- SPDLOG_TRACE("Prepare to grpc_pollset_kick");
- {
- gpr_mu_lock(http_mtx_);
- grpc_error_handle error = grpc_pollset_kick(grpc_polling_entity_pollset(&http_polling_entity_), nullptr);
- gpr_mu_unlock(http_mtx_);
- if (GRPC_ERROR_NONE != error) {
- SPDLOG_WARN("grpc_pollset_kick failed");
- } else {
- SPDLOG_TRACE("grpc_pollset_kick completed");
- }
- }
- }
-}
-
-void GHttpClient::submit0() {
- absl::MutexLock lk(&pending_requests_mtx_);
- if (pending_requests_.empty()) {
- SPDLOG_TRACE("No pending HTTP requests");
- return;
- }
-
- SPDLOG_DEBUG("Add {} pending HTTP requests to pollset", pending_requests_.size());
- grpc_core::ExecCtx exec_ctx;
- {
- for (auto it = pending_requests_.begin(); it != pending_requests_.end();) {
- auto http_invocation_context = *it;
- SPDLOG_TRACE("Prepare to create quota");
- grpc_resource_quota* resource_quota = grpc_resource_quota_create("get");
- SPDLOG_TRACE("Quota created");
- SPDLOG_TRACE("grpc_httpcli_get starts");
- grpc_httpcli_get(
- &http_context_, &http_polling_entity_, resource_quota, &http_invocation_context->request,
- grpc_core::ExecCtx::Get()->Now() + absl::ToInt64Milliseconds(absl::Seconds(3)),
- GRPC_CLOSURE_CREATE(&GHttpClient::onCompletion, http_invocation_context, grpc_schedule_on_exec_ctx),
- &http_invocation_context->response);
- SPDLOG_TRACE("grpc_httpcli_get completed");
- grpc_resource_quota_unref_internal(resource_quota);
- SPDLOG_TRACE("Resource quota unref completed");
- it = pending_requests_.erase(it);
- }
- }
-}
-
-void GHttpClient::destroyPollingEntity(void* arg, grpc_error_handle error) {
- auto polling_entity = reinterpret_cast<grpc_polling_entity*>(arg);
- grpc_pollset_destroy(grpc_polling_entity_pollset(polling_entity));
-}
-
-void GHttpClient::onCompletion(void* arg, grpc_error_handle error) {
- auto context = reinterpret_cast<HttpInvocationContext*>(arg);
- absl::flat_hash_map<std::string, std::string> metadata;
-
- if (error) {
- context->callback(500, metadata, std::string());
- SPDLOG_WARN("HTTP request failed");
- delete context;
- return;
- }
-
- for (size_t i = 0; i < context->response.hdr_count; i++) {
- grpc_http_header* hdr = context->response.hdrs + i;
- metadata.insert({std::string(hdr->key), std::string(hdr->value)});
- }
- std::string body(context->response.body, context->response.body_length);
- SPDLOG_DEBUG("HTTP response received. Code: {}, response-headers: {}, response-body: {}", context->response.status,
- absl::StrJoin(metadata, ",", absl::PairFormatter("=")), body);
- context->callback(context->response.status, metadata, body);
- delete context;
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/base/HttpClientImpl.cpp b/src/main/cpp/base/HttpClientImpl.cpp
new file mode 100644
index 0000000..e4fbfa5
--- /dev/null
+++ b/src/main/cpp/base/HttpClientImpl.cpp
@@ -0,0 +1,76 @@
+#include "HttpClientImpl.h"
+
+#include <memory>
+#include <string>
+
+#include "fmt/format.h"
+#include "spdlog/spdlog.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+HttpClientImpl::HttpClientImpl() {}
+
+HttpClientImpl::~HttpClientImpl() {}
+
+void HttpClientImpl::start() {}
+
+void HttpClientImpl::shutdown() {}
+
+/**
+ * @brief We current implement this function in sync mode since async http request in CURL is sort of unnecessarily
+ * complex.
+ *
+ * @param protocol
+ * @param host
+ * @param port
+ * @param path
+ * @param cb
+ */
+void HttpClientImpl::get(
+ HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) {
+
+ std::string key;
+ switch (protocol) {
+ case HttpProtocol::HTTP:
+ key = fmt::format("http://{}:{}", host, port);
+ break;
+ case HttpProtocol::HTTPS:
+ key = fmt::format("https://{}:{}", host, port);
+ break;
+ }
+
+ std::shared_ptr<httplib::Client> client;
+ {
+ absl::MutexLock lk(&clients_mtx_);
+ if (clients_.contains(key)) {
+ client = clients_[key];
+ }
+
+ if (!client || !client->is_valid()) {
+ client = std::make_shared<httplib::Client>(key);
+ clients_.insert_or_assign(key, client);
+ }
+ }
+
+ if (!client || !client->is_valid()) {
+ int code = 400;
+ std::multimap<std::string, std::string> headers;
+ std::string response;
+ cb(code, headers, response);
+ return;
+ }
+
+ auto res = client->Get(path.c_str());
+
+ std::multimap<std::string, std::string> headers;
+ for (auto& header : headers) {
+ headers.insert({header.first, header.second});
+ }
+
+ cb(res->status, headers, res->body);
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/base/TopAddressing.cpp b/src/main/cpp/base/TopAddressing.cpp
index 17e36ce..0c21969 100644
--- a/src/main/cpp/base/TopAddressing.cpp
+++ b/src/main/cpp/base/TopAddressing.cpp
@@ -1,6 +1,6 @@
#include "TopAddressing.h"
-#include "GHttpClient.h"
+#include "HttpClientImpl.h"
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/str_split.h"
@@ -12,7 +12,7 @@
TopAddressing::TopAddressing() : TopAddressing("jmenv.tbsite.net", 8080, "/rocketmq/nsaddr") {}
TopAddressing::TopAddressing(std::string host, int port, std::string path)
- : host_(std::move(host)), port_(port), path_(std::move(path)), http_client_(absl::make_unique<GHttpClient>()) {
+ : host_(std::move(host)), port_(port), path_(std::move(path)), http_client_(absl::make_unique<HttpClientImpl>()) {
http_client_->start();
}
@@ -36,10 +36,9 @@
query_string.append("nofix=1");
}
- auto callback = [cb](int code, const absl::flat_hash_map<std::string, std::string>& metadata,
- const std::string& body) {
+ auto callback = [cb](int code, const std::multimap<std::string, std::string>& metadata, const std::string& body) {
SPDLOG_DEBUG("Receive HTTP response. Code: {}, body: {}", code, body);
- if (GHttpClient::STATUS_OK == code) {
+ if (static_cast<int>(HttpStatus::OK) == code) {
cb(true, absl::StrSplit(body, ';'));
} else {
std::vector<std::string> name_server_list;
diff --git a/src/main/cpp/base/include/GHttpClient.h b/src/main/cpp/base/include/GHttpClient.h
deleted file mode 100644
index 0cfcbc1..0000000
--- a/src/main/cpp/base/include/GHttpClient.h
+++ /dev/null
@@ -1,67 +0,0 @@
-#include "rocketmq/RocketMQ.h"
-
-#include "HttpClient.h"
-#include "absl/base/thread_annotations.h"
-#include "absl/synchronization/mutex.h"
-#include "src/core/lib/http/httpcli.h"
-#include "src/core/lib/iomgr/iomgr.h"
-#include <atomic>
-#include <cstdint>
-#include <cstring>
-#include <functional>
-#include <string>
-#include <thread>
-#include <vector>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-struct HttpInvocationContext {
- HttpInvocationContext() { memset(&request, 0, sizeof(request)); }
- std::string host;
- std::string path;
- grpc_httpcli_request request;
- grpc_http_response response;
- std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)> callback;
-};
-
-class GHttpClient : public HttpClient {
-public:
- GHttpClient();
-
- ~GHttpClient() override;
-
- void start() override;
-
- void shutdown() override;
-
- void get(HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>& cb)
- override;
-
- static const int STATUS_OK;
-
-private:
- static void onCompletion(void* arg, grpc_error_handle error);
-
- static void destroyPollingEntity(void* arg, grpc_error_handle error);
-
- void poll();
-
- void submit0() LOCKS_EXCLUDED(pending_requests_mtx_);
-
- grpc_httpcli_context http_context_;
- grpc_polling_entity http_polling_entity_;
- gpr_mu* http_mtx_{nullptr};
-
- std::thread loop_;
- grpc_pollset_worker* worker_;
- grpc_closure destroy_;
- std::atomic_bool shutdown_;
-
- std::vector<HttpInvocationContext*> pending_requests_ GUARDED_BY(pending_requests_mtx_);
- absl::Mutex pending_requests_mtx_;
-
- static const int64_t POLL_INTERVAL;
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/base/include/HttpClient.h b/src/main/cpp/base/include/HttpClient.h
index 84b35e7..4ab9476 100644
--- a/src/main/cpp/base/include/HttpClient.h
+++ b/src/main/cpp/base/include/HttpClient.h
@@ -1,8 +1,11 @@
#pragma once
-#include "absl/container/flat_hash_map.h"
-#include "rocketmq/RocketMQ.h"
+
#include <cstdint>
#include <functional>
+#include <map>
+#include <string>
+
+#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -11,6 +14,11 @@
HTTPS = 2,
};
+enum class HttpStatus : int {
+ OK = 200,
+ INTERNAL = 500,
+};
+
class HttpClient {
public:
virtual ~HttpClient() = default;
@@ -21,7 +29,7 @@
virtual void
get(HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>& cb) = 0;
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) = 0;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/base/include/HttpClientImpl.h b/src/main/cpp/base/include/HttpClientImpl.h
new file mode 100644
index 0000000..8e42a33
--- /dev/null
+++ b/src/main/cpp/base/include/HttpClientImpl.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "HttpClient.h"
+
+#include "absl/base/thread_annotations.h"
+#include "absl/container/flat_hash_map.h"
+#include "absl/synchronization/mutex.h"
+#include "httplib.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class HttpClientImpl : public HttpClient {
+public:
+ HttpClientImpl();
+
+ ~HttpClientImpl() override;
+
+ void start() override;
+
+ void shutdown() override;
+
+ void
+ get(HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) override;
+
+private:
+ absl::flat_hash_map<std::string, std::shared_ptr<httplib::Client>> clients_ GUARDED_BY(clients_mtx_);
+ absl::Mutex clients_mtx_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/base/mocks/include/HttpClientMock.h b/src/main/cpp/base/mocks/include/HttpClientMock.h
index 392756a..6b76cb4 100644
--- a/src/main/cpp/base/mocks/include/HttpClientMock.h
+++ b/src/main/cpp/base/mocks/include/HttpClientMock.h
@@ -1,19 +1,21 @@
#pragma once
-#include "gmock/gmock.h"
#include "HttpClient.h"
#include "rocketmq/RocketMQ.h"
+#include "gmock/gmock.h"
ROCKETMQ_NAMESPACE_BEGIN
class HttpClientMock : public HttpClient {
public:
~HttpClientMock() override = default;
-
+
MOCK_METHOD(void, start, (), (override));
MOCK_METHOD(void, shutdown, (), (override));
- MOCK_METHOD(void, get, (HttpProtocol, const std::string&, std::uint16_t, const std::string&,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>&), (override));
+ MOCK_METHOD(void, get,
+ (HttpProtocol, const std::string&, std::uint16_t, const std::string&,
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>&),
+ (override));
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index d91250a..4bbc90f 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -1,14 +1,3 @@
-#include "ClientImpl.h"
-#include "ClientManagerFactory.h"
-#include "GHttpClient.h"
-#include "InvocationContext.h"
-#include "LoggerImpl.h"
-#include "MessageAccessor.h"
-#include "Signature.h"
-#include "absl/strings/str_join.h"
-#include "absl/strings/str_split.h"
-#include "apache/rocketmq/v1/definition.pb.h"
-#include "rocketmq/MQMessageExt.h"
#include <algorithm>
#include <chrono>
#include <cstdint>
@@ -17,6 +6,19 @@
#include <string>
#include <utility>
+#include "absl/strings/str_join.h"
+#include "absl/strings/str_split.h"
+#include "apache/rocketmq/v1/definition.pb.h"
+
+#include "ClientImpl.h"
+#include "ClientManagerFactory.h"
+#include "HttpClientImpl.h"
+#include "InvocationContext.h"
+#include "LoggerImpl.h"
+#include "MessageAccessor.h"
+#include "Signature.h"
+#include "rocketmq/MQMessageExt.h"
+
ROCKETMQ_NAMESPACE_BEGIN
ClientImpl::ClientImpl(std::string group_name) : ClientConfigImpl(std::move(group_name)), state_(State::CREATED) {}
@@ -180,7 +182,7 @@
std::vector<std::string> list;
SPDLOG_DEBUG("Begin to renew name server list");
auto callback = [this](int code, const std::vector<std::string>& name_server_list) {
- if (GHttpClient::STATUS_OK != code) {
+ if (static_cast<int>(HttpStatus::OK) != code) {
SPDLOG_WARN("Failed to fetch name server list");
return;
}
diff --git a/src/main/cpp/rocketmq/CredentialsProvider.cpp b/src/main/cpp/rocketmq/CredentialsProvider.cpp
index 942ecb3..3d79926 100644
--- a/src/main/cpp/rocketmq/CredentialsProvider.cpp
+++ b/src/main/cpp/rocketmq/CredentialsProvider.cpp
@@ -1,18 +1,19 @@
-#include "GHttpClient.h"
-#include "MixAll.h"
-#include "StsCredentialsProviderImpl.h"
+#include <cstdlib>
+#include <fstream>
+#include <iostream>
+#include <string>
+
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "fmt/format.h"
#include "ghc/filesystem.hpp"
#include "google/protobuf/struct.pb.h"
#include "google/protobuf/util/json_util.h"
-#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
-#include <cstdlib>
-#include <fstream>
-#include <iostream>
-#include <string>
+
+#include "MixAll.h"
+#include "StsCredentialsProviderImpl.h"
+#include "rocketmq/Logger.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -100,9 +101,7 @@
Credentials StsCredentialsProvider::getCredentials() { return impl_->getCredentials(); }
StsCredentialsProviderImpl::StsCredentialsProviderImpl(std::string ram_role_name)
- : ram_role_name_(std::move(ram_role_name)), http_client_(absl::make_unique<GHttpClient>()) {
- http_client_->start();
-}
+ : ram_role_name_(std::move(ram_role_name)) {}
StsCredentialsProviderImpl::~StsCredentialsProviderImpl() { http_client_->shutdown(); }
@@ -122,10 +121,9 @@
absl::Mutex sync_mtx;
absl::CondVar sync_cv;
bool completed = false;
- auto callback = [&, this](int code, const absl::flat_hash_map<std::string, std::string>& headers,
- const std::string& body) {
+ auto callback = [&, this](int code, const std::multimap<std::string, std::string>& headers, const std::string& body) {
SPDLOG_DEBUG("Received STS response. Code: {}", code);
- if (GHttpClient::STATUS_OK == code) {
+ if (static_cast<int>(HttpStatus::OK) == code) {
google::protobuf::Struct doc;
google::protobuf::util::Status status = google::protobuf::util::JsonStringToMessage(body, &doc);
if (status.ok()) {
diff --git a/src/main/cpp/rocketmq/include/StsCredentialsProviderImpl.h b/src/main/cpp/rocketmq/include/StsCredentialsProviderImpl.h
index d936195..9429355 100644
--- a/src/main/cpp/rocketmq/include/StsCredentialsProviderImpl.h
+++ b/src/main/cpp/rocketmq/include/StsCredentialsProviderImpl.h
@@ -2,8 +2,9 @@
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
-#include "rocketmq/CredentialsProvider.h"
+
#include "HttpClient.h"
+#include "rocketmq/CredentialsProvider.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -15,14 +16,7 @@
Credentials getCredentials() override;
- /**
- * @brief Expose http_client for test purpose only.
- *
- * @param http_client
- */
- void setHttpClient(std::unique_ptr<HttpClient> http_client) {
- http_client_ = std::move(http_client);
- }
+ void withHttpClient(std::unique_ptr<HttpClient> http_client) { http_client_ = std::move(http_client); }
private:
static const char* RAM_ROLE_HOST;
@@ -32,7 +26,7 @@
static const char* FIELD_SESSION_TOKEN;
static const char* FIELD_EXPIRATION;
static const char* EXPIRATION_DATE_TIME_FORMAT;
-
+
std::string ram_role_name_;
std::string access_key_ GUARDED_BY(mtx_);
diff --git a/src/test/cpp/it/BUILD.bazel b/src/test/cpp/it/BUILD.bazel
index 6bc10df..c2e9190 100644
--- a/src/test/cpp/it/BUILD.bazel
+++ b/src/test/cpp/it/BUILD.bazel
@@ -5,7 +5,6 @@
"TopAddressingTest.cpp",
],
deps = [
- "//src/main/cpp/base:top_addressing",
"//src/main/cpp/base:base_library",
"@com_google_googletest//:gtest_main",
],
diff --git a/src/test/cpp/it/TopAddressingTest.cpp b/src/test/cpp/it/TopAddressingTest.cpp
index 80d0dee..91873d4 100644
--- a/src/test/cpp/it/TopAddressingTest.cpp
+++ b/src/test/cpp/it/TopAddressingTest.cpp
@@ -1,13 +1,16 @@
-#include "TopAddressing.h"
-#include "RateLimiter.h"
-#include "grpc/grpc.h"
-#include "spdlog/spdlog.h"
-#include "gtest/gtest.h"
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <thread>
+#include "absl/synchronization/mutex.h"
+#include "grpc/grpc.h"
+#include "spdlog/spdlog.h"
+#include "gtest/gtest.h"
+
+#include "RateLimiter.h"
+#include "TopAddressing.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class TopAddressingTest : public testing::Test {
@@ -21,15 +24,15 @@
void SetEnv(const char* key, const char* value) {
int overwrite = 1;
- #ifdef _WIN32
+#ifdef _WIN32
std::string env;
env.append(key);
env.push_back('=');
env.append(value);
_putenv(env.c_str());
- #else
+#else
setenv(key, value, overwrite);
- #endif
+#endif
}
};
diff --git a/src/test/cpp/ut/base/BUILD.bazel b/src/test/cpp/ut/base/BUILD.bazel
index cecd4f7..3e60c18 100644
--- a/src/test/cpp/ut/base/BUILD.bazel
+++ b/src/test/cpp/ut/base/BUILD.bazel
@@ -47,12 +47,12 @@
)
cc_test(
- name = "ghttp_client_test",
+ name = "http_client_test",
srcs = [
- "GHttpClientTest.cpp",
+ "HttpClientTest.cpp",
],
deps = [
- "//src/main/cpp/base:ghttp_client",
+ "//src/main/cpp/base:base_library",
"@com_google_googletest//:gtest_main",
],
)
@@ -64,7 +64,6 @@
],
deps = [
"//src/main/cpp/base:base_library",
- "@com_google_absl//absl/container:flat_hash_set",
"@com_google_googletest//:gtest_main",
],
)
diff --git a/src/test/cpp/ut/base/GHttpClientTest.cpp b/src/test/cpp/ut/base/GHttpClientTest.cpp
deleted file mode 100644
index eebbf26..0000000
--- a/src/test/cpp/ut/base/GHttpClientTest.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-#include "GHttpClient.h"
-#include "LoggerImpl.h"
-#include "rocketmq/RocketMQ.h"
-#include "gtest/gtest.h"
-#include <chrono>
-#include <iostream>
-#include <string>
-#include <thread>
-ROCKETMQ_NAMESPACE_BEGIN
-
-class GHttpClientTest : public testing::Test {
-public:
- void SetUp() override {
- SPDLOG_DEBUG("GHttpClient::SetUp() starts");
- grpc_init();
- SPDLOG_DEBUG("GHttpClient::SetUp() completed");
- }
-
- void TearDown() override {
- SPDLOG_DEBUG("GHttpClientTest::TearDown() starts");
- grpc_shutdown();
- SPDLOG_DEBUG("GHttpClientTest::TearDown() completed");
- }
-};
-
-TEST_F(GHttpClientTest, testCtor) {
- spdlog::set_level(spdlog::level::debug);
- GHttpClient http_client;
- absl::Mutex mtx;
- absl::CondVar cv;
- bool completed = false;
- auto callback = [&](int status, const absl::flat_hash_map<std::string, std::string>& metadata,
- const std::string& body) {
- absl::MutexLock lk(&mtx);
- completed = true;
- SPDLOG_DEBUG("HTTP status-code: {}, response body: {}", status, body);
- cv.SignalAll();
- };
-
- std::string host("grpc.io");
- http_client.start();
- http_client.get(HttpProtocol::HTTP, host, 8080, "/", callback);
- if (!completed) {
- absl::MutexLock lk(&mtx);
- cv.WaitWithTimeout(&mtx, absl::Seconds(5));
- }
- ASSERT_TRUE(completed);
- http_client.shutdown();
-}
-
-TEST_F(GHttpClientTest, testDestructWithoutShutdown) {
- spdlog::set_level(spdlog::level::debug);
- GHttpClient http_client;
- absl::Mutex mtx;
- absl::CondVar cv;
- bool completed = false;
- auto callback = [&](int status, const absl::flat_hash_map<std::string, std::string>& metadata,
- const std::string& body) {
- absl::MutexLock lk(&mtx);
- completed = true;
- SPDLOG_DEBUG("HTTP status-code: {}, response body: {}", status, body);
- cv.SignalAll();
- };
-
- std::string host("www.aliyun.com");
- http_client.start();
- http_client.get(HttpProtocol::HTTP, host, 8080, "/", callback);
- if (!completed) {
- absl::MutexLock lk(&mtx);
- cv.WaitWithTimeout(&mtx, absl::Seconds(5));
- }
- ASSERT_TRUE(completed);
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/base/HttpClientTest.cpp b/src/test/cpp/ut/base/HttpClientTest.cpp
new file mode 100644
index 0000000..786bc13
--- /dev/null
+++ b/src/test/cpp/ut/base/HttpClientTest.cpp
@@ -0,0 +1,50 @@
+#include <chrono>
+#include <iostream>
+#include <string>
+#include <thread>
+
+#include "gtest/gtest.h"
+
+#include "HttpClientImpl.h"
+#include "LoggerImpl.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class HttpClientTest : public testing::Test {
+public:
+ void SetUp() override {
+ SPDLOG_DEBUG("GHttpClient::SetUp() starts");
+ http_client.start();
+ SPDLOG_DEBUG("GHttpClient::SetUp() completed");
+ }
+
+ void TearDown() override {
+ SPDLOG_DEBUG("GHttpClientTest::TearDown() starts");
+ http_client.shutdown();
+ SPDLOG_DEBUG("GHttpClientTest::TearDown() completed");
+ }
+
+protected:
+ HttpClientImpl http_client;
+};
+
+TEST_F(HttpClientTest, testBasics) {}
+
+TEST_F(HttpClientTest, testGet) {
+ auto cb = [](int code, const std::multimap<std::string, std::string>& headers, const std::string& body) {
+ SPDLOG_INFO("Response received. Status-code: {}, Body: {}", code, body);
+ };
+
+ http_client.get(HttpProtocol::HTTP, "www.baidu.com", 80, "/", cb);
+}
+
+TEST_F(HttpClientTest, DISABLED_testJMEnv) {
+ auto cb = [](int code, const std::multimap<std::string, std::string>& headers, const std::string& body) {
+ SPDLOG_INFO("Response received. Status-code: {}, Body: {}", code, body);
+ };
+
+ http_client.get(HttpProtocol::HTTP, "jmenv.tbsite.net", 8080, "/rocketmq/nsaddr", cb);
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/base/TopAddressingTest.cpp b/src/test/cpp/ut/base/TopAddressingTest.cpp
index 6e24e1b..836132f 100644
--- a/src/test/cpp/ut/base/TopAddressingTest.cpp
+++ b/src/test/cpp/ut/base/TopAddressingTest.cpp
@@ -1,12 +1,15 @@
-#include "TopAddressing.h"
-#include "HttpClientMock.h"
-#include "grpc/grpc.h"
-#include "gtest/gtest.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include "absl/synchronization/mutex.h"
+#include "grpc/grpc.h"
+#include "gtest/gtest.h"
+
+#include "HttpClientMock.h"
+#include "TopAddressing.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class TopAddressingTest : public testing::Test {
@@ -17,9 +20,8 @@
auto http_client_ = absl::make_unique<testing::NiceMock<HttpClientMock>>();
auto mock_get =
[](HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>&
- cb) {
- absl::flat_hash_map<std::string, std::string> headers;
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) {
+ std::multimap<std::string, std::string> headers;
std::string body("10.0.0.1:9876");
cb(200, headers, body);
};
diff --git a/src/test/cpp/ut/rocketmq/ClientImplTest.cpp b/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
index a243de1..ea071f1 100644
--- a/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
@@ -52,7 +52,7 @@
std::string once{"10.0.0.1:9876"};
std::string then{"10.0.0.1:9876;10.0.0.2:9876"};
- absl::flat_hash_map<std::string, std::string> header;
+ std::multimap<std::string, std::string> header;
bool completed = false;
absl::Mutex mtx;
@@ -61,12 +61,12 @@
int http_status = 200;
auto once_cb =
[&](HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>&
- cb) { cb(http_status, header, once); };
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) {
+ cb(http_status, header, once);
+ };
auto then_cb =
[&](HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>&
- cb) {
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) {
cb(http_status, header, then);
absl::MutexLock lk(&mtx);
completed = true;
diff --git a/src/test/cpp/ut/rocketmq/StsCredentialsProviderImplTest.cpp b/src/test/cpp/ut/rocketmq/StsCredentialsProviderImplTest.cpp
index f9b85fc..19ccf8f 100644
--- a/src/test/cpp/ut/rocketmq/StsCredentialsProviderImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/StsCredentialsProviderImplTest.cpp
@@ -1,12 +1,12 @@
#include "StsCredentialsProviderImpl.h"
#include "HttpClientMock.h"
#include "absl/memory/memory.h"
+#include "grpc/grpc.h"
#include "rocketmq/RocketMQ.h"
#include "gtest/gtest.h"
#include <algorithm>
#include <memory>
#include <string>
-#include "grpc/grpc.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -18,9 +18,8 @@
auto http_client_ = absl::make_unique<testing::NiceMock<HttpClientMock>>();
auto http_get_action =
[](HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path,
- const std::function<void(int, const absl::flat_hash_map<std::string, std::string>&, const std::string&)>&
- cb) {
- absl::flat_hash_map<std::string, std::string> header;
+ const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) {
+ std::multimap<std::string, std::string> header;
std::string body = R"(
{
"AccessKeyId": "key",
@@ -35,12 +34,10 @@
};
EXPECT_CALL(*http_client_, get).Times(testing::AtLeast(1)).WillRepeatedly(testing::Invoke(http_get_action));
- sts_credentials_provider->setHttpClient(std::move(http_client_));
+ sts_credentials_provider->withHttpClient(std::move(http_client_));
}
- void TearDown() override {
- grpc_shutdown();
- }
+ void TearDown() override { grpc_shutdown(); }
protected:
std::shared_ptr<StsCredentialsProviderImpl> sts_credentials_provider;
diff --git a/third_party/asio.BUILD b/third_party/asio.BUILD
index af74274..98f590d 100644
--- a/third_party/asio.BUILD
+++ b/third_party/asio.BUILD
@@ -3,7 +3,9 @@
name = "asio",
hdrs = glob(["include/**/*.hpp", "include/**/*.ipp"]),
visibility = ["//visibility:public"],
- strip_include_prefix = "include",
+ includes = [
+ "include",
+ ],
defines = [
"ASIO_STANDALONE",
"ASIO_HAS_STD_ADDRESSOF",
diff --git a/third_party/cpp_httplib.BUILD b/third_party/cpp_httplib.BUILD
new file mode 100644
index 0000000..cf425f7
--- /dev/null
+++ b/third_party/cpp_httplib.BUILD
@@ -0,0 +1,17 @@
+load("@rules_cc//cc:defs.bzl", "cc_library")
+
+cc_library(
+ name = "cpp_httplib",
+ hdrs = [
+ "httplib.h",
+ ],
+ visibility = [
+ "//visibility:public",
+ ],
+ deps = [
+ "//external:madler_zlib",
+ ],
+ defines = [
+ "CPPHTTPLIB_ZLIB_SUPPORT",
+ ],
+)
\ No newline at end of file