[ISSUE-435][PART-A]Support receiving traceOn signal. (#434)
* add traceOn to sendRedult
* add getExtField to RemoteCommand; use unordered_map to store extField
* complete extFields
* complete traceOn
diff --git a/include/SendResult.h b/include/SendResult.h
index cfe83ce..94ba543 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -37,7 +37,8 @@
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset,
- const std::string& regionId);
+ const std::string& regionId,
+ const bool traceOn);
virtual ~SendResult();
SendResult(const SendResult& other);
@@ -55,6 +56,8 @@
SendStatus getSendStatus() const;
MQMessageQueue getMessageQueue() const;
int64 getQueueOffset() const;
+ bool getTraceOn() const;
+
std::string toString() const;
private:
@@ -65,6 +68,7 @@
int64 m_queueOffset;
std::string m_transactionId;
std::string m_regionId;
+ bool m_traceOn;
};
} // namespace rocketmq
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 19cf3f5..671f328 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -542,10 +542,12 @@
}
if (res == 0) {
SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandHeader();
+ auto extFields = pResponse->getExtFields();
+ bool traceOn = (extFields->count("TRACE_ON") && extFields->at("TRACE_ON") == "true");
MQMessageQueue messageQueue(msg.getTopic(), brokerName, responseHeader->queueId);
string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset,
- responseHeader->regionId);
+ responseHeader->regionId, traceOn);
}
LOG_ERROR("processSendResponse error remark:%s, error code:%d", (pResponse->getRemark()).c_str(),
pResponse->getCode());
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 3b56ff3..f6926d5 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -39,13 +39,15 @@
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset,
- const string& regionId)
+ const string& regionId,
+ const bool traceOn)
: m_sendStatus(sendStatus),
m_msgId(msgId),
m_offsetMsgId(offsetMsgId),
m_messageQueue(messageQueue),
m_queueOffset(queueOffset),
- m_regionId(regionId) {}
+ m_regionId(regionId),
+ m_traceOn(traceOn) {}
SendResult::SendResult(const SendResult& other) {
m_sendStatus = other.m_sendStatus;
@@ -54,6 +56,7 @@
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
m_regionId = other.m_regionId;
+ m_traceOn = other.m_traceOn;
}
SendResult& SendResult::operator=(const SendResult& other) {
@@ -64,6 +67,7 @@
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
m_regionId = other.m_regionId;
+ m_traceOn = other.m_traceOn;
}
return *this;
}
@@ -96,6 +100,10 @@
return m_queueOffset;
}
+bool SendResult::getTraceOn() const {
+ return m_traceOn;
+}
+
std::string SendResult::toString() const {
stringstream ss;
ss << "SendResult: ";
@@ -105,6 +113,8 @@
ss << ",queueOffset:" << m_queueOffset;
ss << ",transactionId:" << m_transactionId;
ss << ",messageQueue:" << m_messageQueue.toString();
+ ss << ",regionId:" << m_regionId;
+ ss << ",traceOn:" << m_traceOn;
return ss.str();
}
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index 91d023e..bdde864 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -180,6 +180,12 @@
if (bodyLen > 0) {
cmd->SetBody(pData + 4 + headLen, bodyLen);
}
+ if (object.isMember("extFields")) {
+ Json::Value& extFields = object["extFields"];
+ for (auto& it : extFields.getMemberNames()) {
+ cmd->m_extFields[it] = extFields[it].asString();
+ }
+ }
return cmd;
}
@@ -304,6 +310,10 @@
m_extFields[key] = value;
}
+const unordered_map<string, string>* RemotingCommand::getExtFields() const{
+ return &m_extFields;
+}
+
std::string RemotingCommand::ToString() const {
std::stringstream ss;
ss << "code:" << m_code << ",opaque:" << m_opaque << ",flag:" << m_flag << ",body.size:" << m_body.getSize()
diff --git a/src/protocol/RemotingCommand.h b/src/protocol/RemotingCommand.h
index 1e039c8..b0525b7 100644
--- a/src/protocol/RemotingCommand.h
+++ b/src/protocol/RemotingCommand.h
@@ -21,6 +21,7 @@
#include <boost/thread/thread.hpp>
#include <memory>
#include <sstream>
+#include <unordered_map>
#include "CommandHeader.h"
#include "dataBlock.h"
@@ -62,6 +63,7 @@
const int getFlag() const;
const int getVersion() const;
void addExtField(const string& key, const string& value);
+ const unordered_map<string, string>* getExtFields() const;
string getMsgBody() const;
void setMsgBody(const string& body);
@@ -81,7 +83,7 @@
int m_flag;
string m_remark;
string m_msgBody;
- map<string, string> m_extFields;
+ unordered_map<string, string> m_extFields;
MemoryBlock m_head;
MemoryBlock m_body;