Moved some Cli flags into a common file.
diff --git a/cli/Flags.cpp b/cli/Flags.cpp
index 1e3ce1d..74915ae 100644
--- a/cli/Flags.cpp
+++ b/cli/Flags.cpp
@@ -58,6 +58,13 @@
static const volatile bool num_workers_dummy
= gflags::RegisterFlagValidator(&FLAGS_num_workers, &ValidateNumWorkers);
+DEFINE_string(worker_affinities, "",
+ "A comma-separated list of CPU IDs to pin worker threads to "
+ "(leaving this empty will cause all worker threads to inherit "
+ "the affinity mask of the Quickstep process, which typically "
+ "means that they will all be runable on any CPU according to "
+ "the kernel's own scheduling policy).");
+
static bool ValidateStoragePath(const char *flagname,
const std::string &value) {
if (!value.empty() && value.back() != kPathSeparator) {
@@ -71,4 +78,9 @@
static const volatile bool storage_path_dummy
= gflags::RegisterFlagValidator(&FLAGS_storage_path, &ValidateStoragePath);
+DEFINE_bool(preload_buffer_pool, false,
+ "If true, pre-load all known blocks into buffer pool before "
+ "accepting queries (should also set --buffer_pool_slots to be "
+ "large enough to accomodate the entire database).");
+
} // namespace quickstep
diff --git a/cli/Flags.hpp b/cli/Flags.hpp
index 70aee98..a268e39 100644
--- a/cli/Flags.hpp
+++ b/cli/Flags.hpp
@@ -37,8 +37,12 @@
DECLARE_int32(num_workers);
+DECLARE_string(worker_affinities);
+
DECLARE_string(storage_path);
+DECLARE_bool(preload_buffer_pool);
+
/** @} */
} // namespace quickstep
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index e45605c..b092dfa 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -51,9 +51,9 @@
LOG(INFO) << "Empty worker affinities provided, switching to default "
"worker affinities";
} else if (!ParseIntString(affinity_string, ',', &affinities)) {
- switch_to_default_affinities = true;
- LOG(INFO) << "Invalid worker affinities provided, switching to default "
- "affinities";
+ switch_to_default_affinities = true;
+ LOG(INFO) << "Invalid worker affinities provided, switching to default "
+ "affinities";
}
for (const int affinity : affinities) {
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index eddee8c..26cb154 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -119,16 +119,6 @@
namespace quickstep {
-DEFINE_bool(preload_buffer_pool, false,
- "If true, pre-load all known blocks into buffer pool before "
- "accepting queries (should also set --buffer_pool_slots to be "
- "large enough to accomodate the entire database).");
-DEFINE_string(worker_affinities, "",
- "A comma-separated list of CPU IDs to pin worker threads to "
- "(leaving this empty will cause all worker threads to inherit "
- "the affinity mask of the Quickstep process, which typically "
- "means that they will all be runable on any CPU according to "
- "the kernel's own scheduling policy).");
DEFINE_bool(print_query, false,
"Print each input query statement. This is useful when running a "
"large number of queries in a batch.");
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 2db27e5..6a31895 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -56,6 +56,7 @@
glog
quickstep_catalog_CatalogTypedefs
quickstep_cli_Flags
+ quickstep_cli_InputParserUtil
quickstep_cli_distributed_Role
quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_QueryExecutionTypedefs
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index a95ed41..1415e99 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -25,6 +25,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "cli/Flags.hpp"
+#include "cli/InputParserUtil.hpp"
#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Shiftboss.hpp"
@@ -56,13 +57,19 @@
bus_.RegisterClientAsSender(executor_client_id_, kBlockDomainRegistrationMessage);
bus_.RegisterClientAsReceiver(executor_client_id_, kBlockDomainRegistrationResponseMessage);
+ // Parse the CPU affinities for workers and the preloader thread, if enabled
+ // to warm up the buffer pool.
+ const vector<int> worker_cpu_affinities =
+ InputParserUtil::ParseWorkerAffinities(FLAGS_num_workers, FLAGS_worker_affinities);
+
+ const vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);
vector<client_id> worker_client_ids;
- vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);
for (std::size_t worker_thread_index = 0;
worker_thread_index < FLAGS_num_workers;
++worker_thread_index) {
- workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_));
+ workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_,
+ worker_cpu_affinities[worker_thread_index]));
worker_client_ids.push_back(workers_.back()->getBusClientID());
}
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 857d9aa..9f246ed 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -88,6 +88,7 @@
if (admitted_queries_.empty()) {
if (waiting_queries_.empty()) {
return;
+ }
// Admit the earliest waiting query.
QueryHandle *new_query = waiting_queries_.front();
diff --git a/storage/PreloaderThread.hpp b/storage/PreloaderThread.hpp
index aaea790..d9853b8 100644
--- a/storage/PreloaderThread.hpp
+++ b/storage/PreloaderThread.hpp
@@ -58,7 +58,7 @@
**/
PreloaderThread(const CatalogDatabase &database,
StorageManager *storage_manager,
- const int cpu_id)
+ const int cpu_id = -1)
: database_(database),
storage_manager_(storage_manager),
cpu_id_(cpu_id) {