// blob: 93f49daaa51c7a7f94509233ebf887828c390d42 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef TMB_TAGGED_MESSAGE_H_
#define TMB_TAGGED_MESSAGE_H_
#include <cassert>
#include <chrono> // NOLINT(build/c++11)
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include "tmb/id_typedefs.h"
#include "tmb/internal/leveldb_keys.h"
#include "tmb/internal/memory_mirror_cancellation_set.h"
#include "tmb/internal/shared_bool.h"
#include "tmb/internal/zookeeper_format.h"
namespace tmb {
// Forward declarations of the message bus templates that AnnotatedMessage
// names as friends further down in this header. Only the names are needed
// here, so the full definitions are not included.
template <bool enable_deletion_support> class MemoryMirrorMessageBus;
template <bool enable_deletion_support> class PureMemoryMessageBus;
namespace internal {
// Forward declarations of internal classes that AnnotatedMessage names as
// friends further down in this header.
class NativeTransactionLog;
class NetMemoryContainerPusher;
class NetServiceImpl;
class QueuedMessage;
template <bool memory_mirror_version> class TreeReceiverMessageQueue;
} // namespace internal
/** \addtogroup TMB
* @{
*/
/**
* @brief An array of bytes with a message_type_id tag to indicate how the
* bytes are to be interpreted.
**/
class TaggedMessage {
public:
/**
* @brief Default constructor which creates an empty, invalid message.
**/
TaggedMessage()
: payload_inline_(true),
message_type_(0) {
payload_.in_line.size = 0;
}
/**
* @brief Constructor which creates an empty, but typed message.
**/
explicit TaggedMessage(const message_type_id message_type)
: payload_inline_(true),
message_type_(message_type) {
payload_.in_line.size = 0;
}
/**
* @brief Constructor.
*
* @param msg A pointer to the message contents in memory, which will be
* copied into this TaggedMessage.
* @param bytes The size of the message at msg in bytes.
* @param message_type The application-defined type ID of the message.
**/
TaggedMessage(const void *msg,
const std::size_t bytes,
const message_type_id message_type)
: message_type_(message_type) {
if (bytes <= sizeof(payload_.in_line.data)) {
std::memcpy(payload_.in_line.data, msg, bytes);
payload_.in_line.size = static_cast<unsigned char>(bytes);
payload_inline_ = true;
} else {
payload_.out_of_line.data = std::malloc(bytes);
std::memcpy(payload_.out_of_line.data, msg, bytes);
payload_.out_of_line.size = bytes;
payload_inline_ = false;
}
}
/**
* @brief Destructor.
**/
~TaggedMessage() {
if (!payload_inline_) {
std::free(payload_.out_of_line.data);
}
}
/**
* @brief Copy constructor.
**/
TaggedMessage(const TaggedMessage &orig) {
std::memcpy(this, &orig, sizeof(TaggedMessage));
if (!payload_inline_) {
payload_.out_of_line.data = std::malloc(payload_.out_of_line.size);
std::memcpy(payload_.out_of_line.data,
orig.payload_.out_of_line.data,
payload_.out_of_line.size);
}
}
/**
* @brief Move constructor.
**/
TaggedMessage(TaggedMessage &&orig) { // NOLINT(build/c++11)
std::memcpy(this, &orig, sizeof(TaggedMessage));
orig.payload_inline_ = true;
}
/**
* @brief Copy-assignment operator.
**/
TaggedMessage& operator=(const TaggedMessage &orig) {
if (this != &orig) {
if (!payload_inline_) {
std::free(payload_.out_of_line.data);
}
std::memcpy(this, &orig, sizeof(TaggedMessage));
if (!payload_inline_) {
payload_.out_of_line.data = std::malloc(payload_.out_of_line.size);
std::memcpy(payload_.out_of_line.data,
orig.payload_.out_of_line.data,
payload_.out_of_line.size);
}
}
return *this;
}
/**
* @brief Move-assignment operator.
**/
TaggedMessage& operator=(TaggedMessage &&orig) { // NOLINT(build/c++11)
if (this != &orig) {
if (!payload_inline_) {
std::free(payload_.out_of_line.data);
}
std::memcpy(this, &orig, sizeof(TaggedMessage));
orig.payload_inline_ = true;
}
return *this;
}
/**
* @brief Overwrite this TaggedMessage with new contents.
*
* @param msg A pointer to the message contents in memory, which will be
* copied into this TaggedMessage.
* @param bytes The size of the message at msg in bytes.
* @param message_type The application-defined type ID of the message.
**/
void set_message(const void *msg,
const std::size_t bytes,
const message_type_id message_type) {
if (!payload_inline_) {
std::free(payload_.out_of_line.data);
}
if (bytes <= sizeof(payload_.in_line.data)) {
std::memcpy(payload_.in_line.data, msg, bytes);
payload_.in_line.size = static_cast<unsigned char>(bytes);
payload_inline_ = true;
} else {
payload_.out_of_line.data = std::malloc(bytes);
std::memcpy(payload_.out_of_line.data, msg, bytes);
payload_.out_of_line.size = bytes;
payload_inline_ = false;
}
message_type_ = message_type;
}
/**
* @brief Overwrite this TaggedMessage with new contents, taking ownership
* of a memory block allocated elsewhere.
* @warning msg must have been allocated with one of the C standard library
* functions malloc(), calloc(), or realloc() (NOT C++ new). It will
* eventually be deallocated with free(). See also set_message(),
* which is safer (but copies the message).
*
* @param msg A pointer to the message contents in memory, which this
* TaggedMessage takes ownership of.
* @param bytes The size of the message at msg in bytes.
* @param message_type The application-defined type ID of the message.
**/
void acquire_message(void *msg,
const std::size_t bytes,
const message_type_id message_type) {
if (!payload_inline_) {
std::free(payload_.out_of_line.data);
}
if (bytes <= sizeof(payload_.in_line.data)) {
std::memcpy(payload_.in_line.data, msg, bytes);
std::free(msg);
payload_.in_line.size = static_cast<unsigned char>(bytes);
payload_inline_ = true;
} else {
payload_.out_of_line.data = msg;
payload_.out_of_line.size = bytes;
payload_inline_ = false;
}
message_type_ = message_type;
}
/**
* @brief Get this message's contents.
*
* @return A pointer to this message's content as raw bytes.
**/
inline const void* message() const {
return payload_inline_ ? payload_.in_line.data : payload_.out_of_line.data;
}
/**
* @brief Get the size of this message.
*
* @return The size of the message returned by message() in bytes.
**/
inline std::size_t message_bytes() const {
return payload_inline_ ? payload_.in_line.size : payload_.out_of_line.size;
}
/**
* @brief Get the type of this message.
*
* @return The application-defined type of this message.
**/
inline message_type_id message_type() const {
return message_type_;
}
private:
inline void ExtendOutOfLineAllocation(const std::size_t extra_bytes) {
assert(!payload_inline_);
payload_.out_of_line.data
= std::realloc(payload_.out_of_line.data,
payload_.out_of_line.size + extra_bytes);
}
union Payload {
struct OutOfLine {
void *data;
std::size_t size;
} out_of_line;
struct Inline {
char data[sizeof(OutOfLine) - 1];
unsigned char size;
} in_line;
} payload_;
bool payload_inline_;
message_type_id message_type_;
friend class LevelDBMessageBus;
friend class ZookeeperMessageBus;
};
/**
* @brief A TaggedMessage with additional annotations to indicate who sent the
* message and when.
**/
struct AnnotatedMessage {
/**
* @brief Default constructor which creates an empty, invalid
* AnnotatedMessage.
**/
AnnotatedMessage() {
deletion_token.message_id = -1;
}
AnnotatedMessage(const AnnotatedMessage &orig) = default;
AnnotatedMessage(AnnotatedMessage &&orig) = default;
AnnotatedMessage& operator=(const AnnotatedMessage &orig) = default;
AnnotatedMessage& operator=(
AnnotatedMessage &&orig) = default; // NOLINT(build/c++11)
client_id sender;
std::chrono::time_point<std::chrono::high_resolution_clock> send_time;
TaggedMessage tagged_message;
private:
union DeletionToken {
char leveldb_key[sizeof(internal::LevelDBQueuedMessageKey)];
std::int64_t message_id;
char zookeeper_nodename[internal::kZookeeperQueuedMessageNameLength];
} deletion_token;
mutable internal::SharedBool in_memory_cancel_flag;
std::shared_ptr<internal::MemoryMirrorCancellationSet>
memory_mirror_cancellation_set;
friend class internal::NativeTransactionLog;
friend class internal::NetMemoryContainerPusher;
friend class internal::NetServiceImpl;
friend class internal::QueuedMessage;
friend class internal::TreeReceiverMessageQueue<false>;
friend class internal::TreeReceiverMessageQueue<true>;
friend class LevelDBMessageBus;
friend class MemoryMirrorMessageBus<false>;
friend class MemoryMirrorMessageBus<true>;
friend class NativeNetClientMessageBus;
friend class PureMemoryMessageBus<false>;
friend class PureMemoryMessageBus<true>;
friend class SQLiteMessageBus;
friend class VoltDBMessageBus;
friend class ZookeeperMessageBus;
};
/** @} */
} // namespace tmb
#endif // TMB_TAGGED_MESSAGE_H_