blob: a13ab21951799345bb69332e5d52b245067207c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "cli/distributed/Conductor.hpp"
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "cli/CommandExecutorUtil.hpp"
#include "cli/Constants.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
#include "expressions/aggregation/AggregateFunctionMax.hpp"
#include "expressions/aggregation/AggregateFunctionMin.hpp"
#include "parser/ParseStatement.hpp"
#include "parser/ParseString.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConstants.hpp"
#include "utility/PtrVector.hpp"
#include "utility/SqlError.hpp"
#include "utility/StringUtil.hpp"
#include "tmb/id_typedefs.h"
#include "tmb/native_net_client_message_bus.h"
#include "tmb/tagged_message.h"
#include "glog/logging.h"
using std::free;
using std::make_unique;
using std::malloc;
using std::move;
using std::size_t;
using std::string;
using tmb::AnnotatedMessage;
using tmb::MessageBus;
using tmb::TaggedMessage;
using tmb::client_id;
namespace quickstep {
namespace C = cli;
namespace S = serialization;
class Type;
void Conductor::init() {
try {
string catalog_path = FLAGS_storage_path + kCatalogFilename;
if (FLAGS_initialize_db) { // Initialize the database
DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
}
query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
catalog_database_ = query_processor_->getDefaultDatabase();
// Clean up the temp relations, if any, caused by crashes in the distributed query execution.
for (const CatalogRelation &relation : *catalog_database_) {
if (relation.isTemporary()) {
catalog_database_->dropRelationById(relation.getID());
query_processor_->markCatalogAltered();
}
}
query_processor_->saveCatalog();
} catch (const std::exception &e) {
LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
<< "\nIf you intended to create a new database, "
<< "please use the \"-initialize_db=true\" command line option.";
} catch (...) {
LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
}
bus_.ResetBus();
conductor_client_id_ = bus_.Connect();
DLOG(INFO) << "Conductor TMB Client ID: " << conductor_client_id_;
bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get());
foreman_->start();
}
void Conductor::run() {
for (;;) {
AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const tmb::message_type_id message_type = tagged_message.message_type();
const client_id sender = annotated_message.sender;
DLOG(INFO) << "Conductor received " << QueryExecutionUtil::MessageTypeToString(message_type)
<< " from Client " << sender;
switch (message_type) {
case kDistributedCliRegistrationMessage: {
TaggedMessage message(kDistributedCliRegistrationResponseMessage);
DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage to DistributedCLI with Client "
<< sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
break;
}
case kSqlQueryMessage: {
S::SqlQueryMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
DLOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
processSqlQueryMessage(sender, new string(move(proto.sql_query())));
break;
}
case kQueryResultTeardownMessage: {
S::QueryResultTeardownMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
catalog_database_->dropRelationById(proto.relation_id());
break;
}
default:
LOG(FATAL) << "Unknown TMB message type";
}
}
}
void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
SqlParserWrapper parser_wrapper;
parser_wrapper.feedNextBuffer(command_string);
ParseResult parse_result = parser_wrapper.getNextStatement();
CHECK_EQ(ParseResult::kSuccess, parse_result.condition)
<< "Any syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
try {
if (statement.getStatementType() == ParseStatement::kCommand) {
const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
const PtrVector<ParseString> &arguments = *(parse_command.arguments());
const string &command = parse_command.command()->value();
if (command == C::kAnalyzeCommand) {
executeAnalyze(sender, arguments);
return;
}
string command_response;
if (command == C::kDescribeDatabaseCommand) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
} else if (command == C::kDescribeTableCommand) {
if (arguments.empty()) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
} else {
command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
}
}
S::CommandResponseMessage proto;
proto.set_command_response(command_response);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
free(proto_bytes);
DLOG(INFO) << "Conductor sent CommandResponseMessage to DistributedCLI with Client " << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
} else {
if (FLAGS_print_query) {
printf("\nQuery %zu: %s\n", query_processor_->query_id(), command_string->c_str());
}
auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority());
query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_));
}
} catch (const SqlError &sql_error) {
// Set the query execution status along with the error message.
S::QueryExecutionErrorMessage proto;
proto.set_error_message(sql_error.formatMessage(*command_string));
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kQueryExecutionErrorMessage);
free(proto_bytes);
DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage to DistributedCLI with Client "
<< sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
}
}
void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments) {
std::vector<std::reference_wrapper<const CatalogRelation>> relations;
if (arguments.empty()) {
relations.insert(relations.end(), catalog_database_->begin(), catalog_database_->end());
} else {
for (const auto &argument : arguments) {
const CatalogRelation *relation = catalog_database_->getRelationByName(argument.value());
if (relation == nullptr) {
THROW_SQL_ERROR_AT(&argument) << "Table does not exist";
}
relations.emplace_back(*relation);
}
}
SqlParserWrapper parser_wrapper;
std::vector<QueryHandle*> query_handles;
// Analyze each relation in the database.
for (const CatalogRelation &relation : relations) {
const relation_id rel_id = relation.getID();
const string rel_name = EscapeQuotes(relation.getName(), '"');
string *query_string = nullptr;
// Get the number of distinct values for each column.
for (const CatalogAttribute &attribute : relation) {
const string attr_name = EscapeQuotes(attribute.getName(), '"');
const Type &attr_type = attribute.getType();
const bool is_min_applicable =
AggregateFunctionMin::Instance().canApplyToTypes({&attr_type});
const bool is_max_applicable =
AggregateFunctionMax::Instance().canApplyToTypes({&attr_type});
// NOTE(jianqiao): Note that the relation name and the attribute names may
// contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
// So here we need to format the names with double quotes (").
query_string = new string("SELECT COUNT(DISTINCT \"");
query_string->append(attr_name);
query_string->append("\")");
if (is_min_applicable) {
query_string->append(", MIN(\"");
query_string->append(attr_name);
query_string->append("\")");
}
if (is_max_applicable) {
query_string->append(", MAX(\"");
query_string->append(attr_name);
query_string->append("\")");
}
query_string->append(" FROM \"");
query_string->append(rel_name);
query_string->append("\";");
if (FLAGS_print_query) {
printf("\nQuery %zu: %s\n", query_processor_->query_id(), query_string->c_str());
}
parser_wrapper.feedNextBuffer(query_string);
const ParseResult parse_result = parser_wrapper.getNextStatement();
DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
const ParseStatement &statement = *parse_result.parsed_statement;
// Generate the query plan.
auto query_handle =
make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id,
relations.size(), attribute.getID(),
is_min_applicable, is_max_applicable));
query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
query_handles.push_back(query_handle.release());
}
// Get the number of tuples for the relation.
query_string = new string("SELECT COUNT(*) FROM \"");
query_string->append(rel_name);
query_string->append("\";");
if (FLAGS_print_query) {
printf("\nQuery %zu: %s\n", query_processor_->query_id(), query_string->c_str());
}
parser_wrapper.feedNextBuffer(query_string);
const ParseResult parse_result = parser_wrapper.getNextStatement();
DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
const ParseStatement &statement = *parse_result.parsed_statement;
// Generate the query plan.
auto query_handle =
make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id,
relations.size()));
query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
query_handles.push_back(query_handle.release());
}
if (!query_handles.empty()) {
auto request_message = make_unique<AdmitRequestMessage>(query_handles);
const std::size_t size_of_request_msg = sizeof(*request_message);
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, foreman_->getBusClientID(),
TaggedMessage(request_message.release(), size_of_request_msg,
kAdmitRequestMessage)));
}
}
} // namespace quickstep