blob: 1ba8a951fab8f4110696dfe8ba569060c10f405e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MQDecoder.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sstream>
#include "Logging.h"
#include "MemoryOutputStream.h"
#include "MessageSysFlag.h"
#include "UtilAll.h"
namespace rocketmq {
//<!***************************************************************************
const int MQDecoder::MSG_ID_LENGTH = 8 + 8;
const char MQDecoder::NAME_VALUE_SEPARATOR = 1;
const char MQDecoder::PROPERTY_SEPARATOR = 2;
int MQDecoder::MessageMagicCodePostion = 4;
int MQDecoder::MessageFlagPostion = 16;
int MQDecoder::MessagePhysicOffsetPostion = 28;
int MQDecoder::MessageStoreTimestampPostion = 56;
//<!***************************************************************************
string MQDecoder::createMessageId(sockaddr addr, int64 offset) {
int host, port;
socketAddress2IPPort(addr, host, port);
MemoryOutputStream outputmen(MSG_ID_LENGTH);
outputmen.writeIntBigEndian(host);
outputmen.writeIntBigEndian(port);
outputmen.writeInt64BigEndian(offset);
const char* bytes = static_cast<const char*>(outputmen.getData());
int len = outputmen.getDataSize();
return UtilAll::bytes2string(bytes, len);
}
MQMessageId MQDecoder::decodeMessageId(const string& msgId) {
string ipStr = msgId.substr(0, 8);
string portStr = msgId.substr(8, 8);
string offsetStr = msgId.substr(16, 16);
int num = strspn(offsetStr.c_str(), "F");
offsetStr = offsetStr.substr(num, 16 - num);
char* end;
int ipInt = strtoul(ipStr.c_str(), &end, 16);
int portInt = strtoul(portStr.c_str(), &end, 16);
uint64 offset = strtoul(offsetStr.c_str(), &end, 16);
// int64 offset = UtilAll::hexstr2ull(offsetStr.c_str());
struct sockaddr_in sa;
sa.sin_family = AF_INET;
sa.sin_port = htons(portInt);
sa.sin_addr.s_addr = htonl(ipInt);
sockaddr addr;
memcpy(&addr, &sa, sizeof(sockaddr));
MQMessageId id(addr, offset);
return id;
}
MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer) {
return decode(byteBuffer, true);
}
MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) {
MQMessageExt* msgExt = new MQMessageExt();
// 1 TOTALSIZE
int storeSize = byteBuffer.readIntBigEndian();
msgExt->setStoreSize(storeSize);
// 2 MAGICCODE sizeof(int)
byteBuffer.skipNextBytes(sizeof(int));
// 3 BODYCRC
int bodyCRC = byteBuffer.readIntBigEndian();
msgExt->setBodyCRC(bodyCRC);
// 4 QUEUEID
int queueId = byteBuffer.readIntBigEndian();
msgExt->setQueueId(queueId);
// 5 FLAG
int flag = byteBuffer.readIntBigEndian();
msgExt->setFlag(flag);
// 6 QUEUEOFFSET
int64 queueOffset = byteBuffer.readInt64BigEndian();
msgExt->setQueueOffset(queueOffset);
// 7 PHYSICALOFFSET
int64 physicOffset = byteBuffer.readInt64BigEndian();
msgExt->setCommitLogOffset(physicOffset);
// 8 SYSFLAG
int sysFlag = byteBuffer.readIntBigEndian();
msgExt->setSysFlag(sysFlag);
// 9 BORNTIMESTAMP
int64 bornTimeStamp = byteBuffer.readInt64BigEndian();
msgExt->setBornTimestamp(bornTimeStamp);
// 10 BORNHOST
int bornHost = byteBuffer.readIntBigEndian();
int port = byteBuffer.readIntBigEndian();
sockaddr bornAddr = IPPort2socketAddress(bornHost, port);
msgExt->setBornHost(bornAddr);
// 11 STORETIMESTAMP
int64 storeTimestamp = byteBuffer.readInt64BigEndian();
msgExt->setStoreTimestamp(storeTimestamp);
// // 12 STOREHOST
int storeHost = byteBuffer.readIntBigEndian();
port = byteBuffer.readIntBigEndian();
sockaddr storeAddr = IPPort2socketAddress(storeHost, port);
msgExt->setStoreHost(storeAddr);
// 13 RECONSUMETIMES
int reconsumeTimes = byteBuffer.readIntBigEndian();
msgExt->setReconsumeTimes(reconsumeTimes);
// 14 Prepared Transaction Offset
int64 preparedTransactionOffset = byteBuffer.readInt64BigEndian();
msgExt->setPreparedTransactionOffset(preparedTransactionOffset);
// 15 BODY
int bodyLen = byteBuffer.readIntBigEndian();
if (bodyLen > 0) {
if (readBody) {
MemoryBlock block;
byteBuffer.readIntoMemoryBlock(block, bodyLen);
const char* const pBody = static_cast<const char*>(block.getData());
int len = block.getSize();
string msgbody(pBody, len);
// decompress body
if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) {
string outbody;
if (UtilAll::inflate(msgbody, outbody)) {
msgExt->setBody(outbody);
}
} else {
msgExt->setBody(msgbody);
}
} else {
byteBuffer.skipNextBytes(bodyLen);
}
}
// 16 TOPIC
int topicLen = (int)byteBuffer.readByte();
MemoryBlock block;
byteBuffer.readIntoMemoryBlock(block, topicLen);
const char* const pTopic = static_cast<const char*>(block.getData());
topicLen = block.getSize();
msgExt->setTopic(pTopic, topicLen);
// 17 properties
short propertiesLen = byteBuffer.readShortBigEndian();
if (propertiesLen > 0) {
MemoryBlock block;
byteBuffer.readIntoMemoryBlock(block, propertiesLen);
const char* const pProperty = static_cast<const char*>(block.getData());
int len = block.getSize();
string propertiesString(pProperty, len);
map<string, string> propertiesMap;
string2messageProperties(propertiesString, propertiesMap);
msgExt->setPropertiesInternal(propertiesMap);
propertiesMap.clear();
}
// 18 msg ID
string offsetMsgId = createMessageId(msgExt->getStoreHost(), (int64)msgExt->getCommitLogOffset());
msgExt->setOffsetMsgId(offsetMsgId);
string msgId = msgExt->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (msgId.empty()) {
msgId = offsetMsgId;
}
msgExt->setMsgId(msgId);
// LOG_INFO("get msgExt from remote server, its contents are:%s", msgExt->toString().c_str());
return msgExt;
}
void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec) {
mqvec.clear();
decodes(mem, mqvec, true);
}
void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec, bool readBody) {
MemoryInputStream rawInput(*mem, true);
while (rawInput.getNumBytesRemaining() > 0) {
unique_ptr<MQMessageExt> msg(decode(rawInput, readBody));
mqvec.push_back(*msg);
}
}
string MQDecoder::messageProperties2String(const map<string, string>& properties) {
string os;
for (const auto& it : properties) {
// os << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR;
os.append(it.first);
os += NAME_VALUE_SEPARATOR;
os.append(it.second);
os += PROPERTY_SEPARATOR;
}
return os;
}
void MQDecoder::string2messageProperties(const string& propertiesString, map<string, string>& properties) {
vector<string> out;
UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR);
for (size_t i = 0; i < out.size(); i++) {
vector<string> outValue;
UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR);
if (outValue.size() == 2) {
properties[outValue[0]] = outValue[1];
}
}
}
} // namespace rocketmq