blob: 83fe68da77a457d3e26609f1248b48bc8c6e8500 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MQMessage.h"
#include "MessageSysFlag.h"
#include "UtilAll.h"
namespace rocketmq {
//<!***************************************************************************
const string MQMessage::PROPERTY_KEYS = "KEYS";
const string MQMessage::PROPERTY_TAGS = "TAGS";
const string MQMessage::PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
const string MQMessage::PROPERTY_DELAY_TIME_LEVEL = "DELAY";
const string MQMessage::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
const string MQMessage::PROPERTY_REAL_TOPIC = "REAL_TOPIC";
const string MQMessage::PROPERTY_REAL_QUEUE_ID = "REAL_QID";
const string MQMessage::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
const string MQMessage::PROPERTY_PRODUCER_GROUP = "PGROUP";
const string MQMessage::PROPERTY_MIN_OFFSET = "MIN_OFFSET";
const string MQMessage::PROPERTY_MAX_OFFSET = "MAX_OFFSET";
const string MQMessage::PROPERTY_BUYER_ID = "BUYER_ID";
const string MQMessage::PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
const string MQMessage::PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
const string MQMessage::PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
const string MQMessage::PROPERTY_MQ2_FLAG = "MQ2_FLAG";
const string MQMessage::PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
const string MQMessage::PROPERTY_MSG_REGION = "MSG_REGION";
const string MQMessage::PROPERTY_TRACE_SWITCH = "TRACE_ON";
const string MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
const string MQMessage::PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
const string MQMessage::PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
const string MQMessage::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
const string MQMessage::PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
const string MQMessage::PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
const string MQMessage::KEY_SEPARATOR = " ";
static const string EMPTY_STRING = "";
//<!************************************************************************
MQMessage::MQMessage() {
Init("", "", "", 0, "", true);
}
MQMessage::MQMessage(const string& topic, const string& body) {
Init(topic, "", "", 0, body, true);
}
MQMessage::MQMessage(const string& topic, const string& tags, const string& body) {
Init(topic, tags, "", 0, body, true);
}
MQMessage::MQMessage(const string& topic, const string& tags, const string& keys, const string& body) {
Init(topic, tags, keys, 0, body, true);
}
MQMessage::MQMessage(const string& topic,
const string& tags,
const string& keys,
const int flag,
const string& body,
bool waitStoreMsgOK) {
Init(topic, tags, keys, flag, body, waitStoreMsgOK);
}
MQMessage::~MQMessage() {
m_properties.clear();
}
MQMessage::MQMessage(const MQMessage& other) {
m_body = other.m_body;
m_topic = other.m_topic;
m_flag = other.m_flag;
m_sysFlag = other.m_sysFlag;
m_properties = other.m_properties;
}
MQMessage& MQMessage::operator=(const MQMessage& other) {
if (this != &other) {
m_body = other.m_body;
m_topic = other.m_topic;
m_flag = other.m_flag;
m_sysFlag = other.m_sysFlag;
m_properties = other.m_properties;
}
return *this;
}
void MQMessage::setProperty(const string& name, const string& value) {
if (PROPERTY_TRANSACTION_PREPARED == name) {
if (!value.empty() && value == "true") {
m_sysFlag |= MessageSysFlag::TransactionPreparedType;
} else {
m_sysFlag &= ~MessageSysFlag::TransactionPreparedType;
}
}
m_properties[name] = value;
}
void MQMessage::setPropertyInternal(const string& name, const string& value) {
m_properties[name] = value;
}
const string& MQMessage::getProperty(const string& name) const {
map<string, string>::const_iterator it = m_properties.find(name);
if (it == m_properties.end()) {
return EMPTY_STRING;
} else {
return it->second;
}
}
const string& MQMessage::getTopic() const {
return m_topic;
}
void MQMessage::setTopic(const string& topic) {
m_topic = topic;
}
void MQMessage::setTopic(const char* body, int len) {
m_topic.clear();
m_topic.append(body, len);
}
const string& MQMessage::getTags() const {
return getProperty(PROPERTY_TAGS);
}
void MQMessage::setTags(const string& tags) {
setPropertyInternal(PROPERTY_TAGS, tags);
}
const string& MQMessage::getKeys() const {
return getProperty(PROPERTY_KEYS);
}
void MQMessage::setKeys(const string& keys) {
setPropertyInternal(PROPERTY_KEYS, keys);
}
void MQMessage::setKeys(const vector<string>& keys) {
if (keys.empty()) {
return;
}
vector<string>::const_iterator it = keys.begin();
string str;
str += *it;
it++;
for (; it != keys.end(); it++) {
str += KEY_SEPARATOR;
str += *it;
}
setKeys(str);
}
int MQMessage::getDelayTimeLevel() const {
string tmp = getProperty(PROPERTY_DELAY_TIME_LEVEL);
if (!tmp.empty()) {
return atoi(tmp.c_str());
}
return 0;
}
void MQMessage::setDelayTimeLevel(int level) {
char tmp[16];
sprintf(tmp, "%d", level);
setPropertyInternal(PROPERTY_DELAY_TIME_LEVEL, tmp);
}
bool MQMessage::isWaitStoreMsgOK() const {
string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
if (tmp.empty()) {
return true;
} else {
return (tmp == "true") ? true : false;
}
}
void MQMessage::setWaitStoreMsgOK(bool waitStoreMsgOK) {
if (waitStoreMsgOK) {
setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "true");
} else {
setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "false");
}
}
int MQMessage::getFlag() const {
return m_flag;
}
void MQMessage::setFlag(int flag) {
m_flag = flag;
}
int MQMessage::getSysFlag() const {
return m_sysFlag;
}
void MQMessage::setSysFlag(int sysFlag) {
m_sysFlag = sysFlag;
}
const string& MQMessage::getBody() const {
return m_body;
}
void MQMessage::setBody(const char* body, int len) {
m_body.clear();
m_body.append(body, len);
}
void MQMessage::setBody(const string& body) {
m_body.clear();
m_body.append(body);
}
map<string, string> MQMessage::getProperties() const {
return m_properties;
}
void MQMessage::setProperties(map<string, string>& properties) {
m_properties = properties;
map<string, string>::const_iterator it = m_properties.find(PROPERTY_TRANSACTION_PREPARED);
if (it != m_properties.end()) {
string tranMsg = it->second;
if (!tranMsg.empty() && tranMsg == "true") {
m_sysFlag |= MessageSysFlag::TransactionPreparedType;
} else {
m_sysFlag &= ~MessageSysFlag::TransactionPreparedType;
}
}
}
void MQMessage::setPropertiesInternal(map<string, string>& properties) {
m_properties = properties;
}
void MQMessage::Init(const string& topic,
const string& tags,
const string& keys,
const int flag,
const string& body,
bool waitStoreMsgOK) {
m_topic = topic;
m_flag = flag;
m_sysFlag = 0;
m_body = body;
if (tags.length() > 0) {
setTags(tags);
}
if (keys.length() > 0) {
setKeys(keys);
}
setWaitStoreMsgOK(waitStoreMsgOK);
}
} //<!end namespace;