blob: 1b8bfb2a86ac2149e8f65a2daf6d785b1cef4e4e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "cli/distributed/Conductor.hpp"
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include "catalog/CatalogDatabase.hpp"
#include "cli/CommandExecutorUtil.hpp"
#include "cli/Constants.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
#include "parser/ParseStatement.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConstants.hpp"
#include "utility/SqlError.hpp"
#include "utility/StringUtil.hpp"
#include "tmb/id_typedefs.h"
#include "tmb/native_net_client_message_bus.h"
#include "tmb/tagged_message.h"
#include "glog/logging.h"
using std::free;
using std::make_unique;
using std::malloc;
using std::move;
using std::size_t;
using std::string;
using tmb::AnnotatedMessage;
using tmb::MessageBus;
using tmb::TaggedMessage;
using tmb::client_id;
namespace quickstep {
namespace C = cli;
namespace S = serialization;
void Conductor::init() {
try {
string catalog_path = FLAGS_storage_path + kCatalogFilename;
if (FLAGS_initialize_db) { // Initialize the database
DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
}
query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
catalog_database_ = query_processor_->getDefaultDatabase();
} catch (const std::exception &e) {
LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
<< "\nIf you intended to create a new database, "
<< "please use the \"-initialize_db=true\" command line option.";
} catch (...) {
LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
}
bus_.ResetBus();
conductor_client_id_ = bus_.Connect();
DLOG(INFO) << "Conductor TMB Client ID: " << conductor_client_id_;
bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get());
foreman_->start();
}
void Conductor::run() {
for (;;) {
AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const client_id sender = annotated_message.sender;
DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
<< "' message from client " << sender;
switch (tagged_message.message_type()) {
case kDistributedCliRegistrationMessage: {
TaggedMessage message(kDistributedCliRegistrationResponseMessage);
DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
<< kDistributedCliRegistrationResponseMessage
<< "') to Distributed CLI " << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
break;
}
case kSqlQueryMessage: {
S::SqlQueryMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
DLOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
processSqlQueryMessage(sender, new string(move(proto.sql_query())));
break;
}
case kQueryResultTeardownMessage: {
S::QueryResultTeardownMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
catalog_database_->dropRelationById(proto.relation_id());
break;
}
default:
LOG(FATAL) << "Unknown TMB message type";
}
}
}
void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
SqlParserWrapper parser_wrapper;
parser_wrapper.feedNextBuffer(command_string);
ParseResult parse_result = parser_wrapper.getNextStatement();
CHECK(parse_result.condition == ParseResult::kSuccess)
<< "Any syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
try {
if (statement.getStatementType() == ParseStatement::kCommand) {
const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
const PtrVector<ParseString> &arguments = *(parse_command.arguments());
const string &command = parse_command.command()->value();
string command_response;
if (command == C::kDescribeDatabaseCommand) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
} else if (command == C::kDescribeTableCommand) {
if (arguments.empty()) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
} else {
command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
}
}
S::CommandResponseMessage proto;
proto.set_command_response(command_response);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
free(proto_bytes);
DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
<< "') to Distributed CLI " << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
} else {
auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
sender,
statement.getPriority());
query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
conductor_client_id_,
foreman_->getBusClientID(),
query_handle.release(),
&bus_);
}
} catch (const SqlError &sql_error) {
// Set the query execution status along with the error message.
S::QueryExecutionErrorMessage proto;
proto.set_error_message(sql_error.formatMessage(*command_string));
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kQueryExecutionErrorMessage);
free(proto_bytes);
DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
<< kQueryExecutionErrorMessage
<< "') to Distributed CLI " << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
}
}
} // namespace quickstep