/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "MQDecoder.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <memory>
#include <sstream>
#include "Logging.h"
#include "MemoryOutputStream.h"
#include "MessageSysFlag.h"
#include "UtilAll.h"
namespace rocketmq { | |
//<!*************************************************************************** | |
const int MQDecoder::MSG_ID_LENGTH = 8 + 8; | |
const char MQDecoder::NAME_VALUE_SEPARATOR = 1; | |
const char MQDecoder::PROPERTY_SEPARATOR = 2; | |
int MQDecoder::MessageMagicCodePostion = 4; | |
int MQDecoder::MessageFlagPostion = 16; | |
int MQDecoder::MessagePhysicOffsetPostion = 28; | |
int MQDecoder::MessageStoreTimestampPostion = 56; | |
//<!*************************************************************************** | |
string MQDecoder::createMessageId(sockaddr addr, int64 offset) { | |
struct sockaddr_in* sa = (struct sockaddr_in*)&addr; | |
MemoryOutputStream outputmen(MSG_ID_LENGTH); | |
outputmen.writeIntBigEndian(sa->sin_addr.s_addr); | |
outputmen.writeRepeatedByte(0, 2); | |
outputmen.write(&(sa->sin_port), 2); | |
outputmen.writeInt64BigEndian(offset); | |
const char* bytes = static_cast<const char*>(outputmen.getData()); | |
int len = outputmen.getDataSize(); | |
return UtilAll::bytes2string(bytes, len); | |
} | |
MQMessageId MQDecoder::decodeMessageId(const string& msgId) { | |
string ipStr = msgId.substr(0, 8); | |
string portStr = msgId.substr(8, 8); | |
string offsetStr = msgId.substr(16); | |
char* end; | |
int ipInt = strtoul(ipStr.c_str(), &end, 16); | |
int portInt = strtoul(portStr.c_str(), &end, 16); | |
int64 offset = UtilAll::hexstr2ull(offsetStr.c_str()); | |
struct sockaddr_in sa; | |
sa.sin_family = AF_INET; | |
sa.sin_port = htons(portInt); | |
sa.sin_addr.s_addr = htonl(ipInt); | |
MQMessageId id(*((sockaddr*)&sa), offset); | |
return id; | |
} | |
MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer) { | |
return decode(byteBuffer, true); | |
} | |
MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { | |
MQMessageExt* msgExt = new MQMessageExt(); | |
// 1 TOTALSIZE | |
int storeSize = byteBuffer.readIntBigEndian(); | |
msgExt->setStoreSize(storeSize); | |
// 2 MAGICCODE sizeof(int) | |
byteBuffer.skipNextBytes(sizeof(int)); | |
// 3 BODYCRC | |
int bodyCRC = byteBuffer.readIntBigEndian(); | |
msgExt->setBodyCRC(bodyCRC); | |
// 4 QUEUEID | |
int queueId = byteBuffer.readIntBigEndian(); | |
msgExt->setQueueId(queueId); | |
// 5 FLAG | |
int flag = byteBuffer.readIntBigEndian(); | |
msgExt->setFlag(flag); | |
// 6 QUEUEOFFSET | |
int64 queueOffset = byteBuffer.readInt64BigEndian(); | |
msgExt->setQueueOffset(queueOffset); | |
// 7 PHYSICALOFFSET | |
int64 physicOffset = byteBuffer.readInt64BigEndian(); | |
msgExt->setCommitLogOffset(physicOffset); | |
// 8 SYSFLAG | |
int sysFlag = byteBuffer.readIntBigEndian(); | |
msgExt->setSysFlag(sysFlag); | |
// 9 BORNTIMESTAMP | |
int64 bornTimeStamp = byteBuffer.readInt64BigEndian(); | |
msgExt->setBornTimestamp(bornTimeStamp); | |
// 10 BORNHOST | |
int bornHost = byteBuffer.readIntBigEndian(); | |
int port = byteBuffer.readIntBigEndian(); | |
sockaddr bornAddr = IPPort2socketAddress(bornHost, port); | |
msgExt->setBornHost(bornAddr); | |
// 11 STORETIMESTAMP | |
int64 storeTimestamp = byteBuffer.readInt64BigEndian(); | |
msgExt->setStoreTimestamp(storeTimestamp); | |
// // 12 STOREHOST | |
int storeHost = byteBuffer.readIntBigEndian(); | |
port = byteBuffer.readIntBigEndian(); | |
sockaddr storeAddr = IPPort2socketAddress(storeHost, port); | |
msgExt->setStoreHost(storeAddr); | |
// 13 RECONSUMETIMES | |
int reconsumeTimes = byteBuffer.readIntBigEndian(); | |
msgExt->setReconsumeTimes(reconsumeTimes); | |
// 14 Prepared Transaction Offset | |
int64 preparedTransactionOffset = byteBuffer.readInt64BigEndian(); | |
msgExt->setPreparedTransactionOffset(preparedTransactionOffset); | |
// 15 BODY | |
int bodyLen = byteBuffer.readIntBigEndian(); | |
if (bodyLen > 0) { | |
if (readBody) { | |
MemoryBlock block; | |
byteBuffer.readIntoMemoryBlock(block, bodyLen); | |
const char* const pBody = static_cast<const char*>(block.getData()); | |
int len = block.getSize(); | |
string msgbody(pBody, len); | |
// decompress body | |
if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) { | |
string outbody; | |
if (UtilAll::inflate(msgbody, outbody)) { | |
msgExt->setBody(outbody); | |
} | |
} else { | |
msgExt->setBody(msgbody); | |
} | |
} else { | |
byteBuffer.skipNextBytes(bodyLen); | |
} | |
} | |
// 16 TOPIC | |
int topicLen = (int)byteBuffer.readByte(); | |
MemoryBlock block; | |
byteBuffer.readIntoMemoryBlock(block, topicLen); | |
const char* const pTopic = static_cast<const char*>(block.getData()); | |
topicLen = block.getSize(); | |
msgExt->setTopic(pTopic, topicLen); | |
// 17 properties | |
short propertiesLen = byteBuffer.readShortBigEndian(); | |
if (propertiesLen > 0) { | |
MemoryBlock block; | |
byteBuffer.readIntoMemoryBlock(block, propertiesLen); | |
const char* const pProperty = static_cast<const char*>(block.getData()); | |
int len = block.getSize(); | |
string propertiesString(pProperty, len); | |
map<string, string> propertiesMap; | |
string2messageProperties(propertiesString, propertiesMap); | |
msgExt->setPropertiesInternal(propertiesMap); | |
propertiesMap.clear(); | |
} | |
// 18 msg ID | |
string offsetMsgId = createMessageId(msgExt->getStoreHost(), (int64)msgExt->getCommitLogOffset()); | |
msgExt->setOffsetMsgId(offsetMsgId); | |
string msgId = msgExt->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | |
if (msgId.empty()) { | |
msgId = offsetMsgId; | |
} | |
msgExt->setMsgId(msgId); | |
// LOG_INFO("get msgExt from remote server, its contents are:%s", msgExt->toString().c_str()); | |
return msgExt; | |
} | |
void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec) { | |
mqvec.clear(); | |
decodes(mem, mqvec, true); | |
} | |
void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec, bool readBody) { | |
MemoryInputStream rawInput(*mem, true); | |
while (rawInput.getNumBytesRemaining() > 0) { | |
unique_ptr<MQMessageExt> msg(decode(rawInput, readBody)); | |
mqvec.push_back(*msg); | |
} | |
} | |
string MQDecoder::messageProperties2String(const map<string, string>& properties) { | |
string os; | |
for (const auto& it : properties) { | |
// os << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR; | |
os.append(it.first); | |
os += NAME_VALUE_SEPARATOR; | |
os.append(it.second); | |
os += PROPERTY_SEPARATOR; | |
} | |
return os; | |
} | |
void MQDecoder::string2messageProperties(const string& propertiesString, map<string, string>& properties) { | |
vector<string> out; | |
UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR); | |
for (size_t i = 0; i < out.size(); i++) { | |
vector<string> outValue; | |
UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR); | |
if (outValue.size() == 2) { | |
properties[outValue[0]] = outValue[1]; | |
} | |
} | |
} | |
} //<!end namespace; |