blob: af67c8796ecc1a16f7906a32978089b8d2b664ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ROCKETMQ_REMOTINGCOMMAND_H_
#define ROCKETMQ_REMOTINGCOMMAND_H_
#include <exception> // std::exception
#include <map> // std::map
#include <memory> // std::unique_ptr, std::shared_ptr
#include <string> // std::string
#include <typeindex> // std::type_index
#include "ByteArray.h"
#include "CommandCustomHeader.h"
#include "MQException.h"
namespace rocketmq {
/**
* RemotingCommand - rocketmq rpc protocol
*/
class ROCKETMQCLIENT_API RemotingCommand {
public:
static int32_t createNewRequestId();
public:
RemotingCommand() : code_(0) {}
RemotingCommand(int32_t code, CommandCustomHeader* customHeader = nullptr);
RemotingCommand(int32_t code, const std::string& remark, CommandCustomHeader* customHeader = nullptr);
RemotingCommand(int32_t code,
const std::string& language,
int32_t version,
int32_t opaque,
int32_t flag,
const std::string& remark,
CommandCustomHeader* customHeader);
RemotingCommand(RemotingCommand&& command);
virtual ~RemotingCommand();
public:
bool isResponseType();
void markResponseType();
bool isOnewayRPC();
void markOnewayRPC();
CommandCustomHeader* readCustomHeader() const;
public:
ByteArrayRef encode() const;
template <class H>
H* decodeCommandCustomHeader(bool useCache = true);
static std::unique_ptr<RemotingCommand> Decode(ByteArrayRef array, bool hasPackageLength = false);
std::string toString() const;
public:
inline int32_t code() const { return code_; }
inline void set_code(int32_t code) { code_ = code; }
inline int32_t version() const { return version_; }
inline int32_t opaque() const { return opaque_; }
inline void set_opaque(int32_t opaque) { opaque_ = opaque; }
inline int32_t flag() const { return flag_; }
inline const std::string& remark() const { return remark_; }
inline void set_remark(const std::string& remark) { remark_ = remark; }
inline void set_ext_field(const std::string& name, const std::string& value) { ext_fields_[name] = value; }
inline ByteArrayRef body() const { return body_; }
inline void set_body(ByteArrayRef body) { body_ = std::move(body); }
inline void set_body(const std::string& body) { body_ = stoba(body); }
inline void set_body(std::string&& body) { body_ = stoba(std::move(body)); }
private:
int32_t code_;
std::string language_;
int32_t version_;
int32_t opaque_;
int32_t flag_;
std::string remark_;
std::map<std::string, std::string> ext_fields_;
std::unique_ptr<CommandCustomHeader> custom_header_; // transient
ByteArrayRef body_; // transient
};
template <class H>
H* RemotingCommand::decodeCommandCustomHeader(bool useCache) {
if (useCache) {
auto* cache = custom_header_.get();
if (cache != nullptr && std::type_index(typeid(*cache)) == std::type_index(typeid(H))) {
return static_cast<H*>(custom_header_.get());
}
}
try {
std::unique_ptr<H> header = H::Decode(ext_fields_);
custom_header_ = std::move(header);
return static_cast<H*>(custom_header_.get());
} catch (std::exception& e) {
THROW_MQEXCEPTION(RemotingCommandException, e.what(), -1);
}
}
} // namespace rocketmq
#endif // ROCKETMQ_REMOTINGCOMMAND_H_