blob: be04c50922c22d1567c12f55c9003767abbf846e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/Optimizer.hpp"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/tests/TestDatabaseLoader.hpp"
#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageManager.hpp"
#include "utility/MemStream.hpp"
#include "utility/SqlError.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
using std::free;
using std::make_unique;
using std::malloc;
using std::move;
using std::string;
using std::vector;
using tmb::TaggedMessage;
namespace quickstep {
class CatalogRelation;
namespace optimizer {
namespace {

// Number of simulated instances the runner builds. Each instance gets one
// Worker, one Shiftboss, its own local message bus, its own StorageManager and
// its own DataExchangerAsync.
constexpr int kNumInstances{3};

}  // namespace
// Test-case option name. When it appears in a test case's option set,
// runTestCase() calls clear() on the test database loader, then recreates and
// reloads the test relation before running the SQL.
const char *DistributedExecutionGeneratorTestRunner::kResetOption{"reset_before_execution"};
/**
 * Starts a single-process simulation of a distributed deployment.
 *
 * The steps run in this order:
 *   1. Connect the runner's own client (cli_id_) to the global bus_ and
 *      register the message types it sends and receives.
 *   2. Start the BlockLocator.
 *   3. Build a TestDatabaseLoader, served by its own DataExchangerAsync, and
 *      create/load the test relation.
 *   4. Start the ForemanDistributed.
 *   5. Build kNumInstances Worker + Shiftboss pairs. Each pair has its own
 *      local message bus, StorageManager and DataExchangerAsync.
 *   6. Start all per-instance threads.
 *
 * @param storage_path Passed to the TestDatabaseLoader and to every
 *        per-instance StorageManager.
 **/
DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
    : query_id_(0),
      bus_locals_(kNumInstances),
      data_exchangers_(kNumInstances) {
  bus_.Initialize();

  cli_id_ = bus_.Connect();
  bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
  bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);

  block_locator_ = make_unique<BlockLocator>(&bus_);
  block_locator_->start();

  // getBlockDomain() writes locator_client_id_ through its out-parameter, so it
  // is called in its own statement, sequenced before locator_client_id_ is read.
  // Folding both into one argument list is only well-defined because
  // make_unique binds its arguments by reference and reads them after every
  // argument has been evaluated; a plain `new T(...)` would make it unsequenced.
  const auto loader_block_domain = block_locator::getBlockDomain(
      test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_);
  test_database_loader_ = make_unique<TestDatabaseLoader>(
      storage_path, loader_block_domain, locator_client_id_, &bus_);
  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);

  test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
  test_database_loader_data_exchanger_.start();

  test_database_loader_->createTestRelation(false /* allow_vchar */);
  test_database_loader_->loadTestRelation();

  // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
  // could receive a registration message from the latter.
  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(),
                                             nullptr /* query_processor */);
  foreman_->start();

  // We don't use the NUMA aware version of worker code.
  const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
                                        kAnyNUMANodeID);

  for (int i = 0; i < kNumInstances; ++i) {
    tmb::MessageBus *bus_local = &bus_locals_[i];
    bus_local->Initialize();

    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local));

    const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
    worker_directories_.push_back(
        make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes));

    // As for the test loader: obtain the block domain (which updates
    // locator_client_id_) in its own statement before locator_client_id_ is read.
    const auto instance_block_domain = block_locator::getBlockDomain(
        data_exchangers_[i].network_address(), cli_id_, &locator_client_id_, &bus_);
    auto storage_manager = make_unique<StorageManager>(
        storage_path, instance_block_domain, locator_client_id_, &bus_);
    DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);

    data_exchangers_[i].set_storage_manager(storage_manager.get());
    shiftbosses_.push_back(
        make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get(),
                               storage_manager->hdfs()));

    storage_managers_.push_back(move(storage_manager));
  }

  // Start the per-instance threads only after every instance has been built.
  for (int i = 0; i < kNumInstances; ++i) {
    data_exchangers_[i].start();
    shiftbosses_[i]->start();
    workers_[i]->start();
  }
}
/**
 * Tears down everything the constructor started.
 *
 * The statement order below is deliberate; do not rearrange it casually.
 *   1. Send kPoisonMessage to the Foreman, then join every Worker, every
 *      Shiftboss and the Foreman. Only the Foreman is poisoned directly here;
 *      presumably it propagates the poison to the Shiftbosses and Workers.
 *      TODO confirm.
 *   2. Shut down each DataExchangerAsync and destroy the StorageManager it
 *      serves (the test loader's and each instance's).
 *   3. Send kPoisonMessage to the BlockLocator. NOTE(review): it is presumably
 *      kept alive until after step 2 because StorageManager teardown still
 *      communicates with it. Confirm.
 *   4. Join the exchanger threads and, last, the BlockLocator thread.
 **/
DistributedExecutionGeneratorTestRunner::~DistributedExecutionGeneratorTestRunner() {
  const tmb::MessageBus::SendStatus foreman_send_status =
      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage));
  CHECK(foreman_send_status == tmb::MessageBus::SendStatus::kOK);

  for (int i = 0; i < kNumInstances; ++i) {
    workers_[i]->join();
    shiftbosses_[i]->join();
  }

  foreman_->join();

  test_database_loader_data_exchanger_.shutdown();
  test_database_loader_.reset();
  for (int i = 0; i < kNumInstances; ++i) {
    data_exchangers_[i].shutdown();
    storage_managers_[i].reset();
  }

  // Fully qualified, matching the Foreman send above, instead of relying on an
  // unqualified `MessageBus` name brought in by some other header.
  const tmb::MessageBus::SendStatus locator_send_status =
      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage));
  CHECK(locator_send_status == tmb::MessageBus::SendStatus::kOK);

  test_database_loader_data_exchanger_.join();
  for (int i = 0; i < kNumInstances; ++i) {
    data_exchangers_[i].join();
  }

  block_locator_->join();
}
void DistributedExecutionGeneratorTestRunner::runTestCase(
const string &input, const std::set<string> &options, string *output) {
// TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
VLOG(4) << "Test SQL(s): " << input;
if (options.find(kResetOption) != options.end()) {
test_database_loader_->clear();
test_database_loader_->createTestRelation(false /* allow_vchar */);
test_database_loader_->loadTestRelation();
}
MemStream output_stream;
sql_parser_.feedNextBuffer(new string(input));
while (true) {
ParseResult result = sql_parser_.getNextStatement();
if (result.condition != ParseResult::kSuccess) {
if (result.condition == ParseResult::kError) {
*output = result.error_message;
}
break;
}
const ParseStatement &parse_statement = *result.parsed_statement;
std::printf("%s\n", parse_statement.toString().c_str());
auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
try {
OptimizerContext optimizer_context;
optimizer_.generateQueryHandle(parse_statement,
test_database_loader_->catalog_database(),
&optimizer_context,
query_handle.get());
} catch (const SqlError &error) {
*output = error.formatMessage(input);
break;
}
const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
test_database_loader_->storage_manager(),
output_stream.file());
DropRelation::Drop(*query_result_relation,
test_database_loader_->catalog_database(),
test_database_loader_->storage_manager());
}
}
if (output->empty()) {
*output = output_stream.str();
}
}
} // namespace optimizer
} // namespace quickstep