// blob: d8e63ceda66e4a549b1ac8492e9df217080a0cbe
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "cli/distributed/Executor.hpp"
#include <cstddef>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
#include "cli/InputParserUtil.hpp"
#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Shiftboss.hpp"
#include "query_execution/Worker.hpp"
#include "query_execution/WorkerDirectory.hpp"
#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageManager.hpp"
#include "tmb/id_typedefs.h"
#include "tmb/native_net_client_message_bus.h"
#include "tmb/pure_memory_message_bus.h"
#include "glog/logging.h"
using std::make_unique;
using std::size_t;
using std::vector;
using tmb::client_id;
namespace quickstep {
void Executor::init() {
bus_local_.Initialize();
executor_client_id_ = bus_.Connect();
DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
bus_.RegisterClientAsSender(executor_client_id_, kBlockDomainRegistrationMessage);
bus_.RegisterClientAsReceiver(executor_client_id_, kBlockDomainRegistrationResponseMessage);
// Parse the CPU affinities for workers and the preloader thread, if enabled
// to warm up the buffer pool.
const vector<int> worker_cpu_affinities =
InputParserUtil::ParseWorkerAffinities(FLAGS_num_workers, FLAGS_worker_affinities);
const vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);
vector<client_id> worker_client_ids;
for (int worker_thread_index = 0;
worker_thread_index < FLAGS_num_workers;
++worker_thread_index) {
workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_,
worker_cpu_affinities[worker_thread_index]));
worker_client_ids.push_back(workers_.back()->getBusClientID());
}
worker_directory_ =
make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, worker_numa_nodes);
DefaultsConfigurator::CreateDirectory(FLAGS_storage_path);
client_id locator_client_id;
storage_manager_ = make_unique<StorageManager>(
FLAGS_storage_path,
block_locator::getBlockDomain(data_exchanger_.network_address(), executor_client_id_, &locator_client_id, &bus_),
locator_client_id, &bus_);
data_exchanger_.set_storage_manager(storage_manager_.get());
data_exchanger_.start();
shiftboss_ =
make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(),
storage_manager_->hdfs());
shiftboss_->start();
for (const auto &worker : workers_) {
worker->start();
}
}
} // namespace quickstep