blob: 4d228b481af7ca0e8c4f4e64826def7c675ddef7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <chrono> // NOLINT(build/c++11)
#include <cstddef>
#include <iostream>
#include <memory>
#include <thread> // NOLINT(build/c++11)
#include <utility>
#include <vector>
#include "gflags/gflags.h"
#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/priority.h"
#include "tmb/tagged_message.h"
#include "tmbbench/bus_setup.h"
#include "tmbbench/messages.h"
#include "tmbbench/receiver_thread.h"
#include "tmbbench/sender_thread.h"
// Command-line flags.
DEFINE_uint64(test_duration, 20,
"Test duration in seconds. May run for longer if receiver "
"threads have trouble keeping up with send throughput.");
DEFINE_uint64(sender_threads, 1,
"Number of sender threads to use. Must be at least 1.");
DEFINE_uint64(receiver_threads, 1,
"Number of receiver threads to use. Must be at least 1.");
DEFINE_uint64(num_hw_threads, 0,
"Number of hardware threads (vCPUs) available to the OS.");
DEFINE_uint64(num_cpu_sockets, 0,
"Number of physical CPU sockets.");
DEFINE_string(affinity_mode, "",
"NUMA affinity mode. Options are none, thread, or full. none "
"means that threads will run on any CPU core and send messages "
"to any other thread. thread means that threads will be "
"affinitized so that they only run on one socket, but will "
"send messages to any other thread (including those on other "
"sockets). full means that threads will be affinitized to a "
"single socket and only communicate with their peers on the "
"same socket.");
DEFINE_uint64(message_bytes, 8,
"Size of messages exchanged via the TMB in bytes. Must be at "
"least 8.");
DEFINE_bool(delete_messages_immediately, false,
"Whether to delete messages immediately as they are received, or "
"to delete separately afterwards.");
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
if (argc != 1) {
std::cerr << "Unrecognized command-line arguments.\n";
return 1;
}
if (FLAGS_message_bytes < 8) {
std::cerr << "message_bytes must be at least 8.\n";
return 1;
}
if (FLAGS_num_hw_threads == 0) {
std::cerr << "num_hw_threads must be at least 1.\n";
return 1;
}
if (FLAGS_num_cpu_sockets == 0) {
std::cerr << "num_cpu_sockets must be at least 1.\n";
return 1;
}
if (FLAGS_num_hw_threads % FLAGS_num_cpu_sockets != 0) {
std::cerr << "WARNING: num_cpu_sockets does not evenly divide "
<< "num_hw_threads.\n";
}
std::vector<std::vector<int>> affinity_sets(FLAGS_num_cpu_sockets);
if (FLAGS_affinity_mode == "none") {
// Do nothing.
} else if ((FLAGS_affinity_mode == "thread")
|| (FLAGS_affinity_mode == "full")) {
std::size_t threads_per_socket = FLAGS_num_hw_threads
/ FLAGS_num_cpu_sockets;
for (std::size_t thread_idx = 0;
thread_idx < threads_per_socket;
++thread_idx) {
for (std::size_t socket_idx = 0;
socket_idx < FLAGS_num_cpu_sockets;
++socket_idx) {
affinity_sets[socket_idx].push_back(
socket_idx + thread_idx * FLAGS_num_cpu_sockets);
}
}
} else {
std::cerr << "affinity_mode must be one either none, thread, or full.\n";
return 1;
}
std::unique_ptr<tmb::MessageBus> message_bus(
tmbbench::SetupBusAllInOne(FLAGS_delete_messages_immediately));
if (!message_bus) {
return 1;
}
tmb::client_id control_thread_id = message_bus->Connect();
message_bus->RegisterClientAsSender(control_thread_id, 1);
std::vector<std::unique_ptr<tmbbench::SenderThread>> sender_threads;
std::vector<std::vector<std::unique_ptr<tmbbench::ReceiverThreadBase>>>
receiver_threads;
if (FLAGS_affinity_mode == "none") {
receiver_threads.resize(1);
for (std::size_t receiver_idx = 0;
receiver_idx < FLAGS_receiver_threads;
++receiver_idx) {
if (FLAGS_delete_messages_immediately) {
receiver_threads[0].emplace_back(
new tmbbench::ReceiverThread<true>(message_bus.get(),
affinity_sets[0]));
} else {
receiver_threads[0].emplace_back(
new tmbbench::ReceiverThread<false>(message_bus.get(),
affinity_sets[0]));
}
}
for (std::size_t sender_idx = 0;
sender_idx < FLAGS_sender_threads;
++sender_idx) {
sender_threads.emplace_back(new tmbbench::SenderThread(
message_bus.get(),
FLAGS_message_bytes,
affinity_sets[0]));
for (const auto &receiver_thread : receiver_threads[0]) {
sender_threads.back()->AddReceiverID(receiver_thread->GetClientID());
}
}
} else if (FLAGS_affinity_mode == "thread") {
receiver_threads.resize(1);
for (std::size_t receiver_idx = 0;
receiver_idx < FLAGS_receiver_threads;
++receiver_idx) {
const std::vector<int> &socket_affinity
= affinity_sets[receiver_idx % affinity_sets.size()];
if (FLAGS_delete_messages_immediately) {
receiver_threads[0].emplace_back(
new tmbbench::ReceiverThread<true>(message_bus.get(),
socket_affinity));
} else {
receiver_threads[0].emplace_back(
new tmbbench::ReceiverThread<false>(message_bus.get(),
socket_affinity));
}
}
for (std::size_t sender_idx = 0;
sender_idx < FLAGS_sender_threads;
++sender_idx) {
const std::vector<int> &socket_affinity
= affinity_sets[sender_idx % affinity_sets.size()];
sender_threads.emplace_back(new tmbbench::SenderThread(
message_bus.get(),
FLAGS_message_bytes,
socket_affinity));
for (const auto &receiver_thread : receiver_threads[0]) {
sender_threads.back()->AddReceiverID(receiver_thread->GetClientID());
}
}
} else {
receiver_threads.resize(FLAGS_num_cpu_sockets);
for (std::size_t receiver_idx = 0;
receiver_idx < FLAGS_receiver_threads;
++receiver_idx) {
std::size_t socket_num = receiver_idx % FLAGS_num_cpu_sockets;
if (FLAGS_delete_messages_immediately) {
receiver_threads[socket_num].emplace_back(
new tmbbench::ReceiverThread<true>(message_bus.get(),
affinity_sets[socket_num]));
} else {
receiver_threads[socket_num].emplace_back(
new tmbbench::ReceiverThread<false>(message_bus.get(),
affinity_sets[socket_num]));
}
}
for (std::size_t sender_idx = 0;
sender_idx < FLAGS_sender_threads;
++sender_idx) {
const std::vector<int> &socket_affinity
= affinity_sets[sender_idx % affinity_sets.size()];
sender_threads.emplace_back(new tmbbench::SenderThread(
message_bus.get(),
FLAGS_message_bytes,
socket_affinity));
for (const auto &receiver_thread
: receiver_threads[sender_idx % receiver_threads.size()]) {
sender_threads.back()->AddReceiverID(receiver_thread->GetClientID());
}
}
}
for (const auto &sender_thread : sender_threads) {
sender_thread->Start();
}
for (const auto &receiver_thread_set : receiver_threads) {
for (const auto &receiver_thread : receiver_thread_set) {
receiver_thread->Start();
}
}
std::this_thread::sleep_for(std::chrono::seconds(FLAGS_test_duration));
tmb::MessageStyle style;
for (const auto &sender_thread : sender_threads) {
tmb::Address address;
address.AddRecipient(sender_thread->GetClientID());
tmb::TaggedMessage message(&tmbbench::kPoisonMessage,
sizeof(tmbbench::kPoisonMessage),
1);
message_bus->Send(control_thread_id, address, style, std::move(message));
}
for (const auto &receiver_thread_set : receiver_threads) {
for (const auto &receiver_thread : receiver_thread_set) {
tmb::Address address;
address.AddRecipient(receiver_thread->GetClientID());
tmb::TaggedMessage message(&tmbbench::kPoisonMessage,
sizeof(tmbbench::kPoisonMessage),
1);
message_bus->Send(control_thread_id,
address,
style,
std::move(message),
tmb::kDefaultPriority + 1);
}
}
for (const auto &sender_thread : sender_threads) {
sender_thread->Join();
}
for (const auto &receiver_thread_set : receiver_threads) {
for (const auto &receiver_thread : receiver_thread_set) {
receiver_thread->Join();
}
}
double total_send_throughput = 0;
for (const auto &sender_thread : sender_threads) {
total_send_throughput += sender_thread->GetThroughput();
}
double total_receive_throughput = 0;
for (const auto &receiver_thread_set : receiver_threads) {
for (const auto &receiver_thread : receiver_thread_set) {
total_receive_throughput += receiver_thread->GetThroughput();
}
}
std::cout << "Send throughput: " << total_send_throughput
<< " messages/s\n";
std::cout << "Receive throughput: " << total_receive_throughput
<< " messages/s\n";
return 0;
}