| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| #include "MQDecoder.h" | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <string.h> | |
| #include <sstream> | |
| #include "Logging.h" | |
| #include "MemoryOutputStream.h" | |
| #include "MessageSysFlag.h" | |
| #include "UtilAll.h" | |
| namespace rocketmq { | |
| //<!*************************************************************************** | |
| const int MQDecoder::MSG_ID_LENGTH = 8 + 8; | |
| const char MQDecoder::NAME_VALUE_SEPARATOR = 1; | |
| const char MQDecoder::PROPERTY_SEPARATOR = 2; | |
| int MQDecoder::MessageMagicCodePostion = 4; | |
| int MQDecoder::MessageFlagPostion = 16; | |
| int MQDecoder::MessagePhysicOffsetPostion = 28; | |
| int MQDecoder::MessageStoreTimestampPostion = 56; | |
| //<!*************************************************************************** | |
| string MQDecoder::createMessageId(sockaddr addr, int64 offset) { | |
| int host, port; | |
| socketAddress2IPPort(addr, host, port); | |
| MemoryOutputStream outputmen(MSG_ID_LENGTH); | |
| outputmen.writeIntBigEndian(host); | |
| outputmen.writeIntBigEndian(port); | |
| outputmen.writeInt64BigEndian(offset); | |
| const char* bytes = static_cast<const char*>(outputmen.getData()); | |
| int len = outputmen.getDataSize(); | |
| return UtilAll::bytes2string(bytes, len); | |
| } | |
| MQMessageId MQDecoder::decodeMessageId(const string& msgId) { | |
| string ipStr = msgId.substr(0, 8); | |
| string portStr = msgId.substr(8, 8); | |
| string offsetStr = msgId.substr(16, 16); | |
| int num = strspn(offsetStr.c_str(), "F"); | |
| offsetStr = offsetStr.substr(num, 16 - num); | |
| char* end; | |
| int ipInt = strtoul(ipStr.c_str(), &end, 16); | |
| int portInt = strtoul(portStr.c_str(), &end, 16); | |
| uint64 offset = strtoul(offsetStr.c_str(), &end, 16); | |
| // int64 offset = UtilAll::hexstr2ull(offsetStr.c_str()); | |
| struct sockaddr_in sa; | |
| sa.sin_family = AF_INET; | |
| sa.sin_port = htons(portInt); | |
| sa.sin_addr.s_addr = htonl(ipInt); | |
| sockaddr addr; | |
| memcpy(&addr, &sa, sizeof(sockaddr)); | |
| MQMessageId id(addr, offset); | |
| return id; | |
| } | |
| MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer) { | |
| return decode(byteBuffer, true); | |
| } | |
| MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { | |
| MQMessageExt* msgExt = new MQMessageExt(); | |
| // 1 TOTALSIZE | |
| int storeSize = byteBuffer.readIntBigEndian(); | |
| msgExt->setStoreSize(storeSize); | |
| // 2 MAGICCODE sizeof(int) | |
| byteBuffer.skipNextBytes(sizeof(int)); | |
| // 3 BODYCRC | |
| int bodyCRC = byteBuffer.readIntBigEndian(); | |
| msgExt->setBodyCRC(bodyCRC); | |
| // 4 QUEUEID | |
| int queueId = byteBuffer.readIntBigEndian(); | |
| msgExt->setQueueId(queueId); | |
| // 5 FLAG | |
| int flag = byteBuffer.readIntBigEndian(); | |
| msgExt->setFlag(flag); | |
| // 6 QUEUEOFFSET | |
| int64 queueOffset = byteBuffer.readInt64BigEndian(); | |
| msgExt->setQueueOffset(queueOffset); | |
| // 7 PHYSICALOFFSET | |
| int64 physicOffset = byteBuffer.readInt64BigEndian(); | |
| msgExt->setCommitLogOffset(physicOffset); | |
| // 8 SYSFLAG | |
| int sysFlag = byteBuffer.readIntBigEndian(); | |
| msgExt->setSysFlag(sysFlag); | |
| // 9 BORNTIMESTAMP | |
| int64 bornTimeStamp = byteBuffer.readInt64BigEndian(); | |
| msgExt->setBornTimestamp(bornTimeStamp); | |
| // 10 BORNHOST | |
| int bornHost = byteBuffer.readIntBigEndian(); | |
| int port = byteBuffer.readIntBigEndian(); | |
| sockaddr bornAddr = IPPort2socketAddress(bornHost, port); | |
| msgExt->setBornHost(bornAddr); | |
| // 11 STORETIMESTAMP | |
| int64 storeTimestamp = byteBuffer.readInt64BigEndian(); | |
| msgExt->setStoreTimestamp(storeTimestamp); | |
| // // 12 STOREHOST | |
| int storeHost = byteBuffer.readIntBigEndian(); | |
| port = byteBuffer.readIntBigEndian(); | |
| sockaddr storeAddr = IPPort2socketAddress(storeHost, port); | |
| msgExt->setStoreHost(storeAddr); | |
| // 13 RECONSUMETIMES | |
| int reconsumeTimes = byteBuffer.readIntBigEndian(); | |
| msgExt->setReconsumeTimes(reconsumeTimes); | |
| // 14 Prepared Transaction Offset | |
| int64 preparedTransactionOffset = byteBuffer.readInt64BigEndian(); | |
| msgExt->setPreparedTransactionOffset(preparedTransactionOffset); | |
| // 15 BODY | |
| int bodyLen = byteBuffer.readIntBigEndian(); | |
| if (bodyLen > 0) { | |
| if (readBody) { | |
| MemoryBlock block; | |
| byteBuffer.readIntoMemoryBlock(block, bodyLen); | |
| const char* const pBody = static_cast<const char*>(block.getData()); | |
| int len = block.getSize(); | |
| string msgbody(pBody, len); | |
| // decompress body | |
| if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) { | |
| string outbody; | |
| if (UtilAll::inflate(msgbody, outbody)) { | |
| msgExt->setBody(outbody); | |
| } | |
| } else { | |
| msgExt->setBody(msgbody); | |
| } | |
| } else { | |
| byteBuffer.skipNextBytes(bodyLen); | |
| } | |
| } | |
| // 16 TOPIC | |
| int topicLen = (int)byteBuffer.readByte(); | |
| MemoryBlock block; | |
| byteBuffer.readIntoMemoryBlock(block, topicLen); | |
| const char* const pTopic = static_cast<const char*>(block.getData()); | |
| topicLen = block.getSize(); | |
| msgExt->setTopic(pTopic, topicLen); | |
| // 17 properties | |
| short propertiesLen = byteBuffer.readShortBigEndian(); | |
| if (propertiesLen > 0) { | |
| MemoryBlock block; | |
| byteBuffer.readIntoMemoryBlock(block, propertiesLen); | |
| const char* const pProperty = static_cast<const char*>(block.getData()); | |
| int len = block.getSize(); | |
| string propertiesString(pProperty, len); | |
| map<string, string> propertiesMap; | |
| string2messageProperties(propertiesString, propertiesMap); | |
| msgExt->setPropertiesInternal(propertiesMap); | |
| propertiesMap.clear(); | |
| } | |
| // 18 msg ID | |
| string offsetMsgId = createMessageId(msgExt->getStoreHost(), (int64)msgExt->getCommitLogOffset()); | |
| msgExt->setOffsetMsgId(offsetMsgId); | |
| string msgId = msgExt->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | |
| if (msgId.empty()) { | |
| msgId = offsetMsgId; | |
| } | |
| msgExt->setMsgId(msgId); | |
| // LOG_INFO("get msgExt from remote server, its contents are:%s", msgExt->toString().c_str()); | |
| return msgExt; | |
| } | |
| void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec) { | |
| mqvec.clear(); | |
| decodes(mem, mqvec, true); | |
| } | |
| void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec, bool readBody) { | |
| MemoryInputStream rawInput(*mem, true); | |
| while (rawInput.getNumBytesRemaining() > 0) { | |
| unique_ptr<MQMessageExt> msg(decode(rawInput, readBody)); | |
| mqvec.push_back(*msg); | |
| } | |
| } | |
| string MQDecoder::messageProperties2String(const map<string, string>& properties) { | |
| string os; | |
| for (const auto& it : properties) { | |
| // os << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR; | |
| os.append(it.first); | |
| os += NAME_VALUE_SEPARATOR; | |
| os.append(it.second); | |
| os += PROPERTY_SEPARATOR; | |
| } | |
| return os; | |
| } | |
| void MQDecoder::string2messageProperties(const string& propertiesString, map<string, string>& properties) { | |
| vector<string> out; | |
| UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR); | |
| for (size_t i = 0; i < out.size(); i++) { | |
| vector<string> outValue; | |
| UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR); | |
| if (outValue.size() == 2) { | |
| properties[outValue[0]] = outValue[1]; | |
| } | |
| } | |
| } | |
| } // namespace rocketmq |