blob: b092dfafea03c5bc1a68e4c0927c48110e10c176 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "cli/InputParserUtil.hpp"
#include <cstddef>
#include <iostream>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogConfig.h"
#include "storage/StorageConfig.h"
#include "utility/StringUtil.hpp"
#include "glog/logging.h"
#ifdef QUICKSTEP_HAVE_LIBNUMA
#include <numa.h>
#endif
using std::string;
namespace quickstep {
std::vector<int> InputParserUtil::ParseWorkerAffinities(
const int num_workers,
const string &affinity_string) {
std::vector<int> affinities;
bool switch_to_default_affinities = false;
if (affinity_string.empty()) {
switch_to_default_affinities = true;
LOG(INFO) << "Empty worker affinities provided, switching to default "
"worker affinities";
} else if (!ParseIntString(affinity_string, ',', &affinities)) {
switch_to_default_affinities = true;
LOG(INFO) << "Invalid worker affinities provided, switching to default "
"affinities";
}
for (const int affinity : affinities) {
if (affinity < -1) {
switch_to_default_affinities = true;
LOG(INFO) << "CPU affinities specified by --worker_affinities must be "
"non-negative, or -1 to specify no affinity. Switching to "
"default worker affinities";
break;
}
}
if (switch_to_default_affinities) {
// Set default affinities.
// If the number of worker threads is less than the maximum parallelism on
// the box, we try to balance workers on all sockets. The intention is to
// balance the memory bandwidth usage across all sockets. This may however
// hurt the performance (due to poor data locality) when the machine has
// many sockets and data is not partitioned.
#ifdef QUICKSTEP_HAVE_LIBNUMA
// This code is inspired from the print_node_cpus() function of numactl.
// WARNING - If some NUMA sockets are disabled, we can't detect it.
const int num_sockets = numa_num_configured_nodes();
CHECK_GT(num_sockets, 0);
// A vector V where V[i] denotes a vector of CPU cores that belong to the
// socket i.
std::vector<std::vector<int>> cpus_from_sockets;
cpus_from_sockets.resize(num_sockets);
for (int curr_socket = 0; curr_socket < num_sockets; ++curr_socket) {
std::unique_ptr<struct bitmask> cpus(numa_allocate_cpumask());
const int err = numa_node_to_cpus(curr_socket, cpus.get());
if (err >= 0) {
for (int i = 0; i < static_cast<int>(cpus->size); i++) {
if (numa_bitmask_isbitset(cpus.get(), i)) {
// The current CPU belongs to curr_socket.
cpus_from_sockets[curr_socket].push_back(i);
}
}
}
}
// Now assign affinity to each worker, picking one CPU from each socket in a
// round robin manner.
int curr_socket = 0;
std::size_t iteration = 0;
for (int curr_worker = 0; curr_worker < num_workers; ++curr_worker) {
if (iteration < cpus_from_sockets[curr_socket].size()) {
const int curr_worker_affinity =
cpus_from_sockets[curr_socket][iteration];
affinities.push_back(curr_worker_affinity);
}
// Increase iteration number only when we are at the last socket.
iteration = iteration + ((curr_socket + 1) / num_sockets);
curr_socket = (curr_socket + 1) % num_sockets;
}
#endif
}
if (affinities.size() < static_cast<std::size_t>(num_workers)) {
std::cout << "--num_workers is " << num_workers << ", but only "
<< "specified " << affinities.size() << " CPU affinities "
<< "with --worker_affinities. "
<< (num_workers - affinities.size()) << " workers will be "
<< "unaffinitized.\n";
affinities.resize(num_workers, -1);
} else if (affinities.size() > static_cast<std::size_t>(num_workers)) {
std::cout << "--num_workers is " << num_workers << ", but specified "
<< affinities.size() << " CPU affinities with "
<< "--worker_affinities. Extra affinities will be ignored.\n";
affinities.resize(num_workers);
}
return affinities;
}
std::vector<int> InputParserUtil::GetNUMANodesForCPUs() {
std::vector<int> numa_nodes_of_cpus;
#ifdef QUICKSTEP_HAVE_LIBNUMA
const int num_cpus = numa_num_configured_cpus();
numa_nodes_of_cpus.reserve(num_cpus);
for (int curr_cpu = 0; curr_cpu < num_cpus; ++curr_cpu) {
numa_nodes_of_cpus.push_back(numa_node_of_cpu(curr_cpu));
}
#endif
return numa_nodes_of_cpus;
}
} // namespace quickstep