blob: cb7bd1d47639b9c794d714af80209fd71ec68ca1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RemotingCommand.h"

#include <utility>

#include "Logging.h"
#include "MQProtos.h"
#include "MQVersion.h"
#include "SessionCredentials.h"
namespace rocketmq {
// Storage for the class-wide sequence counter. The (code, header) constructor
// below draws each new command's opaque value from it.
boost::atomic<int> RemotingCommand::s_seqNumber;
//<!************************************************************************
// Builds an outgoing command for `code`.
//
// Language and version come from MQVersion, the flag starts at 0 and the
// remark is empty. `pExtHeader` may be NULL and is handed to m_pExtHeader.
// The opaque value is taken from the shared sequence counter.
RemotingCommand::RemotingCommand(int code, CommandHeader* pExtHeader /* = NULL */)
    : m_code(code),
      m_language(MQVersion::s_CurrentLanguage),
      m_version(MQVersion::s_CurrentVersion),
      m_flag(0),
      m_remark(),
      m_pExtHeader(pExtHeader) {
  // Clear the sign bit so the opaque value stays non-negative even after the
  // counter wraps around.
  const int nextSeq = s_seqNumber.fetch_add(1, boost::memory_order_relaxed);
  m_opaque = nextSeq & numeric_limits<int>::max();
}
// Builds a command from explicit field values; Decode() uses this for
// received frames, so no new opaque value is generated here.
//
// `language` and `remark` are taken by value and moved into the members so
// each string is copied at most once. `pExtHeader` may be NULL and is handed
// to m_pExtHeader.
RemotingCommand::RemotingCommand(int code,
                                 string language,
                                 int version,
                                 int opaque,
                                 int flag,
                                 string remark,
                                 CommandHeader* pExtHeader)
    : m_code(code),
      m_language(std::move(language)),
      m_version(version),
      m_opaque(opaque),
      m_flag(flag),
      m_remark(std::move(remark)),
      m_pExtHeader(pExtHeader) {}
// Copy constructor: every member is filled in by Assign(). The extension
// header (m_pExtHeader) is not copied by Assign(), so it is not carried over
// to the new object.
RemotingCommand::RemotingCommand(const RemotingCommand& command) {
Assign(command);
}
// Copy assignment. Self-assignment is a no-op; otherwise the fields are
// copied through Assign(). Returns *this.
RemotingCommand& RemotingCommand::operator=(const RemotingCommand& command) {
  if (this == &command) {
    return *this;
  }
  Assign(command);
  return *this;
}
// Destructor: explicitly empties the extension-header holder.
RemotingCommand::~RemotingCommand() {
  m_pExtHeader.reset();
}
// Copies the state of `command` into this object. Shared by the copy
// constructor and copy assignment.
//
// The extension header (m_pExtHeader) is deliberately left untouched.
void RemotingCommand::Assign(const RemotingCommand& command) {
  m_code = command.m_code;
  m_language = command.m_language;
  m_version = command.m_version;
  m_opaque = command.m_opaque;
  m_flag = command.m_flag;
  m_remark = command.m_remark;
  m_msgBody = command.m_msgBody;
  // Replace the whole container instead of merging key by key; merging would
  // leave entries from a previously assigned command behind on operator=.
  m_extFields = command.m_extFields;
  m_head = command.m_head;
  m_body = command.m_body;
  m_parsedJson = command.m_parsedJson;
  // m_pExtHeader = command.m_pExtHeader;  // ignore this field at this moment, if need please add it
}
void RemotingCommand::Encode() {
Json::Value root;
root["code"] = m_code;
root["language"] = m_language;
root["version"] = m_version;
root["opaque"] = m_opaque;
root["flag"] = m_flag;
root["remark"] = m_remark;
if (m_pExtHeader) {
Json::Value extJson;
m_pExtHeader->Encode(extJson);
extJson[SessionCredentials::Signature] = m_extFields[SessionCredentials::Signature];
extJson[SessionCredentials::AccessKey] = m_extFields[SessionCredentials::AccessKey];
extJson[SessionCredentials::ONSChannelKey] = m_extFields[SessionCredentials::ONSChannelKey];
root["extFields"] = extJson;
} else { // for heartbeat
Json::Value extJson;
extJson[SessionCredentials::Signature] = m_extFields[SessionCredentials::Signature];
extJson[SessionCredentials::AccessKey] = m_extFields[SessionCredentials::AccessKey];
extJson[SessionCredentials::ONSChannelKey] = m_extFields[SessionCredentials::ONSChannelKey];
root["extFields"] = extJson;
}
Json::FastWriter fastwrite;
string data = fastwrite.write(root);
uint32 headLen = data.size();
uint32 totalLen = 4 + headLen + m_body.getSize();
uint32 messageHeader[2];
messageHeader[0] = htonl(totalLen);
messageHeader[1] = htonl(headLen);
//<!include self 4 bytes, see : doc/protocol.txt;
m_head.setSize(4 + 4 + headLen);
m_head.copyFrom(messageHeader, 0, sizeof(messageHeader));
m_head.copyFrom(data.c_str(), sizeof(messageHeader), headLen);
}
// Returns a pointer to the member block m_head, which Encode() fills with the
// framed JSON header. The pointer refers to storage inside this object.
const MemoryBlock* RemotingCommand::GetHead() const {
return &m_head;
}
// Returns a pointer to the member block m_body (set through SetBody()). The
// pointer refers to storage inside this object.
const MemoryBlock* RemotingCommand::GetBody() const {
return &m_body;
}
// Replaces the binary body with a copy of the `len` bytes at `pData`.
//
// A NULL `pData` or a non-positive `len` leaves the body empty instead of
// passing an invalid size or source pointer to the memory block.
void RemotingCommand::SetBody(const char* pData, int len) {
  m_body.reset();
  if (pData == NULL || len <= 0) {
    m_body.setSize(0);
    return;
  }
  m_body.setSize(len);
  m_body.copyFrom(pData, 0, len);
}
// Parses one received frame into a newly allocated RemotingCommand.
//
// `mem` must start at the 4-byte header-length field (network byte order),
// followed by the JSON header and then the optional body. The body length is
// whatever remains of `mem` after the header.
//
// Returns a heap-allocated command that the caller must delete. Its extension
// header is NULL; the parsed JSON is stored so SetExtHeader() can decode
// "extFields" later.
//
// Throws MQClientException when the frame is shorter than the length field,
// when the declared header length does not fit inside `mem`, or when the
// header is not valid JSON.
RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) {
  //<!frame as seen here: 4-byte header length + header + body
  const int totalLen = mem.getSize();
  if (totalLen < 4) {
    THROW_MQEXCEPTION(MQClientException, "remoting frame is shorter than its 4-byte header length field", -1);
  }

  uint32 messageHeader[1];
  mem.copyTo(messageHeader, 0, sizeof(messageHeader));

  // The header length comes from the wire; validate it before using it as an
  // offset so a corrupt frame cannot cause reads past the end of `mem`.
  const uint32 declaredHeadLen = ntohl(messageHeader[0]);
  if (declaredHeadLen > static_cast<uint32>(totalLen - 4)) {
    THROW_MQEXCEPTION(MQClientException, "remoting frame header length exceeds frame size", -1);
  }
  const int headLen = static_cast<int>(declaredHeadLen);
  const int bodyLen = totalLen - 4 - headLen;

  //<!decode header;
  const char* const pData = static_cast<const char*>(mem.getData());
  Json::Reader reader;
  Json::Value object;
  const char* begin = pData + 4;
  const char* end = pData + 4 + headLen;
  if (!reader.parse(begin, end, object)) {
    THROW_MQEXCEPTION(MQClientException, "conn't parse json", -1);
  }

  int code = object["code"].asInt();
  string language = object["language"].asString();
  int version = object["version"].asInt();
  int opaque = object["opaque"].asInt();
  int flag = object["flag"].asInt();

  // "remark" is optional; a missing or null value becomes an empty string.
  string remark = "";
  const Json::Value& remarkNode = object["remark"];
  if (!remarkNode.isNull()) {
    remark = remarkNode.asString();
  }

  LOG_DEBUG(
      "code:%d, language:%s, version:%d, opaque:%d, flag:%d, remark:%s, "
      "headLen:%d, bodyLen:%d ",
      code, language.c_str(), version, opaque, flag, remark.c_str(), headLen, bodyLen);

  RemotingCommand* cmd = new RemotingCommand(code, language, version, opaque, flag, remark, NULL);
  try {
    cmd->setParsedJson(object);
    if (bodyLen > 0) {
      cmd->SetBody(pData + 4 + headLen, bodyLen);
    }
  } catch (...) {
    // Do not leak the half-built command if copying the JSON or body fails.
    delete cmd;
    throw;
  }
  return cmd;
}
// Sets bit RPC_TYPE in m_flag; isResponseType() tests the same bit.
void RemotingCommand::markResponseType() {
  m_flag |= (1 << RPC_TYPE);
}
// Returns true when bit RPC_TYPE is set in m_flag.
bool RemotingCommand::isResponseType() {
  const int responseMask = 1 << RPC_TYPE;
  return (m_flag & responseMask) == responseMask;
}
// Sets bit RPC_ONEWAY in m_flag; isOnewayRPC() tests the same bit.
void RemotingCommand::markOnewayRPC() {
  m_flag |= (1 << RPC_ONEWAY);
}
// Returns true when bit RPC_ONEWAY is set in m_flag.
bool RemotingCommand::isOnewayRPC() {
  const int onewayMask = 1 << RPC_ONEWAY;
  return (m_flag & onewayMask) == onewayMask;
}
// Overwrites the opaque value that was set at construction.
void RemotingCommand::setOpaque(const int opa) {
m_opaque = opa;
}
void RemotingCommand::SetExtHeader(int code) {
try {
Json::Value ext = m_parsedJson["extFields"];
if (!ext.isNull()) {
m_pExtHeader = NULL;
switch (code) {
case SEND_MESSAGE:
case SEND_MESSAGE_V2:
m_pExtHeader.reset(SendMessageResponseHeader::Decode(ext));
break;
case PULL_MESSAGE:
m_pExtHeader.reset(PullMessageResponseHeader::Decode(ext));
break;
case GET_MIN_OFFSET:
m_pExtHeader.reset(GetMinOffsetResponseHeader::Decode(ext));
break;
case GET_MAX_OFFSET:
m_pExtHeader.reset(GetMaxOffsetResponseHeader::Decode(ext));
break;
case SEARCH_OFFSET_BY_TIMESTAMP:
m_pExtHeader.reset(SearchOffsetResponseHeader::Decode(ext));
break;
case GET_EARLIEST_MSG_STORETIME:
m_pExtHeader.reset(GetEarliestMsgStoretimeResponseHeader::Decode(ext));
break;
case QUERY_CONSUMER_OFFSET:
m_pExtHeader.reset(QueryConsumerOffsetResponseHeader::Decode(ext));
break;
case RESET_CONSUMER_CLIENT_OFFSET:
m_pExtHeader.reset(ResetOffsetRequestHeader::Decode(ext));
break;
case GET_CONSUMER_RUNNING_INFO:
m_pExtHeader.reset(GetConsumerRunningInfoRequestHeader::Decode(ext));
break;
case NOTIFY_CONSUMER_IDS_CHANGED:
m_pExtHeader.reset(NotifyConsumerIdsChangedRequestHeader::Decode(ext));
break;
case CHECK_TRANSACTION_STATE:
m_pExtHeader.reset(CheckTransactionStateRequestHeader::Decode(ext));
break;
default:
break;
}
}
} catch (MQException& e) {
LOG_ERROR("set response head error");
}
}
// Overwrites the command code (m_code).
void RemotingCommand::setCode(int code) {
m_code = code;
}
// Returns the command code (m_code).
int RemotingCommand::getCode() const {
return m_code;
}
// Returns the opaque value (m_opaque): either generated from the sequence
// counter at construction, passed in explicitly, or set via setOpaque().
int RemotingCommand::getOpaque() const {
return m_opaque;
}
// Returns a copy of the remark text (m_remark).
string RemotingCommand::getRemark() const {
return m_remark;
}
// Replaces the remark text. `mark` is taken by value and moved into the
// member so the string is not copied a second time.
void RemotingCommand::setRemark(string mark) {
  m_remark = std::move(mark);
}
// Returns the raw pointer currently held by m_pExtHeader, obtained with
// get(), so m_pExtHeader keeps holding it. The result is NULL when no
// extension header is set.
CommandHeader* RemotingCommand::getCommandHeader() const {
return m_pExtHeader.get();
}
// Stores the parsed JSON header; SetExtHeader() later reads its "extFields"
// member. `json` is taken by value and moved into the member, which avoids a
// second deep copy where Json::Value supports move; otherwise it falls back
// to the copy that was made before.
void RemotingCommand::setParsedJson(Json::Value json) {
  m_parsedJson = std::move(json);
}
// Returns m_flag, the bit field tested by isResponseType() and isOnewayRPC().
const int RemotingCommand::getFlag() const {
return m_flag;
}
// Returns the version value stored in m_version.
const int RemotingCommand::getVersion() const {
return m_version;
}
// Stores a copy of `body` in m_msgBody. This string member is separate from
// the binary block m_body that SetBody() fills.
void RemotingCommand::setMsgBody(const string& body) {
m_msgBody = body;
}
// Returns a copy of the string stored by setMsgBody().
string RemotingCommand::getMsgBody() const {
return m_msgBody;
}
// Adds `key` -> `value` to m_extFields, overwriting any existing value for
// the same key. Note that Encode() only writes the Signature, AccessKey and
// ONSChannelKey entries of this container to the wire.
void RemotingCommand::addExtField(const string& key, const string& value) {
m_extFields[key] = value;
}
// Returns a one-line summary for logging: code, opaque, flag and the current
// sizes of the body and header blocks.
std::string RemotingCommand::ToString() const {
  std::stringstream out;
  out << "code:" << m_code;
  out << ",opaque:" << m_opaque;
  out << ",flag:" << m_flag;
  out << ",body.size:" << m_body.getSize();
  out << ",header.size:" << m_head.getSize();
  return out.str();
}
} // namespace rocketmq