blob: 176da3b9f7f11c0da224509467098ce97c9e78b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MQClientAPIImpl.h"
#include <cassert>
#include <cstring>
#include <typeindex>
#include "ClientRemotingProcessor.h"
#include "MQClientInstance.h"
#include "MessageBatch.h"
#include "MessageClientIDSetter.h"
#include "PullCallbackWrap.h"
#include "PullResultExt.hpp"
#include "SendCallbackWrap.h"
#include "TcpRemotingClient.h"
#include "protocol/body/LockBatchResponseBody.hpp"
namespace rocketmq {
MQClientAPIImpl::MQClientAPIImpl(ClientRemotingProcessor* clientRemotingProcessor,
RPCHookPtr rpcHook,
const MQClientConfig& clientConfig)
: remoting_client_(new TcpRemotingClient(clientConfig.tcp_transport_worker_thread_nums(),
clientConfig.tcp_transport_connect_timeout(),
clientConfig.tcp_transport_try_lock_timeout())) {
remoting_client_->registerRPCHook(rpcHook);
remoting_client_->registerProcessor(CHECK_TRANSACTION_STATE, clientRemotingProcessor);
remoting_client_->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED, clientRemotingProcessor);
remoting_client_->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET, clientRemotingProcessor);
remoting_client_->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT, clientRemotingProcessor);
remoting_client_->registerProcessor(GET_CONSUMER_RUNNING_INFO, clientRemotingProcessor);
remoting_client_->registerProcessor(CONSUME_MESSAGE_DIRECTLY, clientRemotingProcessor);
remoting_client_->registerProcessor(PUSH_REPLY_MESSAGE_TO_CLIENT, clientRemotingProcessor);
}
MQClientAPIImpl::~MQClientAPIImpl() = default;
void MQClientAPIImpl::start() {
remoting_client_->start();
}
void MQClientAPIImpl::shutdown() {
remoting_client_->shutdown();
}
void MQClientAPIImpl::updateNameServerAddressList(const std::string& addrs) {
// TODO: split addrs
remoting_client_->updateNameServerAddressList(addrs);
}
void MQClientAPIImpl::createTopic(const std::string& addr, const std::string& defaultTopic, TopicConfig topicConfig) {
auto* requestHeader = new CreateTopicRequestHeader();
requestHeader->topic = topicConfig.topic_name();
requestHeader->defaultTopic = defaultTopic;
requestHeader->readQueueNums = topicConfig.read_queue_nums();
requestHeader->writeQueueNums = topicConfig.write_queue_nums();
requestHeader->perm = topicConfig.perm();
requestHeader->topicFilterType = topicConfig.topic_filter_type();
RemotingCommand request(UPDATE_AND_CREATE_TOPIC, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
return;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
std::unique_ptr<SendResult> MQClientAPIImpl::sendMessage(const std::string& addr,
const std::string& brokerName,
const MessagePtr msg,
std::unique_ptr<SendMessageRequestHeader> requestHeader,
int timeoutMillis,
CommunicationMode communicationMode,
DefaultMQProducerImplPtr producer) {
return sendMessage(addr, brokerName, msg, std::move(requestHeader), timeoutMillis, communicationMode, nullptr,
nullptr, nullptr, 0, producer);
}
std::unique_ptr<SendResult> MQClientAPIImpl::sendMessage(const std::string& addr,
const std::string& brokerName,
const MessagePtr msg,
std::unique_ptr<SendMessageRequestHeader> requestHeader,
int timeoutMillis,
CommunicationMode communicationMode,
SendCallback* sendCallback,
TopicPublishInfoPtr topicPublishInfo,
MQClientInstancePtr instance,
int retryTimesWhenSendFailed,
DefaultMQProducerImplPtr producer) {
int code = SEND_MESSAGE;
std::unique_ptr<CommandCustomHeader> header;
const auto& msgType = msg->getProperty(MQMessageConst::PROPERTY_MESSAGE_TYPE);
bool isReply = msgType == REPLY_MESSAGE_FLAG;
if (isReply) {
code = SEND_REPLY_MESSAGE_V2;
} else if (msg->isBatch()) {
code = SEND_BATCH_MESSAGE;
} else {
code = SEND_MESSAGE_V2;
}
if (code != SEND_MESSAGE && code != SEND_REPLY_MESSAGE) {
header = SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(requestHeader.get());
} else {
header = std::move(requestHeader);
}
RemotingCommand request(code, header.release());
request.set_body(msg->body());
switch (communicationMode) {
case CommunicationMode::ONEWAY:
remoting_client_->invokeOneway(addr, request);
return nullptr;
case CommunicationMode::ASYNC:
sendMessageAsync(addr, brokerName, msg, std::move(request), sendCallback, topicPublishInfo, instance,
timeoutMillis, retryTimesWhenSendFailed, producer);
return nullptr;
case CommunicationMode::SYNC:
return sendMessageSync(addr, brokerName, msg, request, timeoutMillis);
default:
assert(false);
break;
}
return nullptr;
}
void MQClientAPIImpl::sendMessageAsync(const std::string& addr,
const std::string& brokerName,
const MessagePtr msg,
RemotingCommand&& request,
SendCallback* sendCallback,
TopicPublishInfoPtr topicPublishInfo,
MQClientInstancePtr instance,
int64_t timeoutMillis,
int retryTimesWhenSendFailed,
DefaultMQProducerImplPtr producer) {
std::unique_ptr<InvokeCallback> cbw(
new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request), sendCallback,
topicPublishInfo, instance, retryTimesWhenSendFailed, 0, producer));
sendMessageAsyncImpl(cbw, timeoutMillis);
}
void MQClientAPIImpl::sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis) {
auto* scbw = static_cast<SendCallbackWrap*>(cbw.get());
const auto& addr = scbw->getAddr();
auto& request = scbw->getRemotingCommand();
remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
}
std::unique_ptr<SendResult> MQClientAPIImpl::sendMessageSync(const std::string& addr,
const std::string& brokerName,
const MessagePtr msg,
RemotingCommand& request,
int timeoutMillis) {
// block until response
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
return processSendResponse(brokerName, msg, response.get());
}
std::unique_ptr<SendResult> MQClientAPIImpl::processSendResponse(const std::string& brokerName,
const MessagePtr msg,
RemotingCommand* response) {
SendStatus sendStatus = SEND_OK;
switch (response->code()) {
case FLUSH_DISK_TIMEOUT:
sendStatus = SEND_FLUSH_DISK_TIMEOUT;
break;
case FLUSH_SLAVE_TIMEOUT:
sendStatus = SEND_FLUSH_SLAVE_TIMEOUT;
break;
case SLAVE_NOT_AVAILABLE:
sendStatus = SEND_SLAVE_NOT_AVAILABLE;
break;
case SUCCESS:
sendStatus = SEND_OK;
break;
default:
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
return nullptr;
}
auto* responseHeader = response->decodeCommandCustomHeader<SendMessageResponseHeader>();
assert(responseHeader != nullptr);
MQMessageQueue messageQueue(msg->topic(), brokerName, responseHeader->queueId);
std::string uniqMsgId = MessageClientIDSetter::getUniqID(*msg);
// MessageBatch
if (msg->isBatch()) {
const auto& messages = dynamic_cast<MessageBatch*>(msg.get())->messages();
uniqMsgId.clear();
uniqMsgId.reserve(33 * messages.size() + 1);
for (const auto& message : messages) {
uniqMsgId.append(MessageClientIDSetter::getUniqID(message));
uniqMsgId.append(",");
}
if (!uniqMsgId.empty()) {
uniqMsgId.resize(uniqMsgId.length() - 1);
}
}
std::unique_ptr<SendResult> sendResult(
new SendResult(sendStatus, uniqMsgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset));
sendResult->set_transaction_id(responseHeader->transactionId);
return sendResult;
}
std::unique_ptr<PullResult> MQClientAPIImpl::pullMessage(const std::string& addr,
PullMessageRequestHeader* requestHeader,
int timeoutMillis,
CommunicationMode communicationMode,
PullCallback* pullCallback) {
RemotingCommand request(PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case CommunicationMode::ASYNC:
pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return nullptr;
case CommunicationMode::SYNC:
return pullMessageSync(addr, request, timeoutMillis);
default:
assert(false);
return nullptr;
}
}
void MQClientAPIImpl::pullMessageAsync(const std::string& addr,
RemotingCommand& request,
int timeoutMillis,
PullCallback* pullCallback) {
std::unique_ptr<InvokeCallback> cbw(new PullCallbackWrap(pullCallback, this));
remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
}
std::unique_ptr<PullResult> MQClientAPIImpl::pullMessageSync(const std::string& addr,
RemotingCommand& request,
int timeoutMillis) {
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
return processPullResponse(response.get());
}
std::unique_ptr<PullResult> MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
PullStatus pullStatus = NO_NEW_MSG;
switch (response->code()) {
case SUCCESS:
pullStatus = FOUND;
break;
case PULL_NOT_FOUND:
pullStatus = NO_NEW_MSG;
break;
case PULL_RETRY_IMMEDIATELY:
if ("OFFSET_OVERFLOW_BADLY" == response->remark()) {
pullStatus = NO_LATEST_MSG;
} else {
pullStatus = NO_MATCHED_MSG;
}
break;
case PULL_OFFSET_MOVED:
pullStatus = OFFSET_ILLEGAL;
break;
default:
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
// return of decodeCommandCustomHeader is non-null
auto* responseHeader = response->decodeCommandCustomHeader<PullMessageResponseHeader>();
assert(responseHeader != nullptr);
return std::unique_ptr<PullResult>(new PullResultExt(pullStatus, responseHeader->nextBeginOffset,
responseHeader->minOffset, responseHeader->maxOffset,
(int)responseHeader->suggestWhichBrokerId, response->body()));
}
MQMessageExt MQClientAPIImpl::viewMessage(const std::string& addr, int64_t phyoffset, int timeoutMillis) {
auto* requestHeader = new ViewMessageRequestHeader();
requestHeader->offset = phyoffset;
RemotingCommand request(VIEW_MESSAGE_BY_ID, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
// TODO: ...
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
int64_t MQClientAPIImpl::searchOffset(const std::string& addr,
const std::string& topic,
int queueId,
int64_t timestamp,
int timeoutMillis) {
auto* requestHeader = new SearchOffsetRequestHeader();
requestHeader->topic = topic;
requestHeader->queueId = queueId;
requestHeader->timestamp = timestamp;
RemotingCommand request(SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto* responseHeader = response->decodeCommandCustomHeader<SearchOffsetResponseHeader>();
assert(responseHeader != nullptr);
return responseHeader->offset;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
int64_t MQClientAPIImpl::getMaxOffset(const std::string& addr,
const std::string& topic,
int queueId,
int timeoutMillis) {
auto* requestHeader = new GetMaxOffsetRequestHeader();
requestHeader->topic = topic;
requestHeader->queueId = queueId;
RemotingCommand request(GET_MAX_OFFSET, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto* responseHeader = response->decodeCommandCustomHeader<GetMaxOffsetResponseHeader>();
return responseHeader->offset;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
int64_t MQClientAPIImpl::getMinOffset(const std::string& addr,
const std::string& topic,
int queueId,
int timeoutMillis) {
auto* requestHeader = new GetMinOffsetRequestHeader();
requestHeader->topic = topic;
requestHeader->queueId = queueId;
RemotingCommand request(GET_MIN_OFFSET, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto* responseHeader = response->decodeCommandCustomHeader<GetMinOffsetResponseHeader>();
assert(responseHeader != nullptr);
return responseHeader->offset;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
int64_t MQClientAPIImpl::getEarliestMsgStoretime(const std::string& addr,
const std::string& topic,
int queueId,
int timeoutMillis) {
auto* requestHeader = new GetEarliestMsgStoretimeRequestHeader();
requestHeader->topic = topic;
requestHeader->queueId = queueId;
RemotingCommand request(GET_EARLIEST_MSG_STORETIME, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto* responseHeader = response->decodeCommandCustomHeader<GetEarliestMsgStoretimeResponseHeader>();
assert(responseHeader != nullptr);
return responseHeader->timestamp;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::getConsumerIdListByGroup(const std::string& addr,
const std::string& consumerGroup,
std::vector<std::string>& cids,
int timeoutMillis) {
auto* requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader->consumerGroup = consumerGroup;
RemotingCommand request(GET_CONSUMER_LIST_BY_GROUP, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto responseBody = response->body();
if (responseBody != nullptr && responseBody->size() > 0) {
std::unique_ptr<GetConsumerListByGroupResponseBody> body(
GetConsumerListByGroupResponseBody::Decode(*responseBody));
cids = std::move(body->consumerIdList);
return;
}
}
case SYSTEM_ERROR:
// no consumer for this group
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
int64_t MQClientAPIImpl::queryConsumerOffset(const std::string& addr,
QueryConsumerOffsetRequestHeader* requestHeader,
int timeoutMillis) {
RemotingCommand request(QUERY_CONSUMER_OFFSET, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto* responseHeader = response->decodeCommandCustomHeader<QueryConsumerOffsetResponseHeader>();
assert(responseHeader != nullptr);
return responseHeader->offset;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::updateConsumerOffset(const std::string& addr,
UpdateConsumerOffsetRequestHeader* requestHeader,
int timeoutMillis) {
RemotingCommand request(UPDATE_CONSUMER_OFFSET, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
return;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::updateConsumerOffsetOneway(const std::string& addr,
UpdateConsumerOffsetRequestHeader* requestHeader,
int timeoutMillis) {
RemotingCommand request(UPDATE_CONSUMER_OFFSET, requestHeader);
remoting_client_->invokeOneway(addr, request);
}
void MQClientAPIImpl::sendHearbeat(const std::string& addr, HeartbeatData* heartbeatData, long timeoutMillis) {
RemotingCommand request(HEART_BEAT, nullptr);
request.set_body(heartbeatData->encode());
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
LOG_DEBUG_NEW("sendHeartbeat to broker:{} success", addr);
return;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::unregisterClient(const std::string& addr,
const std::string& clientID,
const std::string& producerGroup,
const std::string& consumerGroup) {
LOG_INFO("unregisterClient to broker:%s", addr.c_str());
RemotingCommand request(UNREGISTER_CLIENT, new UnregisterClientRequestHeader(clientID, producerGroup, consumerGroup));
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS:
LOG_INFO("unregisterClient to:%s success", addr.c_str());
return;
default:
break;
}
LOG_WARN("unregisterClient fail:%s, %d", response->remark().c_str(), response->code());
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::endTransactionOneway(const std::string& addr,
EndTransactionRequestHeader* requestHeader,
const std::string& remark) {
RemotingCommand request(END_TRANSACTION, requestHeader);
request.set_remark(remark);
remoting_client_->invokeOneway(addr, request);
}
void MQClientAPIImpl::consumerSendMessageBack(const std::string& addr,
MessageExtPtr msg,
const std::string& consumerGroup,
int delayLevel,
int timeoutMillis,
int maxConsumeRetryTimes) {
auto* requestHeader = new ConsumerSendMsgBackRequestHeader();
requestHeader->group = consumerGroup;
requestHeader->originTopic = msg->topic();
requestHeader->offset = msg->commit_log_offset();
requestHeader->delayLevel = delayLevel;
requestHeader->originMsgId = msg->msg_id();
requestHeader->maxReconsumeTimes = maxConsumeRetryTimes;
RemotingCommand request(CONSUMER_SEND_MSG_BACK, requestHeader);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
return;
}
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::lockBatchMQ(const std::string& addr,
LockBatchRequestBody* requestBody,
std::vector<MQMessageQueue>& mqs,
int timeoutMillis) {
RemotingCommand request(LOCK_BATCH_MQ, nullptr);
request.set_body(requestBody->encode());
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto requestBody = response->body();
if (requestBody != nullptr && requestBody->size() > 0) {
std::unique_ptr<LockBatchResponseBody> body(LockBatchResponseBody::Decode(*requestBody));
mqs = std::move(body->lock_ok_mq_set());
} else {
mqs.clear();
}
return;
} break;
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
void MQClientAPIImpl::unlockBatchMQ(const std::string& addr,
UnlockBatchRequestBody* requestBody,
int timeoutMillis,
bool oneway) {
RemotingCommand request(UNLOCK_BATCH_MQ, nullptr);
request.set_body(requestBody->encode());
if (oneway) {
remoting_client_->invokeOneway(addr, request);
} else {
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
return;
} break;
default:
break;
}
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
}
std::unique_ptr<TopicRouteData> MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic,
int timeoutMillis) {
RemotingCommand request(GET_ROUTEINFO_BY_TOPIC, new GetRouteInfoRequestHeader(topic));
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(null, request, timeoutMillis));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto responseBody = response->body();
if (responseBody != nullptr && responseBody->size() > 0) {
return TopicRouteData::Decode(*responseBody);
}
}
case TOPIC_NOT_EXIST:
default:
break;
}
THROW_MQEXCEPTION(MQClientException, response->remark(), response->code());
}
std::unique_ptr<TopicList> MQClientAPIImpl::getTopicListFromNameServer() {
RemotingCommand request(GET_ALL_TOPIC_LIST_FROM_NAMESERVER, nullptr);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(null, request));
assert(response != nullptr);
switch (response->code()) {
case SUCCESS: {
auto responseBody = response->body();
if (responseBody != nullptr && responseBody->size() > 0) {
return TopicList::Decode(*responseBody);
}
}
default:
break;
}
THROW_MQEXCEPTION(MQClientException, response->remark(), response->code());
}
int MQClientAPIImpl::wipeWritePermOfBroker(const std::string& namesrvAddr,
const std::string& brokerName,
int timeoutMillis) {
return 0;
}
void MQClientAPIImpl::deleteTopicInBroker(const std::string& addr, const std::string& topic, int timeoutMillis) {}
void MQClientAPIImpl::deleteTopicInNameServer(const std::string& addr, const std::string& topic, int timeoutMillis) {}
void MQClientAPIImpl::deleteSubscriptionGroup(const std::string& addr,
const std::string& groupName,
int timeoutMillis) {}
std::string MQClientAPIImpl::getKVConfigByValue(const std::string& projectNamespace,
const std::string& projectGroup,
int timeoutMillis) {
return "";
}
void MQClientAPIImpl::deleteKVConfigByValue(const std::string& projectNamespace,
const std::string& projectGroup,
int timeoutMillis) {}
KVTable MQClientAPIImpl::getKVListByNamespace(const std::string& projectNamespace, int timeoutMillis) {
return KVTable();
}
} // namespace rocketmq