// blob: 2e31bd446f3f7d7930fc1e522e6412816f81d877 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef TMB_INTERNAL_MEMORY_BASED_MESSAGE_BUS_H_
#define TMB_INTERNAL_MEMORY_BASED_MESSAGE_BUS_H_
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "tmb/cancellation_token.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/internal/heap_receiver_message_queue.h"
#include "tmb/internal/rcu.h"
#include "tmb/internal/threadsafe_set.h"
#include "tmb/internal/tree_receiver_message_queue.h"
namespace tmb {
class Address;
class MessageStyle;
namespace internal {
class QueuedMessage;
/**
 * @brief Abstract base class holding the functionality that is shared by
 *        PureMemoryMessageBus and MemoryMirrorMessageBus.
 *
 * @tparam enable_deletion_support If true, each client's incoming queue is a
 *         TreeReceiverMessageQueue; otherwise it is a
 *         HeapReceiverMessageQueue.
 * @tparam memory_mirror Passed through as the template argument of the
 *         per-client receiver queue type.
 **/
template <bool enable_deletion_support, bool memory_mirror>
class MemoryBasedMessageBus : public MessageBus {
 public:
  /**
   * @brief Per-client state: the sets of sendable and receivable message
   *        types, the queue of incoming messages, the sent-message counter,
   *        and the cancellation-token lookup tables.
   **/
  struct ClientHandle {
    // Creates a handle whose sent-message counter starts at zero. Every other
    // member is default-constructed.
    ClientHandle()
        : sent_message_counter_(0) {}

    // Creates a handle from existing state: both type sets are constructed
    // from the supplied pointers and the counter starts at
    // 'sent_message_counter'.
    //
    // NOTE(review): ThreadsafeSet presumably takes ownership of the
    // pointed-to sets -- confirm against threadsafe_set.h.
    ClientHandle(std::unordered_set<message_type_id> *sendable_types,
                 std::unordered_set<message_type_id> *receivable_types,
                 const std::uint32_t sent_message_counter)
        : sendable_message_types_(sendable_types),
          receivable_message_types_(receivable_types),
          sent_message_counter_(sent_message_counter) {}

    ThreadsafeSet<message_type_id> sendable_message_types_;
    ThreadsafeSet<message_type_id> receivable_message_types_;

    // The queue implementation is picked at compile time: the tree-based
    // queue when deletion support is enabled, the heap-based queue otherwise.
    typename std::conditional<
        enable_deletion_support,
        TreeReceiverMessageQueue<memory_mirror>,
        HeapReceiverMessageQueue<memory_mirror>>::type incoming_messages_;

    // Combined with the sender ID to produce unique message IDs. Once this
    // counter overflows we give up on it and fall back to a global atomic
    // counter.
    std::uint32_t sent_message_counter_;

    // Lookup tables from message ID to cancellation token. They are only
    // used when net support is enabled; otherwise they stay unused.
    //
    // NOTE(chasseur): The token itself also stores the message ID. An
    // unordered_set with a simple custom hasher would avoid keeping that
    // extra copy, but a map keeps lookup by bare ID simple.
    //
    // FIXME(chasseur): These tables are not properly repopulated when a
    // MemoryMirror is reloaded from persistent storage. A server may
    // therefore go down and recover while clients stay up, and clients may
    // then try to cancel messages they received before the failure; those
    // messages will not actually be cancelled.
    std::unordered_map<std::int64_t, CancellationToken> send_cancel_tokens_;
    std::unordered_map<std::int64_t, CancellationToken> receive_cancel_tokens_;
  };

  /**
   * @brief Constructor.
   *
   * Both 'clients_' and 'receiver_directory_' are constructed from a freshly
   * allocated, empty unordered_map.
   *
   * NOTE(review): RCU presumably takes ownership of the allocated maps --
   * confirm against rcu.h.
   *
   * @param enable_net_support Stored in 'net_support_enabled_'.
   **/
  explicit MemoryBasedMessageBus(const bool enable_net_support)
      : net_support_enabled_(enable_net_support),
        clients_(
            new std::unordered_map<client_id,
                                   std::shared_ptr<ClientHandle>>()),
        receiver_directory_(
            new std::unordered_map<
                message_type_id,
                std::shared_ptr<ThreadsafeSet<client_id>>>()) {}

  virtual ~MemoryBasedMessageBus() = default;

  // Overrides the corresponding virtual method of MessageBus.
  std::size_t CountQueuedMessagesForClient(
      const client_id receiver_id) override;

 protected:
  // RCU-wrapped table from client ID to that client's handle.
  using ClientMap = internal::RCU<
      std::unordered_map<client_id, std::shared_ptr<ClientHandle>>>;

  // RCU-wrapped table from message type ID to a thread-safe set of client
  // IDs.
  using ReceiverDirectory = internal::RCU<
      std::unordered_map<
          message_type_id,
          std::shared_ptr<tmb::internal::ThreadsafeSet<client_id>>>>;

  // Result type of CheckSenderRegistered() and CheckReceiverRegistered().
  enum class RegistrationState {
    kNotConnected,
    kNotRegistered,
    kRegistered
  };

  // TODO(chasseur): Dead tokens are currently garbage-collected very
  // aggressively. With net support enabled, this is invoked:
  //   - on every send of a cancellable message (sender's handle),
  //   - on every cancellation (caller's handle, send-side or receive-side),
  //   - on every receive or explicit delete (receiver's handle).
  // That reclaims memory well and keeps the CancellationToken lookup tables
  // small, but it may be overkill and burn CPU cycles needlessly. A lazier
  // policy is worth considering: purge only every N calls (for some
  // reasonably tuned N), or only once 'send_cancel_tokens_' and
  // 'receive_cancel_tokens_' of a handle grow past a certain threshold.
  static void PurgeDeadCancelTokens(ClientHandle *handle);

  // Client-handle helpers; only declared in this header.
  void InsertClientHandle(const client_id client);
  std::shared_ptr<ClientHandle> RemoveClientHandle(const client_id client);
  ClientHandle* LookupClientHandle(const client_id client) const;

  // Registration checks for the sending and the receiving side. Both take a
  // ClientMap read handle plus an iterator out-parameter and report a
  // RegistrationState.
  RegistrationState CheckSenderRegistered(
      const typename ClientMap::ReadHandle &read_handle,
      const client_id client,
      const message_type_id message_type,
      typename ClientMap::value_type::const_iterator *it) const;

  RegistrationState CheckReceiverRegistered(
      const typename ClientMap::ReadHandle &read_handle,
      const client_id client,
      const message_type_id message_type,
      typename ClientMap::value_type::const_iterator *it) const;

  // Returns true if a new set may have been created for 'message_type', and
  // false if the set was already there. If two callers race on the same
  // 'message_type' while it is still absent from the ReceiverDirectory, both
  // of them may get 'true'. This quirk is intentional: it lets
  // MemoryMirrorMessageBus always ensure that
  // PersistentMessageBus::InsertMessageTypeUnchecked() has completed before
  // PersistentMessageBus::InsertReceivableUnchecked() is called.
  bool GetReceiversForType(const message_type_id message_type,
                           ThreadsafeSet<client_id> **receivers);

  MessageBus::SendStatus FinalizeReceivers(
      const typename ClientMap::ReadHandle &clients_read_handle,
      const Address &destination_address,
      const MessageStyle &style,
      const message_type_id message_type,
      std::vector<client_id> *receivers) const;

  void FinishSend(
      const typename ClientMap::ReadHandle &clients_read_handle,
      const std::vector<client_id> &receivers,
      QueuedMessage &&msg) const;  // NOLINT(whitespace/operators)

  // Fixed at construction from the constructor's 'enable_net_support'
  // argument.
  const bool net_support_enabled_;

  ClientMap clients_;
  ReceiverDirectory receiver_directory_;

 private:
  // Copying and assignment are disallowed:
  MemoryBasedMessageBus(const MemoryBasedMessageBus &orig) = delete;
  MemoryBasedMessageBus& operator=(const MemoryBasedMessageBus &rhs) = delete;
};
} // namespace internal
} // namespace tmb
#endif // TMB_INTERNAL_MEMORY_BASED_MESSAGE_BUS_H_