| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "RemotingCommand.h" |
| #include "Logging.h" |
| #include "MQProtos.h" |
| #include "MQVersion.h" |
| #include "SessionCredentials.h" |
| |
| namespace rocketmq { |
| |
| boost::atomic<int> RemotingCommand::s_seqNumber; |
| |
| //<!************************************************************************ |
| RemotingCommand::RemotingCommand(int code, CommandHeader* pExtHeader /* = NULL */) |
| : m_code(code), |
| m_language("CPP"), |
| m_version(MQVersion::s_CurrentVersion), |
| m_flag(0), |
| m_remark(""), |
| m_pExtHeader(pExtHeader) { |
| // mask sign bit |
| m_opaque = s_seqNumber.fetch_add(1, boost::memory_order_relaxed) & numeric_limits<int>::max(); |
| } |
| |
| RemotingCommand::RemotingCommand(int code, |
| string language, |
| int version, |
| int opaque, |
| int flag, |
| string remark, |
| CommandHeader* pExtHeader) |
| : m_code(code), |
| m_language(language), |
| m_version(version), |
| m_opaque(opaque), |
| m_flag(flag), |
| m_remark(remark), |
| m_pExtHeader(pExtHeader) {} |
| |
| RemotingCommand::RemotingCommand(const RemotingCommand& command) { |
| Assign(command); |
| } |
| |
| RemotingCommand& RemotingCommand::operator=(const RemotingCommand& command) { |
| if (this != &command) { |
| Assign(command); |
| } |
| return *this; |
| } |
| |
| RemotingCommand::~RemotingCommand() { |
| m_pExtHeader = NULL; |
| } |
| |
| void RemotingCommand::Assign(const RemotingCommand& command) { |
| m_code = command.m_code; |
| m_language = command.m_language; |
| m_version = command.m_version; |
| m_opaque = command.m_opaque; |
| m_flag = command.m_flag; |
| m_remark = command.m_remark; |
| m_msgBody = command.m_msgBody; |
| |
| for (auto& it : command.m_extFields) { |
| m_extFields[it.first] = it.second; |
| } |
| |
| m_head = command.m_head; |
| m_body = command.m_body; |
| m_parsedJson = command.m_parsedJson; |
| // m_pExtHeader = command.m_pExtHeader; //ignore this filed at this moment, if need please add it |
| } |
| |
| void RemotingCommand::Encode() { |
| Json::Value root; |
| root["code"] = m_code; |
| root["language"] = "CPP"; |
| root["version"] = m_version; |
| root["opaque"] = m_opaque; |
| root["flag"] = m_flag; |
| root["remark"] = m_remark; |
| |
| if (m_pExtHeader) { |
| Json::Value extJson; |
| m_pExtHeader->Encode(extJson); |
| |
| extJson[SessionCredentials::Signature] = m_extFields[SessionCredentials::Signature]; |
| extJson[SessionCredentials::AccessKey] = m_extFields[SessionCredentials::AccessKey]; |
| extJson[SessionCredentials::ONSChannelKey] = m_extFields[SessionCredentials::ONSChannelKey]; |
| |
| root["extFields"] = extJson; |
| } else { // for heartbeat |
| Json::Value extJson; |
| extJson[SessionCredentials::Signature] = m_extFields[SessionCredentials::Signature]; |
| extJson[SessionCredentials::AccessKey] = m_extFields[SessionCredentials::AccessKey]; |
| extJson[SessionCredentials::ONSChannelKey] = m_extFields[SessionCredentials::ONSChannelKey]; |
| root["extFields"] = extJson; |
| } |
| |
| Json::FastWriter fastwrite; |
| string data = fastwrite.write(root); |
| |
| uint32 headLen = data.size(); |
| uint32 totalLen = 4 + headLen + m_body.getSize(); |
| |
| uint32 messageHeader[2]; |
| messageHeader[0] = htonl(totalLen); |
| messageHeader[1] = htonl(headLen); |
| |
| //<!include self 4 bytes, see : doc/protocol.txt; |
| m_head.setSize(4 + 4 + headLen); |
| m_head.copyFrom(messageHeader, 0, sizeof(messageHeader)); |
| m_head.copyFrom(data.c_str(), sizeof(messageHeader), headLen); |
| } |
| |
| const MemoryBlock* RemotingCommand::GetHead() const { |
| return &m_head; |
| } |
| |
| const MemoryBlock* RemotingCommand::GetBody() const { |
| return &m_body; |
| } |
| |
| void RemotingCommand::SetBody(const char* pData, int len) { |
| m_body.reset(); |
| m_body.setSize(len); |
| m_body.copyFrom(pData, 0, len); |
| } |
| |
| RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) { |
| //<!decode 1 bytes,4+head+body |
| uint32 messageHeader[1]; |
| mem.copyTo(messageHeader, 0, sizeof(messageHeader)); |
| int totalLen = mem.getSize(); |
| int headLen = ntohl(messageHeader[0]); |
| int bodyLen = totalLen - 4 - headLen; |
| |
| //<!decode header; |
| const char* const pData = static_cast<const char*>(mem.getData()); |
| Json::Reader reader; |
| Json::Value object; |
| const char* begin = pData + 4; |
| const char* end = pData + 4 + headLen; |
| |
| if (!reader.parse(begin, end, object)) { |
| THROW_MQEXCEPTION(MQClientException, "conn't parse json", -1); |
| } |
| |
| int code = object["code"].asInt(); |
| |
| string language = object["language"].asString(); |
| int version = object["version"].asInt(); |
| int opaque = object["opaque"].asInt(); |
| int flag = object["flag"].asInt(); |
| Json::Value v = object["remark"]; |
| string remark = ""; |
| if (!v.isNull()) { |
| remark = object["remark"].asString(); |
| } |
| LOG_DEBUG( |
| "code:%d, remark:%s, version:%d, opaque:%d, flag:%d, remark:%s, " |
| "headLen:%d, bodyLen:%d ", |
| code, language.c_str(), version, opaque, flag, remark.c_str(), headLen, bodyLen); |
| RemotingCommand* cmd = new RemotingCommand(code, language, version, opaque, flag, remark, NULL); |
| cmd->setParsedJson(object); |
| if (bodyLen > 0) { |
| cmd->SetBody(pData + 4 + headLen, bodyLen); |
| } |
| return cmd; |
| } |
| |
| void RemotingCommand::markResponseType() { |
| int bits = 1 << RPC_TYPE; |
| m_flag |= bits; |
| } |
| |
| bool RemotingCommand::isResponseType() { |
| int bits = 1 << RPC_TYPE; |
| return (m_flag & bits) == bits; |
| } |
| |
| void RemotingCommand::markOnewayRPC() { |
| int bits = 1 << RPC_ONEWAY; |
| m_flag |= bits; |
| } |
| |
| bool RemotingCommand::isOnewayRPC() { |
| int bits = 1 << RPC_ONEWAY; |
| return (m_flag & bits) == bits; |
| } |
| |
| void RemotingCommand::setOpaque(const int opa) { |
| m_opaque = opa; |
| } |
| |
| void RemotingCommand::SetExtHeader(int code) { |
| try { |
| Json::Value ext = m_parsedJson["extFields"]; |
| if (!ext.isNull()) { |
| m_pExtHeader = NULL; |
| switch (code) { |
| case SEND_MESSAGE: |
| m_pExtHeader.reset(SendMessageResponseHeader::Decode(ext)); |
| break; |
| case PULL_MESSAGE: |
| m_pExtHeader.reset(PullMessageResponseHeader::Decode(ext)); |
| break; |
| case GET_MIN_OFFSET: |
| m_pExtHeader.reset(GetMinOffsetResponseHeader::Decode(ext)); |
| break; |
| case GET_MAX_OFFSET: |
| m_pExtHeader.reset(GetMaxOffsetResponseHeader::Decode(ext)); |
| break; |
| case SEARCH_OFFSET_BY_TIMESTAMP: |
| m_pExtHeader.reset(SearchOffsetResponseHeader::Decode(ext)); |
| break; |
| case GET_EARLIEST_MSG_STORETIME: |
| m_pExtHeader.reset(GetEarliestMsgStoretimeResponseHeader::Decode(ext)); |
| break; |
| case QUERY_CONSUMER_OFFSET: |
| m_pExtHeader.reset(QueryConsumerOffsetResponseHeader::Decode(ext)); |
| break; |
| case RESET_CONSUMER_CLIENT_OFFSET: |
| m_pExtHeader.reset(ResetOffsetRequestHeader::Decode(ext)); |
| break; |
| case GET_CONSUMER_RUNNING_INFO: |
| m_pExtHeader.reset(GetConsumerRunningInfoRequestHeader::Decode(ext)); |
| break; |
| case NOTIFY_CONSUMER_IDS_CHANGED: |
| m_pExtHeader.reset(NotifyConsumerIdsChangedRequestHeader::Decode(ext)); |
| default: |
| break; |
| } |
| } |
| } catch (MQException& e) { |
| LOG_ERROR("set response head error"); |
| } |
| } |
| |
| void RemotingCommand::setCode(int code) { |
| m_code = code; |
| } |
| |
| int RemotingCommand::getCode() const { |
| return m_code; |
| } |
| |
| int RemotingCommand::getOpaque() const { |
| return m_opaque; |
| } |
| |
| string RemotingCommand::getRemark() const { |
| return m_remark; |
| } |
| |
| void RemotingCommand::setRemark(string mark) { |
| m_remark = mark; |
| } |
| |
| CommandHeader* RemotingCommand::getCommandHeader() const { |
| return m_pExtHeader.get(); |
| } |
| |
| void RemotingCommand::setParsedJson(Json::Value json) { |
| m_parsedJson = json; |
| } |
| |
| const int RemotingCommand::getFlag() const { |
| return m_flag; |
| } |
| |
| const int RemotingCommand::getVersion() const { |
| return m_version; |
| } |
| |
| void RemotingCommand::setMsgBody(const string& body) { |
| m_msgBody = body; |
| } |
| |
| string RemotingCommand::getMsgBody() const { |
| return m_msgBody; |
| } |
| |
| void RemotingCommand::addExtField(const string& key, const string& value) { |
| m_extFields[key] = value; |
| } |
| |
| std::string RemotingCommand::ToString() const { |
| std::stringstream ss; |
| ss << "code:" << m_code << ",opaque:" << m_opaque << ",flag:" << m_flag << ",body.size:" << m_body.getSize() |
| << ",header.size:" << m_head.getSize(); |
| return ss.str(); |
| } |
| |
| } //<!end namespace; |