| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #ifndef TESTS_MESSAGE_BUS_UNITTEST_COMMON_H_ |
| #define TESTS_MESSAGE_BUS_UNITTEST_COMMON_H_ |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <bitset> |
| #include <chrono> // NOLINT(build/c++11) |
| #include <cstddef> |
| #include <cstring> |
| #include <limits> |
| #include <memory> |
| #include <thread> // NOLINT(build/c++11) |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "gtest/gtest.h" |
| |
| #include "tmb/address.h" |
| #include "tmb/cancellation_token.h" |
| #include "tmb/id_typedefs.h" |
| #include "tmb/message_bus.h" |
| #include "tmb/message_style.h" |
| #include "tmb/priority.h" |
| #include "tmb/tagged_message.h" |
| |
| namespace tmb { |
| |
// Number of int-payload messages each sender emits in the send/receive tests;
// also the width of the bitsets used to track which payloads arrived.
constexpr int kNumSampleMessages{64};
// Number of clients connected in the first phase of the connection tests.
constexpr std::size_t kNumClients{64};
// Number of worker threads spawned by the threaded tests.
constexpr std::size_t kNumThreads{16};
| |
| TaggedMessage MakeTaggedSimpleIntMessage(int int_payload, |
| message_type_id type = 0) { |
| return TaggedMessage(&int_payload, sizeof(int_payload), type); |
| } |
| |
// Object-oriented wrapper for std::thread. Subclasses implement Run(), which
// is invoked on a newly launched thread when Start() is called.
class Thread {
 public:
  Thread() {
  }

  // Virtual so that a subclass object deleted through a Thread pointer runs
  // the subclass destructor. As with a bare std::thread, a started thread
  // must be joined before the object is destroyed.
  virtual ~Thread() {
  }

  // Launches a new thread of execution that calls Run() on this object.
  void Start() {
    internal_thread_ = std::thread(Thread::ExecuteRunMethodOfThread, this);
  }

  // Blocks until the thread launched by Start() has finished running.
  void Join() {
    internal_thread_.join();
  }

 protected:
  // Thread entry point: forwards to the virtual Run() of 'thread'.
  static void ExecuteRunMethodOfThread(Thread* thread) {
    thread->Run();
  }

  // The work to perform on the launched thread.
  virtual void Run() = 0;

 private:
  std::thread internal_thread_;

  // Disallow copy and assign:
  Thread(const Thread &orig) = delete;
  Thread& operator=(const Thread &rhs) = delete;
};
| |
| template <typename MessageBusT> |
| class MessageBusTest : public ::testing::Test { |
| protected: |
| virtual void SetUp() { |
| message_bus_.reset(CreateBus()); |
| ASSERT_TRUE(message_bus_->Initialize()); |
| message_bus_->ResetBus(); |
| } |
| |
| MessageBus* CreateBus() { |
| return new MessageBusT; |
| } |
| |
| std::unique_ptr<MessageBus> message_bus_; |
| }; |
| |
| TYPED_TEST_CASE_P(MessageBusTest); |
| |
| // An alias used for test cases which require support for deleting |
| // messages separately from receiving them. |
| template <typename MessageBusT> using SeparateDeletionMessageBusTest |
| = MessageBusTest<MessageBusT>; |
| TYPED_TEST_CASE_P(SeparateDeletionMessageBusTest); |
| |
| // Alias for test cases which require the ability to reconnect to a persistent |
| // message bus. |
| template <typename MessageBusT> using ReconnectMessageBusTest |
| = MessageBusTest<MessageBusT>; |
| TYPED_TEST_CASE_P(ReconnectMessageBusTest); |
| |
| // Alias for test cases which require the ability to reconnect to a persistent |
| // message bus, and for which "active" state objects (e.g. CancellationTokens |
| // and AnnotatedMessages) remain valid across different instances of the |
| // MessageBus. |
| template <typename MessageBusT> using ReconnectActiveStateMessageBusTest |
| = MessageBusTest<MessageBusT>; |
| TYPED_TEST_CASE_P(ReconnectActiveStateMessageBusTest); |
| |
| // Alias for test cases which require both separate-deletion support and active |
| // state objects that remain valid across different instances of the |
| // MessageBus. |
| template <typename MessageBusT> using ReconnectSeparateDeletionMessageBusTest |
| = MessageBusTest<MessageBusT>; |
| TYPED_TEST_CASE_P(ReconnectSeparateDeletionMessageBusTest); |
| |
| // Alias for test cases which require the ability for multiple instances of a |
| // MessageBus class to present interfaces to the same underlying bus. |
| template <typename MessageBusT> using MultiInstanceMessageBusTest |
| = MessageBusTest<MessageBusT>; |
| TYPED_TEST_CASE_P(MultiInstanceMessageBusTest); |
| |
| |
| TYPED_TEST_P(MessageBusTest, ConnectTest) { |
| std::unordered_set<client_id> assigned_ids; |
| client_id highest_id = std::numeric_limits<client_id>::min(); |
| |
| // Connect a bunch of clients and make sure their IDs don't collide. |
| for (std::size_t client_idx = 0; client_idx < kNumClients; ++client_idx) { |
| client_id current_client = this->message_bus_->Connect(); |
| if (current_client > highest_id) { |
| highest_id = current_client; |
| } |
| EXPECT_EQ(assigned_ids.end(), assigned_ids.find(current_client)); |
| EXPECT_TRUE(assigned_ids.insert(current_client).second); |
| } |
| |
| // Disconnect half of the clients. |
| ASSERT_EQ(kNumClients, assigned_ids.size()); |
| std::unordered_set<client_id>::const_iterator client_it |
| = assigned_ids.begin(); |
| for (std::size_t client_idx = 0; |
| client_idx < kNumClients / 2; |
| ++client_idx) { |
| ASSERT_NE(client_it, assigned_ids.end()); |
| EXPECT_TRUE(this->message_bus_->Disconnect(*client_it)); |
| ++client_it; |
| } |
| |
| // Attempting to disconnect the same clients again should fail. |
| client_it = assigned_ids.begin(); |
| for (std::size_t client_idx = 0; |
| client_idx < kNumClients / 2; |
| ++client_idx) { |
| ASSERT_NE(client_it, assigned_ids.end()); |
| EXPECT_FALSE(this->message_bus_->Disconnect(*client_it)); |
| ++client_it; |
| } |
| |
| // Connect some more new clients. IDs should not be reused. |
| for (std::size_t client_idx = 0; |
| client_idx < kNumClients / 2; |
| ++client_idx) { |
| client_id current_client = this->message_bus_->Connect(); |
| if (current_client > highest_id) { |
| highest_id = current_client; |
| } |
| EXPECT_EQ(assigned_ids.end(), assigned_ids.find(current_client)); |
| EXPECT_TRUE(assigned_ids.insert(current_client).second); |
| } |
| |
| // Can not disconnect a client which was never connected. |
| EXPECT_FALSE(this->message_bus_->Disconnect(highest_id + 1)); |
| } |
| |
| |
| class ConnectorThread : public Thread { |
| public: |
| ConnectorThread(MessageBus *message_bus_ptr, std::size_t num_clients) |
| : message_bus_ptr_(message_bus_ptr), |
| num_clients_(num_clients) { |
| } |
| |
| const std::unordered_set<client_id>& assigned_ids() const { |
| return assigned_ids_; |
| } |
| |
| client_id last_connected_id() const { |
| return last_connected_id_; |
| } |
| |
| protected: |
| void Run() { |
| for (std::size_t client_idx = 0; client_idx < num_clients_; ++client_idx) { |
| client_id current_client = message_bus_ptr_->Connect(); |
| EXPECT_EQ(assigned_ids_.end(), assigned_ids_.find(current_client)); |
| EXPECT_TRUE(assigned_ids_.insert(current_client).second); |
| } |
| |
| // Disconnect half of the clients. |
| ASSERT_EQ(num_clients_, assigned_ids_.size()); |
| std::unordered_set<client_id>::const_iterator client_it |
| = assigned_ids_.begin(); |
| for (std::size_t client_idx = 0; |
| client_idx < num_clients_ / 2; |
| ++client_idx) { |
| ASSERT_NE(client_it, assigned_ids_.end()); |
| EXPECT_TRUE(message_bus_ptr_->Disconnect(*client_it)); |
| ++client_it; |
| } |
| |
| // Attempting to disconnect the same clients again should fail. |
| client_it = assigned_ids_.begin(); |
| for (std::size_t client_idx = 0; |
| client_idx < num_clients_ / 2; |
| ++client_idx) { |
| ASSERT_NE(client_it, assigned_ids_.end()); |
| EXPECT_FALSE(message_bus_ptr_->Disconnect(*client_it)); |
| ++client_it; |
| } |
| |
| // Connect some more new clients. IDs should not be reused. |
| for (std::size_t client_idx = 0; |
| client_idx < num_clients_ / 2; |
| ++client_idx) { |
| last_connected_id_ = message_bus_ptr_->Connect(); |
| EXPECT_EQ(assigned_ids_.end(), assigned_ids_.find(last_connected_id_)); |
| EXPECT_TRUE(assigned_ids_.insert(last_connected_id_).second); |
| } |
| } |
| |
| private: |
| MessageBus *message_bus_ptr_; |
| const std::size_t num_clients_; |
| std::unordered_set<client_id> assigned_ids_; |
| client_id last_connected_id_; |
| }; |
| |
| TYPED_TEST_P(MessageBusTest, ThreadedConnectTest) { |
| // Set up multiple concurrent threads. |
| std::vector<std::unique_ptr<ConnectorThread>> threads; |
| for (std::size_t thread_idx = 0; thread_idx < kNumThreads; ++thread_idx) { |
| threads.emplace_back(new ConnectorThread(this->message_bus_.get(), |
| kNumClients)); |
| } |
| |
| // Start the threads, then wait for all of them to finish. |
| for (auto &thread : threads) { |
| thread->Start(); |
| } |
| for (auto &thread : threads) { |
| thread->Join(); |
| } |
| |
| // Make sure that none of the threads got duplicate ids. |
| std::unordered_set<client_id> global_assigned_ids; |
| client_id highest_id = std::numeric_limits<client_id>::min(); |
| for (const auto &thread : threads) { |
| for (const auto &client : thread->assigned_ids()) { |
| EXPECT_EQ(global_assigned_ids.end(), global_assigned_ids.find(client)); |
| EXPECT_TRUE(global_assigned_ids.insert(client).second); |
| if (client > highest_id) { |
| highest_id = client; |
| } |
| } |
| } |
| EXPECT_EQ(kNumThreads * (kNumClients + kNumClients / 2), |
| global_assigned_ids.size()); |
| |
| // Can not disconnect a client which was never connected. |
| EXPECT_FALSE(this->message_bus_->Disconnect(highest_id + 1)); |
| |
| // Disconnect some clients that were first connected in other threads. |
| for (const auto &thread : threads) { |
| EXPECT_TRUE(this->message_bus_->Disconnect(thread->last_connected_id())); |
| } |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, RegisterAndSendTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| |
| Address addr; |
| MessageStyle style; |
| |
| // Attempt to send a message which we are not registered to send. |
| EXPECT_EQ(MessageBus::SendStatus::kSenderNotRegisteredForMessageType, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Attempt to send from a non-connected client. |
| EXPECT_EQ(MessageBus::SendStatus::kSenderNotConnected, |
| this->message_bus_->Send(sender_id + 1, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Register client as a sender. |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| // Registering the same client twice for the same message type should fail. |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| // Registering for a different message type is OK. |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 1)); |
| // Can't register a client which is not connected. |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsSender(sender_id + 1, 0)); |
| |
| // Try sending with an empty Address. |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Connect a receiver. |
| client_id receiver_id = this->message_bus_->Connect(); |
| |
| // Can't send to a receiver which is not registered for the message type. |
| addr.AddRecipient(receiver_id); |
| EXPECT_EQ(MessageBus::SendStatus::kReceiverNotRegisteredForMessageType, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Register the receiver. |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| // Registering twice should fail. |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| // Registering for a different message type is OK. |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 2)); |
| // Can't register a client which is not connected. |
| EXPECT_FALSE( |
| this->message_bus_->RegisterClientAsReceiver(receiver_id + sender_id + 1, |
| 0)); |
| |
| // Sending directly should be OK now. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Still can't send a message type that the receiver is not registered for. |
| EXPECT_EQ(MessageBus::SendStatus::kReceiverNotRegisteredForMessageType, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42, 1))); |
| |
| // Send to all possible recipients instead of an explicitly specified one. |
| addr.All(true); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Send-to-all with no possible recipients. |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42, 1))); |
| |
| // Send-to-all without first registering as a sender for the message type. |
| EXPECT_EQ(MessageBus::SendStatus::kSenderNotRegisteredForMessageType, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42, 2))); |
| |
| // Disconnect receiver. |
| EXPECT_TRUE(this->message_bus_->Disconnect(receiver_id)); |
| |
| addr.All(false); |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| addr.All(true); |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Now register sender for message type '2', which the now-disconnected |
| // receiver was previously registered for. |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 2)); |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42, 2))); |
| |
| // Connect a second receiver. |
| client_id other_receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(other_receiver_id, |
| 0)); |
| |
| addr.All(false); |
| addr.AddRecipient(other_receiver_id); |
| // 'addr' now contains 2 explicit recipients, only one of which is connected. |
| |
| // OK to send as long as at least 1 receiver is still connected. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Also check that send-to-all still works as expected. |
| addr.All(true); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Now disconnect the second receiver. |
| EXPECT_TRUE(this->message_bus_->Disconnect(other_receiver_id)); |
| |
| addr.All(false); |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| addr.All(true); |
| EXPECT_EQ(MessageBus::SendStatus::kNoReceivers, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Disconnect the sender and try to send. |
| EXPECT_TRUE(this->message_bus_->Disconnect(sender_id)); |
| EXPECT_EQ(MessageBus::SendStatus::kSenderNotConnected, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Trying to register a disconnected client to send or receive other types |
| // should also fail. |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsSender(sender_id, 3)); |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsReceiver(sender_id, 3)); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, SimpleSendAndReceiveTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| for (int value = 0; value < kNumSampleMessages; ++value) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(value))); |
| } |
| |
| // Check that each of the sent messages was received. |
| std::bitset<kNumSampleMessages> check_set; |
| AnnotatedMessage received_msg; |
| while (this->message_bus_->ReceiveIfAvailable(receiver_id, &received_msg)) { |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| check_set.set(payload, true); |
| |
| this->message_bus_->DeleteMessage(receiver_id, received_msg); |
| } |
| EXPECT_TRUE(check_set.all()); |
| } |
| |
| |
| // Test sending of messages too large to be represented in-line in a |
| // TaggedMessage. |
| TYPED_TEST_P(MessageBusTest, LargeMessageSimpleSendAndReceiveTest) { |
| const char message1[] |
| = "They say there is no hope\n" |
| "They say no U.F.O.s\n" |
| "Why is no head held high?\n" |
| "Maybe you'll see them fly.\n"; |
| |
| const char message2[] |
| = "Well ... I'm driving in a black on black in black Porsche 924\n" |
| "Tempting fate for a little bit more\n" |
| "My head is filled with Techno beat, Metro Times, Face magazine\n" |
| "Shadows out of time and space\n" |
| "TIME/SPACE/TRANSMAT\n"; |
| |
| const char message3[] |
| = "For those who know it's time to leave the house\n" |
| "and go back to the field\n" |
| "Find your strength in the sound and make your transition\n"; |
| |
| |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| TaggedMessage msg(message1, std::strlen(message1) + 1, 0); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| std::move(msg))); |
| |
| msg = TaggedMessage(message2, std::strlen(message2) + 1, 0); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| std::move(msg))); |
| |
| msg = TaggedMessage(message3, std::strlen(message3) + 1, 0); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| std::move(msg))); |
| |
| std::vector<AnnotatedMessage> received_messages; |
| EXPECT_EQ(3u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_id, |
| &received_messages)); |
| ASSERT_EQ(3u, received_messages.size()); |
| |
| EXPECT_EQ(sender_id, received_messages[0].sender); |
| EXPECT_EQ(0u, received_messages[0].tagged_message.message_type()); |
| EXPECT_EQ(std::strlen(message1) + 1, |
| received_messages[0].tagged_message.message_bytes()); |
| EXPECT_STREQ(message1, |
| static_cast<const char*>( |
| received_messages[0].tagged_message.message())); |
| |
| EXPECT_EQ(sender_id, received_messages[1].sender); |
| EXPECT_EQ(0u, received_messages[1].tagged_message.message_type()); |
| EXPECT_EQ(std::strlen(message2) + 1, |
| received_messages[1].tagged_message.message_bytes()); |
| EXPECT_STREQ(message2, |
| static_cast<const char*>( |
| received_messages[1].tagged_message.message())); |
| |
| EXPECT_EQ(sender_id, received_messages[2].sender); |
| EXPECT_EQ(0u, received_messages[2].tagged_message.message_type()); |
| EXPECT_EQ(std::strlen(message3) + 1, |
| received_messages[2].tagged_message.message_bytes()); |
| EXPECT_STREQ(message3, |
| static_cast<const char*>( |
| received_messages[2].tagged_message.message())); |
| } |
| |
| |
| // Similar to SimpleSendAndReceiveTest above, but uses the blocking Receive() |
| // call and interleaves sending with receiving. |
| TYPED_TEST_P(MessageBusTest, InterleavedBlockingSimpleSendAndReceiveTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| for (int value = 0; value < kNumSampleMessages; ++value) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(value))); |
| |
| AnnotatedMessage received_msg; |
| // Alternate between deleting immediately and in a seperate call. |
| if (value & 0x1) { |
| received_msg = this->message_bus_->Receive(receiver_id, 0, true); |
| } else { |
| received_msg = this->message_bus_->Receive(receiver_id, 0, false); |
| } |
| |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| EXPECT_EQ(value, payload); |
| |
| if (!(value & 0x1)) { |
| this->message_bus_->DeleteMessage(receiver_id, received_msg); |
| } |
| } |
| |
| // Send and receive an extra message that is not deleted (this checks that |
| // garbage collection of leftover watch contexts works properly in the |
| // Zookeeper implementation). |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| AnnotatedMessage final_msg |
| = this->message_bus_->Receive(receiver_id, 0, false); |
| EXPECT_EQ(sender_id, final_msg.sender); |
| EXPECT_EQ(0u, final_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), final_msg.tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>(final_msg.tagged_message.message()); |
| EXPECT_EQ(42, payload); |
| } |
| |
| |
| class SimpleReceiverThread : public Thread { |
| public: |
| SimpleReceiverThread(MessageBus *message_bus_ptr, |
| const client_id sender_id) |
| : message_bus_ptr_(message_bus_ptr), |
| sender_id_(sender_id), |
| receiver_id_(sender_id) { |
| } |
| |
| // Spins until the receiver id is actually available. |
| client_id GetReceiverID() const { |
| client_id receiver_id = receiver_id_.load(); |
| while (receiver_id == sender_id_) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| receiver_id = receiver_id_.load(); |
| } |
| return receiver_id; |
| } |
| |
| bool AllMessagesReceived() const { |
| return received_set_.all(); |
| } |
| |
| protected: |
| void Run() { |
| const client_id receiver_id = message_bus_ptr_->Connect(); |
| EXPECT_TRUE(message_bus_ptr_->RegisterClientAsReceiver(receiver_id, 0)); |
| receiver_id_.store(receiver_id); |
| for (std::size_t message_num = 0; |
| message_num < kNumSampleMessages; |
| ++message_num) { |
| AnnotatedMessage received_msg = message_bus_ptr_->Receive(receiver_id); |
| EXPECT_EQ(sender_id_, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| received_set_.set(payload, true); |
| |
| message_bus_ptr_->DeleteMessage(receiver_id, received_msg); |
| } |
| message_bus_ptr_->Disconnect(receiver_id); |
| } |
| |
| private: |
| MessageBus *message_bus_ptr_; |
| const client_id sender_id_; |
| std::atomic<client_id> receiver_id_; |
| std::bitset<kNumSampleMessages> received_set_; |
| }; |
| |
| TYPED_TEST_P(MessageBusTest, ThreadedSimpleSendAndReceiveTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| SimpleReceiverThread receiver_thread(this->message_bus_.get(), sender_id); |
| receiver_thread.Start(); |
| client_id receiver_id = receiver_thread.GetReceiverID(); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| for (int value = 0; value < kNumSampleMessages; ++value) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(value))); |
| // Yield every 16 messages. |
| if ((value & 0xF) == 0) { |
| std::this_thread::yield(); |
| } |
| } |
| |
| receiver_thread.Join(); |
| EXPECT_TRUE(receiver_thread.AllMessagesReceived()); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, ThreadedMultipleReceiversTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| // Create and start receiver threads. |
| std::vector<std::unique_ptr<SimpleReceiverThread>> threads; |
| for (std::size_t thread_idx = 0; thread_idx < kNumThreads; ++thread_idx) { |
| threads.emplace_back(new SimpleReceiverThread(this->message_bus_.get(), |
| sender_id)); |
| } |
| for (auto &thread : threads) { |
| thread->Start(); |
| } |
| |
| Address addr; |
| for (auto &thread : threads) { |
| addr.AddRecipient(thread->GetReceiverID()); |
| } |
| MessageStyle style; |
| style.Broadcast(true); |
| |
| for (int value = 0; value < kNumSampleMessages; ++value) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(value))); |
| // Yield every 16 messages. |
| if ((value & 0xF) == 0) { |
| std::this_thread::yield(); |
| } |
| } |
| |
| for (auto &thread : threads) { |
| thread->Join(); |
| } |
| for (auto &thread : threads) { |
| EXPECT_TRUE(thread->AllMessagesReceived()); |
| } |
| } |
| |
| |
| class SimpleSenderThread : public Thread { |
| public: |
| SimpleSenderThread(MessageBus *message_bus_ptr, |
| const client_id receiver_id) |
| : message_bus_ptr_(message_bus_ptr), |
| receiver_id_(receiver_id), |
| sender_id_(receiver_id) { |
| } |
| |
| // Spins until the sender id is actually available. |
| client_id GetSenderID() const { |
| client_id sender_id = sender_id_.load(); |
| while (sender_id == receiver_id_) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| sender_id = sender_id_.load(); |
| } |
| return sender_id; |
| } |
| |
| protected: |
| void Run() { |
| const client_id sender_id = message_bus_ptr_->Connect(); |
| EXPECT_TRUE(message_bus_ptr_->RegisterClientAsSender(sender_id, 0)); |
| sender_id_.store(sender_id); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id_); |
| MessageStyle style; |
| for (std::size_t message_num = 0; |
| message_num < kNumSampleMessages; |
| ++message_num) { |
| EXPECT_EQ( |
| MessageBus::SendStatus::kOK, |
| message_bus_ptr_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(message_num))); |
| } |
| message_bus_ptr_->Disconnect(sender_id); |
| } |
| |
| private: |
| MessageBus *message_bus_ptr_; |
| const client_id receiver_id_; |
| std::atomic<client_id> sender_id_; |
| }; |
| |
| TYPED_TEST_P(MessageBusTest, ThreadedMultipleSendersTest) { |
| // Receive messages in the main thread. |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| // Create and start sender threads. |
| std::vector<std::unique_ptr<SimpleSenderThread>> threads; |
| for (std::size_t thread_idx = 0; thread_idx < kNumThreads; ++thread_idx) { |
| threads.emplace_back(new SimpleSenderThread(this->message_bus_.get(), |
| receiver_id)); |
| } |
| for (auto &thread : threads) { |
| thread->Start(); |
| } |
| |
| // Set up data structure to check that all expected messages are received. |
| std::unordered_map<client_id, std::bitset<kNumSampleMessages>> received_sets; |
| for (auto &thread : threads) { |
| received_sets.emplace(thread->GetSenderID(), |
| std::bitset<kNumSampleMessages>()); |
| } |
| |
| // Receive all the expected messages. |
| for (std::size_t received_num = 0; |
| received_num < kNumThreads * kNumSampleMessages; |
| ++received_num) { |
| AnnotatedMessage received_message |
| = this->message_bus_->Receive(receiver_id); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_message.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| |
| ASSERT_NE(received_sets.find(received_message.sender), |
| received_sets.end()); |
| EXPECT_FALSE(received_sets[received_message.sender][payload]); |
| received_sets[received_message.sender].set(payload, true); |
| |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| } |
| |
| // No more messages should be recieved after the above loop. |
| for (auto &thread : threads) { |
| thread->Join(); |
| } |
| AnnotatedMessage received_message; |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| |
| // Check that all expected messages have been received. |
| for (const auto &received_pair : received_sets) { |
| EXPECT_TRUE(received_pair.second.all()); |
| } |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, ReceiveBatchTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| std::vector<AnnotatedMessage> received_messages; |
| |
| // Initial attempt to receive should give 0 messages. |
| EXPECT_EQ(0u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_id, |
| &received_messages)); |
| EXPECT_EQ(0u, received_messages.size()); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| // Send a few messages. |
| for (int i = 0; i < 16; ++i) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(i))); |
| } |
| |
| // Receive a batch of limited size. |
| EXPECT_EQ(8u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_id, |
| &received_messages, |
| 0, |
| 8)); |
| EXPECT_EQ(8u, received_messages.size()); |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(static_cast<int>(i), payload); |
| } |
| |
| this->message_bus_->DeleteMessages(receiver_id, |
| received_messages.begin(), |
| received_messages.end()); |
| received_messages.clear(); |
| |
| // Receive a single message. |
| AnnotatedMessage single_message; |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &single_message)); |
| EXPECT_EQ(sender_id, single_message.sender); |
| EXPECT_EQ(0u, single_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), single_message.tagged_message.message_bytes()); |
| EXPECT_EQ(8, |
| *static_cast<const int*>(single_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, single_message); |
| |
| // Receive a batch via the blocking interface. |
| EXPECT_EQ(4u, |
| this->message_bus_->ReceiveBatch(receiver_id, |
| &received_messages, |
| 0, |
| 4)); |
| EXPECT_EQ(4u, received_messages.size()); |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(static_cast<int>(i + 9), payload); |
| } |
| this->message_bus_->DeleteMessages(receiver_id, |
| received_messages.begin(), |
| received_messages.end()); |
| |
| // Append messages to a container that already has some in it. |
| EXPECT_EQ(3u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_id, |
| &received_messages)); |
| EXPECT_EQ(7u, received_messages.size()); |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(static_cast<int>(i + 9), payload); |
| } |
| |
| // Double-deletion of the first four elements is a no-op. |
| this->message_bus_->DeleteMessages(receiver_id, |
| received_messages.begin(), |
| received_messages.end()); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, PriorityTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| // Fill up a vector with all possible priority levels, then shuffle the |
| // order. |
| std::vector<Priority> priorities; |
| for (Priority priority = 0; priority <= kMaxAsyncPriority; ++priority) { |
| priorities.push_back(priority); |
| } |
| priorities.push_back(kSyncResponsePriority); |
| std::random_shuffle(priorities.begin(), priorities.end()); |
| |
| // Send messages at each priority level in random order. |
| for (const Priority &priority : priorities) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(priority), |
| priority)); |
| } |
| |
| // Receiver should get messages in descending order of priority. |
| Priority expected_priority = kSyncResponsePriority; |
| AnnotatedMessage received_msg; |
| while (this->message_bus_->ReceiveIfAvailable(receiver_id, &received_msg)) { |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| EXPECT_EQ(expected_priority, payload); |
| this->message_bus_->DeleteMessage(receiver_id, received_msg); |
| if (expected_priority > 0) { |
| --expected_priority; |
| } else { |
| // Should be finished, now. |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_msg)); |
| } |
| } |
| EXPECT_EQ(0, expected_priority); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, TimeoutTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| |
| MessageStyle style; |
| |
| // Send a message that expires right away. |
| style.Timeout(std::chrono::high_resolution_clock::now()); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(0))); |
| |
| // Send a message that expires in 2000 seconds. |
| style.Timeout(std::chrono::high_resolution_clock::now() |
| + std::chrono::seconds(2000)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(1))); |
| |
| // Send a message that expires in 1000 seconds. |
| style.Timeout(std::chrono::high_resolution_clock::now() |
| + std::chrono::seconds(1000)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(2))); |
| |
| // Sleep briefly. |
| std::this_thread::sleep_for(std::chrono::seconds(2)); |
| |
| AnnotatedMessage received_message; |
| |
| // Should receive the message with the earlier deadline first. |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(2, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(1, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| // The expired message should never be received. |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, SendOrderTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| |
| MessageStyle style; |
| |
| // Send a few messages at default priority with no timeout. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(0))); |
| |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(1))); |
| |
| // Messages at higher priority. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(2), |
| kDefaultPriority + 1)); |
| |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(3), |
| kDefaultPriority + 1)); |
| |
| // Messages with a timeout. |
| MessageStyle timeout_style; |
| timeout_style.Timeout(std::chrono::high_resolution_clock::now() |
| + std::chrono::seconds(1000)); |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| timeout_style, |
| MakeTaggedSimpleIntMessage(4))); |
| |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| timeout_style, |
| MakeTaggedSimpleIntMessage(5))); |
| |
| // Another one of each set of messages. |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(6))); |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(7), |
| kDefaultPriority + 1)); |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| timeout_style, |
| MakeTaggedSimpleIntMessage(8))); |
| |
| // Check that messages are received in expected order. Ordering is determined |
| // by priority, then deadline, then send time. |
| AnnotatedMessage received_message; |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(2, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(3, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(7, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(4, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(5, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(8, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(0, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(1, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| EXPECT_EQ(6, *static_cast<const int*>( |
| received_message.tagged_message.message())); |
| this->message_bus_->DeleteMessage(receiver_id, received_message); |
| |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_message)); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, BroadcastTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| // A receiver which gets every message. |
| client_id receiver_id_1 = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id_1, 0)); |
| |
| // A receiver which gets every even-numbered message. |
| client_id receiver_id_2 = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id_2, 0)); |
| |
| // A receiver which gets every third message. |
| client_id receiver_id_3 = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id_3, 0)); |
| |
| // A receiver which gets every fifth message. |
| client_id receiver_id_5 = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id_5, 0)); |
| |
| // Send out broadcast messages. |
| MessageStyle style; |
| style.Broadcast(true); |
| for (int message_num = 0; |
| message_num < kNumSampleMessages; |
| ++message_num) { |
| Address addr; |
| addr.AddRecipient(receiver_id_1); |
| if (message_num % 2 == 0) { |
| addr.AddRecipient(receiver_id_2); |
| } |
| if (message_num % 3 == 0) { |
| addr.AddRecipient(receiver_id_3); |
| } |
| if (message_num % 5 == 0) { |
| addr.AddRecipient(receiver_id_5); |
| } |
| |
| EXPECT_EQ( |
| MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(message_num))); |
| } |
| |
| AnnotatedMessage received_message; |
| |
| std::bitset<kNumSampleMessages> received_set_1; |
| for (int received_message_num = 0; |
| received_message_num < kNumSampleMessages; |
| ++received_message_num) { |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id_1, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_message.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| received_set_1.set(payload, true); |
| |
| this->message_bus_->DeleteMessage(receiver_id_1, received_message); |
| } |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id_1, |
| &received_message)); |
| EXPECT_TRUE(received_set_1.all()); |
| |
| std::bitset<(kNumSampleMessages + 1) / 2> received_set_2; |
| for (int received_message_num = 0; |
| received_message_num < (kNumSampleMessages + 1) / 2; |
| ++received_message_num) { |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id_2, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_message.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| ASSERT_EQ(0, payload % 2); |
| received_set_2.set(payload / 2, true); |
| |
| this->message_bus_->DeleteMessage(receiver_id_2, received_message); |
| } |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id_2, |
| &received_message)); |
| EXPECT_TRUE(received_set_2.all()); |
| |
| std::bitset<(kNumSampleMessages + 2) / 3> received_set_3; |
| for (int received_message_num = 0; |
| received_message_num < (kNumSampleMessages + 2) / 3; |
| ++received_message_num) { |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id_3, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_message.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| ASSERT_EQ(0, payload % 3); |
| received_set_3.set(payload / 3, true); |
| |
| this->message_bus_->DeleteMessage(receiver_id_3, received_message); |
| } |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id_3, |
| &received_message)); |
| EXPECT_TRUE(received_set_3.all()); |
| |
| std::bitset<(kNumSampleMessages + 4) / 5> received_set_5; |
| for (int received_message_num = 0; |
| received_message_num < (kNumSampleMessages + 4) / 5; |
| ++received_message_num) { |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id_5, |
| &received_message)); |
| EXPECT_EQ(sender_id, received_message.sender); |
| EXPECT_EQ(0u, received_message.tagged_message.message_type()); |
| |
| ASSERT_EQ(sizeof(int), received_message.tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_message.tagged_message.message()); |
| ASSERT_GE(payload, 0); |
| ASSERT_LT(payload, kNumSampleMessages); |
| ASSERT_EQ(0, payload % 5); |
| received_set_5.set(payload / 5, true); |
| |
| this->message_bus_->DeleteMessage(receiver_id_5, received_message); |
| } |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id_5, |
| &received_message)); |
| EXPECT_TRUE(received_set_5.all()); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, CancelTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_a_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_a_id, 0)); |
| |
| client_id receiver_b_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_b_id, 0)); |
| |
| Address addr; |
| MessageStyle style; |
| |
| // Send 3 messages, then cancel the middle one. |
| addr.AddRecipient(receiver_a_id); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(0), |
| kDefaultPriority, |
| nullptr)); |
| CancellationToken token; |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(1), |
| kDefaultPriority, |
| &token)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(2), |
| kDefaultPriority, |
| nullptr)); |
| |
| this->message_bus_->CancelMessage(sender_id, token); |
| |
| // Receiver should not get the cancelled message. |
| std::vector<AnnotatedMessage> received_messages; |
| EXPECT_EQ(2u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages)); |
| ASSERT_EQ(2u, received_messages.size()); |
| |
| EXPECT_EQ(sender_id, received_messages.front().sender); |
| EXPECT_EQ(0u, received_messages.front().tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages.front().tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages.front().tagged_message.message()); |
| EXPECT_EQ(0, payload); |
| |
| EXPECT_EQ(sender_id, received_messages.back().sender); |
| EXPECT_EQ(0u, received_messages.back().tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages.back().tagged_message.message_bytes()); |
| payload = *static_cast<const int*>( |
| received_messages.back().tagged_message.message()); |
| EXPECT_EQ(2, payload); |
| |
| this->message_bus_->DeleteMessages(receiver_a_id, |
| received_messages.begin(), |
| received_messages.end()); |
| received_messages.clear(); |
| |
| // Send a broadcast message and have a receiver cancel it for its peer. |
| addr.AddRecipient(receiver_b_id); |
| style.Broadcast(true); |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(3), |
| kDefaultPriority, |
| &token)); |
| EXPECT_EQ(1u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages)); |
| ASSERT_EQ(1u, received_messages.size()); |
| EXPECT_EQ(sender_id, received_messages.front().sender); |
| EXPECT_EQ(0u, received_messages.front().tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages.front().tagged_message.message_bytes()); |
| payload = *static_cast<const int*>( |
| received_messages.front().tagged_message.message()); |
| EXPECT_EQ(3, payload); |
| |
| this->message_bus_->CancelMessage(receiver_a_id, received_messages.front()); |
| |
| // Other receiver should not get the message. |
| AnnotatedMessage message_buf; |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_b_id, |
| &message_buf)); |
| // Cancelling a message also implicitly deletes it for a client which already |
| // received it. |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_a_id, |
| &message_buf)); |
| |
| // Attempting to cancel a non-cancellable message does nothing. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(4), |
| kDefaultPriority, |
| nullptr)); |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_a_id, |
| &message_buf)); |
| EXPECT_EQ(sender_id, message_buf.sender); |
| EXPECT_EQ(0u, message_buf.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), message_buf.tagged_message.message_bytes()); |
| payload = *static_cast<const int*>(message_buf.tagged_message.message()); |
| EXPECT_EQ(4, payload); |
| this->message_bus_->DeleteMessage(receiver_a_id, message_buf); |
| |
| this->message_bus_->CancelMessage(receiver_a_id, message_buf); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_b_id, |
| &message_buf)); |
| EXPECT_EQ(sender_id, message_buf.sender); |
| EXPECT_EQ(0u, message_buf.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), message_buf.tagged_message.message_bytes()); |
| payload = *static_cast<const int*>(message_buf.tagged_message.message()); |
| EXPECT_EQ(4, payload); |
| this->message_bus_->DeleteMessage(receiver_b_id, message_buf); |
| |
| // Try cancelling a message after it has already been received and deleted. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(5), |
| kDefaultPriority, |
| &token)); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_a_id, |
| &message_buf)); |
| EXPECT_EQ(sender_id, message_buf.sender); |
| EXPECT_EQ(0u, message_buf.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), message_buf.tagged_message.message_bytes()); |
| payload = *static_cast<const int*>(message_buf.tagged_message.message()); |
| EXPECT_EQ(5, payload); |
| this->message_bus_->DeleteMessage(receiver_a_id, message_buf); |
| |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_b_id, |
| &message_buf)); |
| EXPECT_EQ(sender_id, message_buf.sender); |
| EXPECT_EQ(0u, message_buf.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), message_buf.tagged_message.message_bytes()); |
| payload = *static_cast<const int*>(message_buf.tagged_message.message()); |
| EXPECT_EQ(5, payload); |
| this->message_bus_->DeleteMessage(receiver_b_id, message_buf); |
| |
| this->message_bus_->CancelMessage(sender_id, token); |
| |
| // Test batch-cancellation. Send 2 cancellable messages with a |
| // non-cancellable one in the middle. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(6), |
| kDefaultPriority, |
| &token)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(7), |
| kDefaultPriority, |
| nullptr)); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(8), |
| kDefaultPriority, |
| &token)); |
| |
| received_messages.clear(); |
| EXPECT_EQ(3u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages)); |
| this->message_bus_->CancelMessages(receiver_a_id, |
| received_messages.begin(), |
| received_messages.end()); |
| |
| // Other receiver should only get the non-cancellable message. |
| received_messages.clear(); |
| EXPECT_EQ(1u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_b_id, |
| &received_messages)); |
| ASSERT_EQ(1u, received_messages.size()); |
| EXPECT_EQ(sender_id, received_messages.front().sender); |
| EXPECT_EQ(0u, received_messages.front().tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages.front().tagged_message.message_bytes()); |
| payload = *static_cast<const int*>( |
| received_messages.front().tagged_message.message()); |
| EXPECT_EQ(7, payload); |
| } |
| |
| |
| TYPED_TEST_P(MessageBusTest, CountQueuedMessagesForClientTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| // Receiver's queue is initially empty. |
| EXPECT_EQ(0u, this->message_bus_->CountQueuedMessagesForClient(receiver_id)); |
| |
| // Now we will enqueue a few messages for the receiver. |
| MessageStyle style; |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| for (int i = 0; i < 3; ++i) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(i))); |
| } |
| |
| EXPECT_EQ(3u, this->message_bus_->CountQueuedMessagesForClient(receiver_id)); |
| |
| // Receive and delete a single message. |
| AnnotatedMessage received_msg; |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_msg, |
| 0, |
| true)); |
| // 2 messages remain on the queue: |
| EXPECT_EQ(2u, this->message_bus_->CountQueuedMessagesForClient(receiver_id)); |
| |
| // Add and immediately cancel a message at a higher priority than the |
| // existing messages. |
| CancellationToken token; |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42), |
| kDefaultPriority + 1, |
| &token)); |
| this->message_bus_->CancelMessage(sender_id, token); |
| |
| // Depending on the particular implementation of this message bus, a |
| // cancelled message may or may not count towards the number in the queue |
| // until a call to one of the Receive methods cleans it up. |
| EXPECT_TRUE( |
| this->message_bus_->CountQueuedMessagesForClient(receiver_id) == 2 |
| || this->message_bus_->CountQueuedMessagesForClient(receiver_id) == 3); |
| |
| // Receive one more message. The cancelled message will be cleaned from the |
| // queue no matter what at this point. |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_msg, |
| 0, |
| true)); |
| EXPECT_EQ(1u, this->message_bus_->CountQueuedMessagesForClient(receiver_id)); |
| |
| // Send a message that expires immediately. |
| style.Timeout(std::chrono::high_resolution_clock::now()); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(121))); |
| std::this_thread::sleep_for(std::chrono::seconds(1)); |
| |
| // Until we receive, an expired message will still count towards the queue's |
| // total. |
| EXPECT_EQ(2u, this->message_bus_->CountQueuedMessagesForClient(receiver_id)); |
| |
| // Receive one more message, which should clear out the queue (including the |
| // expired message). |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_msg, |
| 0, |
| true)); |
| EXPECT_EQ(0u, this->message_bus_->CountQueuedMessagesForClient(receiver_id)); |
| } |
| |
| |
| TYPED_TEST_P(SeparateDeletionMessageBusTest, DeleteTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_a_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_a_id, 0)); |
| |
| client_id receiver_b_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_b_id, 0)); |
| |
| // Enqueue several messages for a single receiver. |
| MessageStyle style; |
| Address addr; |
| addr.AddRecipient(receiver_a_id); |
| for (int i = 0; i < 16; ++i) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(i))); |
| } |
| |
| // Receive a batch of messages without immediately deleting them. |
| std::vector<AnnotatedMessage> received_messages; |
| EXPECT_EQ(16u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages, |
| 0, |
| 0, |
| false)); |
| |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(static_cast<int>(i), payload); |
| } |
| |
| // Receiving again without first deleting will give us the same messages. |
| received_messages.clear(); |
| EXPECT_EQ(16u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages, |
| 0, |
| 0, |
| false)); |
| |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(static_cast<int>(i), payload); |
| } |
| |
| // Delete only some of the messages. |
| this->message_bus_->DeleteMessages(receiver_a_id, |
| received_messages.begin(), |
| received_messages.begin() + 4); |
| |
| // Should receive non-deleted messages. |
| received_messages.clear(); |
| EXPECT_EQ(12u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages, |
| 0, |
| 0, |
| false)); |
| |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(static_cast<int>(i + 4), payload); |
| } |
| |
| // Delete some messages in an order other than that in which they were |
| // received. |
| ASSERT_EQ(12u, received_messages.size()); |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[8]); |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[4]); |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[0]); |
| |
| // Double-deletion is a silent no-op. |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[8]); |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[4]); |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[0]); |
| |
| received_messages.clear(); |
| EXPECT_EQ(9u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages, |
| 0, |
| 0, |
| false)); |
| |
| int expected = 5; |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(expected, payload); |
| |
| ++expected; |
| if ((expected == 8) || (expected == 12)) { |
| ++expected; |
| } |
| } |
| |
| // Send a new, higher-priority message. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(-1), |
| kDefaultPriority + 1)); |
| |
| // Delete the previous "front" message. |
| ASSERT_FALSE(received_messages.empty()); |
| this->message_bus_->DeleteMessage(receiver_a_id, received_messages[0]); |
| |
| // Receive messages, deleting them as they are received. |
| received_messages.clear(); |
| EXPECT_EQ(9u, |
| this->message_bus_->ReceiveBatchIfAvailable(receiver_a_id, |
| &received_messages, |
| 0, |
| 0, |
| true)); |
| |
| expected = -1; |
| for (std::size_t i = 0; i < received_messages.size(); ++i) { |
| EXPECT_EQ(sender_id, received_messages[i].sender); |
| EXPECT_EQ(0u, received_messages[i].tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_messages[i].tagged_message.message_bytes()); |
| int payload = *static_cast<const int*>( |
| received_messages[i].tagged_message.message()); |
| EXPECT_EQ(expected, payload); |
| |
| if (expected == -1) { |
| expected = 6; |
| } else { |
| ++expected; |
| if ((expected == 8) || (expected == 12)) { |
| ++expected; |
| } |
| } |
| } |
| |
| // Now all pending messages should be deleted. |
| AnnotatedMessage message_buffer; |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_a_id, |
| &message_buffer)); |
| |
| // Send a broadcast message to multiple receivers. |
| addr.AddRecipient(receiver_b_id); |
| style.Broadcast(true); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(42))); |
| |
| // Receive and delete from one client. |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_a_id, |
| &message_buffer)); |
| EXPECT_EQ(sender_id, message_buffer.sender); |
| EXPECT_EQ(0u, message_buffer.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), message_buffer.tagged_message.message_bytes()); |
| EXPECT_EQ(42, |
| *static_cast<const int*>(message_buffer.tagged_message.message())); |
| |
| this->message_bus_->DeleteMessage(receiver_a_id, message_buffer); |
| |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_a_id, |
| &message_buffer)); |
| |
| // Message should still be receivable by the other client. |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_b_id, |
| &message_buffer)); |
| EXPECT_EQ(sender_id, message_buffer.sender); |
| EXPECT_EQ(0u, message_buffer.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), message_buffer.tagged_message.message_bytes()); |
| EXPECT_EQ(42, |
| *static_cast<const int*>(message_buffer.tagged_message.message())); |
| } |
| |
| |
| TYPED_TEST_P(ReconnectMessageBusTest, ReconnectTest) { |
| // Connect a couple of clients and have them send messages to eachother. |
| client_id client_a = this->message_bus_->Connect(); |
| client_id client_b = this->message_bus_->Connect(); |
| |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(client_a, 0)); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(client_a, 1)); |
| |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(client_b, 1)); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(client_b, 0)); |
| |
| Address addr_a; |
| addr_a.AddRecipient(client_a); |
| |
| Address addr_b; |
| addr_b.AddRecipient(client_b); |
| |
| MessageStyle style; |
| |
| for (int i = 0; i < kNumSampleMessages; ++i) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(client_a, |
| addr_b, |
| style, |
| MakeTaggedSimpleIntMessage(i, 0))); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(client_b, |
| addr_a, |
| style, |
| MakeTaggedSimpleIntMessage(-i, 1))); |
| } |
| |
| // Receive and delete the first message for each client before disconnecting. |
| AnnotatedMessage received; |
| received = this->message_bus_->Receive(client_a); |
| EXPECT_EQ(client_b, received.sender); |
| EXPECT_EQ(1u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(0, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->DeleteMessage(client_a, received); |
| |
| received = this->message_bus_->Receive(client_b); |
| EXPECT_EQ(client_a, received.sender); |
| EXPECT_EQ(0u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(0, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->DeleteMessage(client_b, received); |
| |
| // Destroy the MessageBus object and "reconnect" to the same persistent |
| // instance. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Connect an additional client and make sure that client_ids don't collide. |
| client_id client_c = this->message_bus_->Connect(); |
| EXPECT_NE(client_a, client_c); |
| EXPECT_NE(client_b, client_c); |
| |
| // Attempting to re-register the clients for the same message types should |
| // fail. |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsSender(client_a, 0)); |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsReceiver(client_a, 1)); |
| |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsSender(client_b, 1)); |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsReceiver(client_b, 0)); |
| |
| // Send one more message from each client. |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send( |
| client_a, |
| addr_b, |
| style, |
| MakeTaggedSimpleIntMessage(kNumSampleMessages, 0))); |
| EXPECT_EQ( |
| MessageBus::SendStatus::kOK, |
| this->message_bus_->Send( |
| client_b, |
| addr_a, |
| style, |
| MakeTaggedSimpleIntMessage(-static_cast<int>(kNumSampleMessages), |
| 1))); |
| |
| // Receive and delete another message for each client. |
| received = this->message_bus_->Receive(client_a); |
| EXPECT_EQ(client_b, received.sender); |
| EXPECT_EQ(1u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(-1, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->DeleteMessage(client_a, received); |
| |
| received = this->message_bus_->Receive(client_b); |
| EXPECT_EQ(client_a, received.sender); |
| EXPECT_EQ(0u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(1, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->DeleteMessage(client_b, received); |
| |
| // Disconnect and reconnect once more. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Now receive all the previously sent messages. |
| for (int i = 2; i <= kNumSampleMessages; ++i) { |
| received = this->message_bus_->Receive(client_a); |
| EXPECT_EQ(client_b, received.sender); |
| EXPECT_EQ(1u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(-i, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->DeleteMessage(client_a, received); |
| |
| received = this->message_bus_->Receive(client_b); |
| EXPECT_EQ(client_a, received.sender); |
| EXPECT_EQ(0u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(i, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->DeleteMessage(client_b, received); |
| } |
| |
| // No more messages for either client remain. |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(client_a, &received)); |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(client_b, &received)); |
| |
| // Send a cancellable broadcast message. |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(client_c, 0)); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(client_a, 0)); |
| |
| Address broadcast_addr; |
| broadcast_addr.AddRecipient(client_a).AddRecipient(client_b); |
| |
| MessageStyle broadcast_style; |
| broadcast_style.Broadcast(true); |
| |
| CancellationToken cancellation_token; |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(client_c, |
| broadcast_addr, |
| broadcast_style, |
| MakeTaggedSimpleIntMessage(42, 0), |
| kDefaultPriority, |
| &cancellation_token)); |
| |
| // Disconnect one of the clients. |
| EXPECT_TRUE(this->message_bus_->Disconnect(client_c)); |
| |
| // Reconnect. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Shouldn't be able to do anything with the disconnected client. |
| EXPECT_FALSE(this->message_bus_->RegisterClientAsSender(client_c, 0)); |
| |
| // Receive and cancel the broadcast message from one of the clients. |
| received = this->message_bus_->Receive(client_a); |
| EXPECT_EQ(client_c, received.sender); |
| EXPECT_EQ(0u, received.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), received.tagged_message.message_bytes()); |
| EXPECT_EQ(42, *static_cast<const int*>(received.tagged_message.message())); |
| this->message_bus_->CancelMessage(client_a, received); |
| |
| // Reconnect. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Neither client should be able to receive cancelled message. |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(client_a, &received)); |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(client_b, &received)); |
| } |
| |
| |
| TYPED_TEST_P(ReconnectActiveStateMessageBusTest, ReconnectAndCancelTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_a_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_a_id, 0)); |
| |
| client_id receiver_b_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_b_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_a_id).AddRecipient(receiver_b_id); |
| |
| MessageStyle style; |
| style.Broadcast(true); |
| |
| CancellationToken token_0; |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(0), |
| kDefaultPriority, |
| &token_0)); |
| |
| CancellationToken token_1; |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(1), |
| kDefaultPriority, |
| &token_1)); |
| |
| // Destroy the MessageBus object and "reconnect" to the same persistent |
| // instance. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Cancel the first message on the sender side using the reconnected bus. |
| this->message_bus_->CancelMessage(sender_id, token_0); |
| |
| // Receive a message. |
| AnnotatedMessage received_msg = this->message_bus_->Receive(receiver_a_id); |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| EXPECT_EQ(1, payload); |
| |
| // Destroy and reconnect again. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Cancel the message on the receiver side using the reconnected bus. |
| this->message_bus_->CancelMessage(receiver_a_id, received_msg); |
| |
| // Both messages are cancelled, so there is nothing left to receive. |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_b_id, |
| &received_msg)); |
| } |
| |
| |
| TYPED_TEST_P(ReconnectSeparateDeletionMessageBusTest, |
| ReconnectAndDeleteTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| client_id receiver_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsReceiver(receiver_id, 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_id); |
| MessageStyle style; |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(0))); |
| |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(1))); |
| |
| // Receive the first message. |
| AnnotatedMessage received_msg = this->message_bus_->Receive(receiver_id); |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| EXPECT_EQ(0, payload); |
| |
| // Destroy the MessageBus object and "reconnect" to the same persistent |
| // instance. |
| this->message_bus_.reset(nullptr); |
| this->message_bus_.reset(this->CreateBus()); |
| ASSERT_TRUE(this->message_bus_->Initialize()); |
| |
| // Delete using the reconnected bus instance. |
| this->message_bus_->DeleteMessage(receiver_id, received_msg); |
| |
| // Should now receive the next message. |
| EXPECT_TRUE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_msg)); |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_msg.tagged_message.message_bytes()); |
| payload = *static_cast<const int*>(received_msg.tagged_message.message()); |
| EXPECT_EQ(1, payload); |
| |
| // After deleting the second message, there is nothing left to receive. |
| this->message_bus_->DeleteMessage(receiver_id, received_msg); |
| |
| EXPECT_FALSE(this->message_bus_->ReceiveIfAvailable(receiver_id, |
| &received_msg)); |
| } |
| |
| |
| // Simulate a client-server interaction for MultiInstanceTest, with a client |
| // sending ints to the server and the server returning their squares. |
| class ClientThread : public Thread { |
| public: |
| ClientThread(MessageBus *message_bus, |
| const int num_messages, |
| const client_id server_id) |
| : message_bus_ptr_(message_bus), |
| num_messages_(num_messages), |
| server_(server_id) { |
| me_ = message_bus_ptr_->Connect(); |
| EXPECT_TRUE(message_bus_ptr_->RegisterClientAsSender(me_, 0)); |
| EXPECT_TRUE(message_bus_ptr_->RegisterClientAsReceiver(me_, 1)); |
| } |
| |
| protected: |
| void Run() { |
| Address addr; |
| addr.AddRecipient(server_); |
| MessageStyle style; |
| for (int i = 0; i < num_messages_; ++i) { |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| message_bus_ptr_->Send(me_, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(i, 0))); |
| |
| AnnotatedMessage response = message_bus_ptr_->Receive(me_); |
| EXPECT_EQ(server_, response.sender); |
| EXPECT_EQ(1u, response.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), response.tagged_message.message_bytes()); |
| EXPECT_EQ(i * i, |
| *static_cast<const int*>(response.tagged_message.message())); |
| message_bus_ptr_->DeleteMessage(me_, response); |
| } |
| } |
| |
| private: |
| MessageBus *message_bus_ptr_; |
| const int num_messages_; |
| client_id me_; |
| client_id server_; |
| }; |
| |
| class ServerThread : public Thread { |
| public: |
| ServerThread(MessageBus *message_bus, const int num_messages) |
| : message_bus_ptr_(message_bus), |
| num_messages_(num_messages) { |
| me_ = message_bus_ptr_->Connect(); |
| EXPECT_TRUE(message_bus_ptr_->RegisterClientAsSender(me_, 1)); |
| EXPECT_TRUE(message_bus_ptr_->RegisterClientAsReceiver(me_, 0)); |
| } |
| |
| client_id GetID() const { |
| return me_; |
| } |
| |
| protected: |
| void Run() { |
| MessageStyle style; |
| for (int i = 0; i < num_messages_; ++i) { |
| AnnotatedMessage request = message_bus_ptr_->Receive(me_); |
| EXPECT_EQ(0u, request.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), request.tagged_message.message_bytes()); |
| int request_param |
| = *static_cast<const int*>(request.tagged_message.message()); |
| |
| Address addr; |
| addr.AddRecipient(request.sender); |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| message_bus_ptr_->Send( |
| me_, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(request_param * request_param, |
| 1))); |
| |
| message_bus_ptr_->DeleteMessage(me_, request); |
| } |
| } |
| |
| private: |
| MessageBus *message_bus_ptr_; |
| const int num_messages_; |
| client_id me_; |
| }; |
| |
| TYPED_TEST_P(MultiInstanceMessageBusTest, MultiInstanceTest) { |
| std::unique_ptr<MessageBus> server_bus_instance(this->CreateBus()); |
| ASSERT_TRUE(server_bus_instance->Initialize()); |
| ServerThread server(server_bus_instance.get(), 16); |
| server.Start(); |
| |
| ClientThread client(this->message_bus_.get(), 16, server.GetID()); |
| client.Start(); |
| |
| server.Join(); |
| client.Join(); |
| } |
| |
| |
| TYPED_TEST_P(MultiInstanceMessageBusTest, MultiInstanceCancelTest) { |
| client_id sender_id = this->message_bus_->Connect(); |
| EXPECT_TRUE(this->message_bus_->RegisterClientAsSender(sender_id, 0)); |
| |
| std::unique_ptr<MessageBus> receiver_a_bus_instance(this->CreateBus()); |
| ASSERT_TRUE(receiver_a_bus_instance->Initialize()); |
| client_id receiver_a_id = receiver_a_bus_instance->Connect(); |
| EXPECT_TRUE(receiver_a_bus_instance->RegisterClientAsReceiver(receiver_a_id, |
| 0)); |
| |
| std::unique_ptr<MessageBus> receiver_b_bus_instance(this->CreateBus()); |
| ASSERT_TRUE(receiver_b_bus_instance->Initialize()); |
| client_id receiver_b_id = receiver_b_bus_instance->Connect(); |
| EXPECT_TRUE(receiver_b_bus_instance->RegisterClientAsReceiver(receiver_b_id, |
| 0)); |
| |
| Address addr; |
| addr.AddRecipient(receiver_a_id).AddRecipient(receiver_b_id); |
| |
| MessageStyle style; |
| style.Broadcast(true); |
| |
| CancellationToken token_0; |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(0), |
| kDefaultPriority, |
| &token_0)); |
| |
| CancellationToken token_1; |
| EXPECT_EQ(MessageBus::SendStatus::kOK, |
| this->message_bus_->Send(sender_id, |
| addr, |
| style, |
| MakeTaggedSimpleIntMessage(1), |
| kDefaultPriority, |
| &token_1)); |
| |
| // Cancel the first message on the sender side. |
| this->message_bus_->CancelMessage(sender_id, token_0); |
| |
| // Receive a message. |
| AnnotatedMessage received_msg = receiver_a_bus_instance->Receive( |
| receiver_a_id); |
| EXPECT_EQ(sender_id, received_msg.sender); |
| EXPECT_EQ(0u, received_msg.tagged_message.message_type()); |
| ASSERT_EQ(sizeof(int), |
| received_msg.tagged_message.message_bytes()); |
| int payload |
| = *static_cast<const int*>(received_msg.tagged_message.message()); |
| EXPECT_EQ(1, payload); |
| |
| // Cancel the message on the receiver side. |
| receiver_a_bus_instance->CancelMessage(receiver_a_id, received_msg); |
| |
| // Both messages are cancelled, so there is nothing left to receive. |
| EXPECT_FALSE(receiver_b_bus_instance->ReceiveIfAvailable(receiver_b_id, |
| &received_msg)); |
| } |
| |
| |
| REGISTER_TYPED_TEST_CASE_P(MessageBusTest, |
| ConnectTest, ThreadedConnectTest, |
| RegisterAndSendTest, SimpleSendAndReceiveTest, |
| LargeMessageSimpleSendAndReceiveTest, |
| InterleavedBlockingSimpleSendAndReceiveTest, |
| ThreadedSimpleSendAndReceiveTest, |
| ThreadedMultipleReceiversTest, |
| ThreadedMultipleSendersTest, ReceiveBatchTest, |
| PriorityTest, TimeoutTest, SendOrderTest, |
| BroadcastTest, CancelTest, |
| CountQueuedMessagesForClientTest); |
| |
| REGISTER_TYPED_TEST_CASE_P(SeparateDeletionMessageBusTest, |
| DeleteTest); |
| |
| REGISTER_TYPED_TEST_CASE_P(ReconnectMessageBusTest, |
| ReconnectTest); |
| |
| REGISTER_TYPED_TEST_CASE_P(ReconnectActiveStateMessageBusTest, |
| ReconnectAndCancelTest); |
| |
| REGISTER_TYPED_TEST_CASE_P(ReconnectSeparateDeletionMessageBusTest, |
| ReconnectAndDeleteTest); |
| |
| REGISTER_TYPED_TEST_CASE_P(MultiInstanceMessageBusTest, |
| MultiInstanceTest, |
| MultiInstanceCancelTest); |
| |
| } // namespace tmb |
| |
| #endif // TESTS_MESSAGE_BUS_UNITTEST_COMMON_H_ |