blob: f89cee0560a47ec987923ac35bc51d45885fce86 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "tmb/pure_memory_message_bus.h"
#include <atomic>
#include <cassert>
#include <chrono> // NOLINT(build/c++11)
#include <cstdint>
#include <limits>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>
#include "tmb/cancellation_token.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/tagged_message.h"
#include "tmb/internal/heap_receiver_message_queue.h"
#include "tmb/internal/memory_based_message_bus.h"
#include "tmb/internal/net_memory_container_pusher.h"
#include "tmb/internal/queued_message.h"
#include "tmb/internal/rcu.h"
#include "tmb/internal/shared_bool.h"
#include "tmb/internal/tree_receiver_message_queue.h"
namespace tmb {
class Address;
namespace internal { template <typename T> class IteratorAdapter; }
template <bool enable_deletion_support>
void PureMemoryMessageBus<enable_deletion_support>::ResetBus() {
typename ClientMap::WriteHandle clients_handle(
this->clients_.GetWriteHandleWithValue(
new typename ClientMap::value_type));
clients_handle.Commit();
typename ReceiverDirectory::WriteHandle dir_handle(
this->receiver_directory_.GetWriteHandleWithValue(
new typename ReceiverDirectory::value_type));
dir_handle.Commit();
client_id_sequence_.store(0, std::memory_order_relaxed);
message_id_sequence_.store(-1, std::memory_order_relaxed);
}
template <bool enable_deletion_support>
client_id PureMemoryMessageBus<enable_deletion_support>::Connect() {
client_id new_client_id
= client_id_sequence_.fetch_add(1, std::memory_order_relaxed);
this->InsertClientHandle(new_client_id);
return new_client_id;
}
template <bool enable_deletion_support>
bool PureMemoryMessageBus<enable_deletion_support>::Disconnect(
const client_id client) {
std::shared_ptr<ClientHandle> removed(this->RemoveClientHandle(client));
return removed.get() != nullptr;
}
template <bool enable_deletion_support>
bool PureMemoryMessageBus<enable_deletion_support>::RegisterClientAsSender(
const client_id sender_id,
const message_type_id message_type) {
typename ClientMap::ReadHandle clients_read_handle
= this->clients_.GetReadHandle();
typename ClientMap::value_type::const_iterator it;
switch (this->CheckSenderRegistered(clients_read_handle,
sender_id,
message_type,
&it)) {
case RegistrationState::kNotConnected:
case RegistrationState::kRegistered:
return false;
case RegistrationState::kNotRegistered:
return it->second->sendable_message_types_.insert(message_type);
}
// Should be unreachable:
return false;
}
template <bool enable_deletion_support>
bool PureMemoryMessageBus<enable_deletion_support>::RegisterClientAsReceiver(
const client_id receiver_id,
const message_type_id message_type) {
typename ClientMap::ReadHandle clients_read_handle
= this->clients_.GetReadHandle();
typename ClientMap::value_type::const_iterator it;
switch (this->CheckReceiverRegistered(clients_read_handle,
receiver_id,
message_type,
&it)) {
case RegistrationState::kNotConnected:
case RegistrationState::kRegistered:
return false;
case RegistrationState::kNotRegistered:
break;
}
it->second->receivable_message_types_.insert(message_type);
clients_read_handle.release();
tmb::internal::ThreadsafeSet<client_id> *receiver_set;
this->GetReceiversForType(message_type, &receiver_set);
receiver_set->insert(receiver_id);
return true;
}
template <bool enable_deletion_support>
MessageBus::SendStatus PureMemoryMessageBus<enable_deletion_support>::Send(
const client_id sender_id,
const Address &destination_address,
const MessageStyle &style,
TaggedMessage &&message, // NOLINT(whitespace/operators)
const Priority priority,
CancellationToken *cancellation_token) {
std::chrono::time_point<std::chrono::high_resolution_clock> send_time
= std::chrono::high_resolution_clock::now();
typename ClientMap::ReadHandle clients_handle
= this->clients_.GetReadHandle();
typename ClientMap::value_type::const_iterator it;
switch (this->CheckSenderRegistered(clients_handle,
sender_id,
message.message_type(),
&it)) {
case RegistrationState::kNotConnected:
return SendStatus::kSenderNotConnected;
case RegistrationState::kNotRegistered:
return SendStatus::kSenderNotRegisteredForMessageType;
case RegistrationState::kRegistered:
break;
}
// Verify address and finalize the set of receivers.
// TODO(chasseur): Consider using a vector of iterators to avoid hashing
// twice.
std::vector<client_id> verified_receivers;
SendStatus retval = this->FinalizeReceivers(clients_handle,
destination_address,
style,
message.message_type(),
&verified_receivers);
if (retval != SendStatus::kOK) {
return retval;
}
const std::int64_t message_id =
it->second->sent_message_counter_
< std::numeric_limits<std::uint32_t>::max()
? (static_cast<std::int64_t>(sender_id) << 32)
| ++(it->second->sent_message_counter_)
: message_id_sequence_.fetch_sub(1, std::memory_order_relaxed);
// Construct QueuedMessage.
internal::SharedBool cancel_flag;
if (cancellation_token != nullptr) {
cancel_flag = internal::SharedBool(false);
}
tmb::internal::QueuedMessage queued_message(sender_id,
priority,
send_time,
style.expiration_time_,
cancel_flag,
std::move(message));
queued_message.SetMessageID(message_id);
this->FinishSend(clients_handle,
verified_receivers,
std::move(queued_message));
if (cancellation_token != nullptr) {
cancellation_token->in_memory_cancel_flag_ = std::move(cancel_flag);
cancellation_token->message_id_ = message_id;
if (this->net_support_enabled_) {
// Clean up dead cancel flags, then insert a new one.
this->PurgeDeadCancelTokens(it->second.get());
it->second->send_cancel_tokens_.emplace(message_id, *cancellation_token);
}
}
return SendStatus::kOK;
}
template <bool enable_deletion_support>
void PureMemoryMessageBus<enable_deletion_support>::CancelMessage(
const client_id sender_id,
const CancellationToken &cancellation_token) {
if (cancellation_token.in_memory_cancel_flag_.Valid()) {
cancellation_token.in_memory_cancel_flag_.Set(true);
}
}
template <>
void PureMemoryMessageBus<false>::DeleteById(
const client_id receiver_id,
const std::vector<std::int64_t> &message_ids) {
}
template <>
void PureMemoryMessageBus<true>::DeleteById(
const client_id receiver_id,
const std::vector<std::int64_t> &message_ids) {
ClientHandle *handle = this->LookupClientHandle(receiver_id);
assert(handle != nullptr);
handle->incoming_messages_.DeleteReceivedMessagesById(message_ids, nullptr);
// Clean up leftover dead cancel flags.
this->PurgeDeadCancelTokens(handle);
}
template <bool enable_deletion_support>
void PureMemoryMessageBus<enable_deletion_support>::SenderCancelById(
const client_id sender_id,
const std::vector<std::int64_t> &message_ids) {
ClientHandle *handle = this->LookupClientHandle(sender_id);
assert(handle != nullptr);
for (const std::int64_t message_id : message_ids) {
std::unordered_map<std::int64_t, CancellationToken>::iterator it
= handle->send_cancel_tokens_.find(message_id);
if (it != handle->send_cancel_tokens_.end()) {
it->second.in_memory_cancel_flag_.Set(true);
handle->send_cancel_tokens_.erase(it);
}
}
// Clean up leftover dead cancel flags.
this->PurgeDeadCancelTokens(handle);
}
template <bool enable_deletion_support>
void PureMemoryMessageBus<enable_deletion_support>::ReceiverCancelById(
const client_id receiver_id,
const std::vector<std::int64_t> &message_ids) {
ClientHandle *handle = this->LookupClientHandle(receiver_id);
assert(handle != nullptr);
for (const std::int64_t message_id : message_ids) {
std::unordered_map<std::int64_t, CancellationToken>::iterator it
= handle->receive_cancel_tokens_.find(message_id);
if (it != handle->receive_cancel_tokens_.end()) {
it->second.in_memory_cancel_flag_.Set(true);
handle->receive_cancel_tokens_.erase(it);
}
}
// Clean up leftover dead cancel flags.
this->PurgeDeadCancelTokens(handle);
}
template <bool enable_deletion_support>
std::size_t PureMemoryMessageBus<enable_deletion_support>::ReceiveImpl(
const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher) {
ClientHandle *handle = this->LookupClientHandle(receiver_id);
assert(handle != nullptr);
std::unique_ptr<internal::NetMemoryContainerPusher> net_pusher;
if (this->net_support_enabled_) {
// Clean up leftover dead cancel flags.
this->PurgeDeadCancelTokens(handle);
net_pusher.reset(new internal::NetMemoryContainerPusher(
pusher,
&(handle->receive_cancel_tokens_)));
}
return handle->incoming_messages_.Pop(
minimum_priority,
max_messages,
delete_immediately,
this->net_support_enabled_ ? net_pusher.get() : pusher,
nullptr);
}
template <bool enable_deletion_support>
std::size_t PureMemoryMessageBus<enable_deletion_support>
::ReceiveIfAvailableImpl(const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher) {
ClientHandle *handle = this->LookupClientHandle(receiver_id);
assert(handle != nullptr);
std::unique_ptr<internal::NetMemoryContainerPusher> net_pusher;
if (this->net_support_enabled_) {
// Clean up leftover dead cancel flags.
this->PurgeDeadCancelTokens(handle);
net_pusher.reset(new internal::NetMemoryContainerPusher(
pusher,
&(handle->receive_cancel_tokens_)));
}
return handle->incoming_messages_.PopIfAvailable(
minimum_priority,
max_messages,
delete_immediately,
this->net_support_enabled_ ? net_pusher.get() : pusher,
nullptr);
}
template <>
void PureMemoryMessageBus<true>::DeleteImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) {
ClientHandle *handle = LookupClientHandle(receiver_id);
assert(handle != nullptr);
handle->incoming_messages_.DeleteReceivedMessages(adapter, nullptr);
}
template <>
void PureMemoryMessageBus<false>::DeleteImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) {
}
template <bool enable_deletion_support>
void PureMemoryMessageBus<enable_deletion_support>::CancelImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) {
while (adapter->Valid()) {
if ((*adapter)->in_memory_cancel_flag.Valid()) {
(*adapter)->in_memory_cancel_flag.Set(true);
}
adapter->Next();
}
}
// Explicitly instantiate and compile-in both versions of PureMemoryMessageBus.
template class PureMemoryMessageBus<true>;
template class PureMemoryMessageBus<false>;
} // namespace tmb