Refactored command execution in the distributed version.
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 49b7dc1..63f3259 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -123,7 +123,7 @@
data_exchanger_.set_storage_manager(storage_manager_.get());
data_exchanger_.start();
- // Prepare for submitting a query.
+ // Prepare for submitting a query or a command.
bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
@@ -131,8 +131,6 @@
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
- // Prepare for submitting a command.
- bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
}
@@ -166,51 +164,37 @@
}
if (statement.getStatementType() == ParseStatement::kCommand) {
- const ParseCommand &command = static_cast<const ParseCommand &>(statement);
- const std::string &command_str = command.command()->value();
+ const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+ const std::string &command = parse_command.command()->value();
try {
- if (command_str == C::kAnalyzeCommand) {
+ if (command == C::kAnalyzeCommand) {
// TODO(zuyu): support '\analyze'.
- THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
- } else if (command_str != C::kDescribeDatabaseCommand &&
- command_str != C::kDescribeTableCommand) {
- THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+ THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
+ } else if (command != C::kDescribeDatabaseCommand &&
+ command != C::kDescribeTableCommand) {
+ THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
}
} catch (const SqlError &error) {
fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
reset_parser = true;
break;
}
-
- DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
- << "') to Conductor";
- S::CommandMessage proto;
- proto.set_command(*command_string);
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
- free(proto_bytes);
-
- QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
- } else {
- DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
- << "') to Conductor";
- S::SqlQueryMessage proto;
- proto.set_sql_query(*command_string);
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
- free(proto_bytes);
-
- QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
}
+ DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+ << "') to Conductor";
+ S::SqlQueryMessage proto;
+ proto.set_sql_query(*command_string);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+ free(proto_bytes);
+
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
+
start = std::chrono::steady_clock::now();
const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index b877b04..1b8bfb2 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -95,21 +95,18 @@
bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
- bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
- bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
-
bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+
bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
- foreman_ = make_unique<ForemanDistributed>(*block_locator_,
- std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
- catalog_database_);
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get());
foreman_->start();
}
@@ -132,14 +129,6 @@
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
break;
}
- case kCommandMessage: {
- S::CommandMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
- DLOG(INFO) << "Conductor received the following command: " << proto.command();
-
- processCommandMessage(sender, new string(move(proto.command())));
- break;
- }
case kSqlQueryMessage: {
S::SqlQueryMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -161,91 +150,59 @@
}
}
-void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
- parser_wrapper_.feedNextBuffer(command_string);
- ParseResult parse_result = parser_wrapper_.getNextStatement();
-
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+ SqlParserWrapper parser_wrapper;
+ parser_wrapper.feedNextBuffer(command_string);
+ ParseResult parse_result = parser_wrapper.getNextStatement();
CHECK(parse_result.condition == ParseResult::kSuccess)
<< "Any syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
- DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
-
- const ParseCommand &command = static_cast<const ParseCommand &>(statement);
- const PtrVector<ParseString> &arguments = *(command.arguments());
- const string &command_str = command.command()->value();
-
- string command_response;
try {
- if (command_str == C::kDescribeDatabaseCommand) {
- command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
- } else if (command_str == C::kDescribeTableCommand) {
- if (arguments.empty()) {
+ if (statement.getStatementType() == ParseStatement::kCommand) {
+ const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+ const PtrVector<ParseString> &arguments = *(parse_command.arguments());
+ const string &command = parse_command.command()->value();
+
+ string command_response;
+ if (command == C::kDescribeDatabaseCommand) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
- } else {
- command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+ } else if (command == C::kDescribeTableCommand) {
+ if (arguments.empty()) {
+ command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+ } else {
+ command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+ }
}
+
+ S::CommandResponseMessage proto;
+ proto.set_command_response(command_response);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+ } else {
+ auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+ sender,
+ statement.getPriority());
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_,
+ foreman_->getBusClientID(),
+ query_handle.release(),
+ &bus_);
}
- } catch (const SqlError &command_error) {
- // Set the query execution status along with the error message.
- S::QueryExecutionErrorMessage proto;
- proto.set_error_message(command_error.formatMessage(*command_string));
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kQueryExecutionErrorMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
- << kQueryExecutionErrorMessage
- << "') to Distributed CLI " << sender;
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
- }
-
- S::CommandResponseMessage proto;
- proto.set_command_response(command_response);
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
- << "') to Distributed CLI " << sender;
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
-}
-
-void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
- parser_wrapper_.feedNextBuffer(command_string);
- ParseResult parse_result = parser_wrapper_.getNextStatement();
-
- CHECK(parse_result.condition == ParseResult::kSuccess)
- << "Any SQL syntax error should be addressed in the DistributedCli.";
-
- const ParseStatement &statement = *parse_result.parsed_statement;
- DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
-
- try {
- auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
- sender,
- statement.getPriority());
- query_processor_->generateQueryHandle(statement, query_handle.get());
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- conductor_client_id_,
- foreman_->getBusClientID(),
- query_handle.release(),
- &bus_);
} catch (const SqlError &sql_error) {
// Set the query execution status along with the error message.
S::QueryExecutionErrorMessage proto;
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e7e003f..0c0f7e5 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -24,7 +24,6 @@
#include <string>
#include "cli/distributed/Role.hpp"
-#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_optimizer/QueryProcessor.hpp"
@@ -61,12 +60,8 @@
void run() override;
private:
- void processCommandMessage(const tmb::client_id sender, std::string *command_string);
-
void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
- SqlParserWrapper parser_wrapper_;
-
std::unique_ptr<QueryProcessor> query_processor_;
// Not owned.
CatalogDatabase *catalog_database_;
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 47246d8..a45e8df 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,10 +81,6 @@
required uint64 shiftboss_index = 1;
}
-message CommandMessage {
- required string command = 1;
-}
-
message SqlQueryMessage {
required string sql_query = 1;
}
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0fd0bdf..a49de5e 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,9 +90,7 @@
kDistributedCliRegistrationMessage, // From CLI to Conductor.
kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
- // From CLI to Conductor.
- kCommandMessage,
- kSqlQueryMessage,
+ kSqlQueryMessage, // From CLI to Conductor.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.