Added command support in the distributed version.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 918069c..9cd02be 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -817,6 +817,7 @@
target_link_libraries(quickstep_distributed_cli_shell
glog
quickstep_catalog_CatalogRelation
+ quickstep_cli_Constants
quickstep_cli_Flags
quickstep_cli_LineReader
quickstep_cli_PrintToScreen
@@ -833,6 +834,7 @@
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
quickstep_utility_Macros
+ quickstep_utility_SqlError
quickstep_utility_StringUtil
tmb
${GFLAGS_LIB_NAME}
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 5804321..1f7dee0 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -26,6 +26,8 @@
target_link_libraries(quickstep_cli_distributed_Conductor
glog
quickstep_catalog_CatalogDatabase
+ quickstep_cli_CommandExecutorUtil
+ quickstep_cli_Constants
quickstep_cli_DefaultsConfigurator
quickstep_cli_Flags
quickstep_cli_distributed_Role
@@ -41,6 +43,7 @@
quickstep_storage_StorageConstants
quickstep_utility_Macros
quickstep_utility_SqlError
+ quickstep_utility_StringUtil
tmb)
target_link_libraries(quickstep_cli_distributed_Executor
glog
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 6228898..49b7dc1 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -30,6 +30,7 @@
#include "catalog/CatalogRelation.hpp"
#include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Constants.hpp"
#include "cli/Flags.hpp"
#ifdef QUICKSTEP_USE_LINENOISE
@@ -49,6 +50,7 @@
#include "query_execution/QueryExecutionUtil.hpp"
#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/SqlError.hpp"
#include "utility/StringUtil.hpp"
#include "tmb/address.h"
@@ -76,6 +78,7 @@
namespace quickstep {
+namespace C = cli;
namespace S = serialization;
void Cli::init() {
@@ -127,6 +130,10 @@
bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+
+ // Prepare for submitting a command.
+ bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
}
void Cli::run() {
@@ -158,27 +165,51 @@
break;
}
- CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
- << "TODO(quickstep-team)";
+ if (statement.getStatementType() == ParseStatement::kCommand) {
+ const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+ const std::string &command_str = command.command()->value();
+ try {
+ if (command_str == C::kAnalyzeCommand) {
+ // TODO(zuyu): support '\analyze'.
+ THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
+ } else if (command_str != C::kDescribeDatabaseCommand &&
+ command_str != C::kDescribeTableCommand) {
+ THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+ }
+ } catch (const SqlError &error) {
+ fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
+ reset_parser = true;
+ break;
+ }
- DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
- << "') to Conductor";
- S::SqlQueryMessage proto;
- proto.set_sql_query(*command_string);
+ DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
+ << "') to Conductor";
+ S::CommandMessage proto;
+ proto.set_command(*command_string);
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
- proto_length,
- kSqlQueryMessage);
- free(proto_bytes);
+ TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
+ free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(&bus_,
- cli_id_,
- conductor_client_id_,
- move(sql_query_message));
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
+ } else {
+ DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+ << "') to Conductor";
+ S::SqlQueryMessage proto;
+ proto.set_sql_query(*command_string);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+ free(proto_bytes);
+
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
+ }
start = std::chrono::steady_clock::now();
@@ -187,6 +218,13 @@
DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
<< "' message from client " << annotated_message.sender;
switch (tagged_message.message_type()) {
+ case kCommandResponseMessage: {
+ S::CommandResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ printf("%s", proto.command_response().c_str());
+ break;
+ }
case kQueryExecutionSuccessMessage: {
end = std::chrono::steady_clock::now();
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 3c68bfb..b877b04 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -29,6 +29,8 @@
#include <utility>
#include "catalog/CatalogDatabase.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
#include "parser/ParseStatement.hpp"
@@ -42,6 +44,7 @@
#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConstants.hpp"
#include "utility/SqlError.hpp"
+#include "utility/StringUtil.hpp"
#include "tmb/id_typedefs.h"
#include "tmb/native_net_client_message_bus.h"
@@ -63,6 +66,7 @@
namespace quickstep {
+namespace C = cli;
namespace S = serialization;
void Conductor::init() {
@@ -91,6 +95,9 @@
bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
+
bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
@@ -125,6 +132,14 @@
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
break;
}
+ case kCommandMessage: {
+ S::CommandMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+ DLOG(INFO) << "Conductor received the following command: " << proto.command();
+
+ processCommandMessage(sender, new string(move(proto.command())));
+ break;
+ }
case kSqlQueryMessage: {
S::SqlQueryMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -146,6 +161,69 @@
}
}
+void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
+ parser_wrapper_.feedNextBuffer(command_string);
+ ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+ CHECK(parse_result.condition == ParseResult::kSuccess)
+ << "Any syntax error should be addressed in the DistributedCli.";
+
+ const ParseStatement &statement = *parse_result.parsed_statement;
+ DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
+
+ const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+ const PtrVector<ParseString> &arguments = *(command.arguments());
+ const string &command_str = command.command()->value();
+
+ string command_response;
+
+ try {
+ if (command_str == C::kDescribeDatabaseCommand) {
+ command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+ } else if (command_str == C::kDescribeTableCommand) {
+ if (arguments.empty()) {
+ command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+ } else {
+ command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+ }
+ }
+ } catch (const SqlError &command_error) {
+ // Set the query execution status along with the error message.
+ S::QueryExecutionErrorMessage proto;
+ proto.set_error_message(command_error.formatMessage(*command_string));
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryExecutionErrorMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
+ << kQueryExecutionErrorMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+ }
+
+ S::CommandResponseMessage proto;
+ proto.set_command_response(command_response);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+}
+
void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
parser_wrapper_.feedNextBuffer(command_string);
ParseResult parse_result = parser_wrapper_.getNextStatement();
@@ -154,8 +232,7 @@
<< "Any SQL syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
- CHECK(statement.getStatementType() != ParseStatement::kCommand)
- << "TODO(quickstep-team)";
+ DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
try {
auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 09bf2b9..e7e003f 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -35,6 +35,7 @@
namespace quickstep {
class CatalogDatabase;
+class ParseCommand;
/** \addtogroup CliDistributed
* @{
@@ -60,6 +61,8 @@
void run() override;
private:
+ void processCommandMessage(const tmb::client_id sender, std::string *command_string);
+
void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
SqlParserWrapper parser_wrapper_;
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index 48f27bb..7f8150f 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -23,6 +23,14 @@
CommandExecutorTestRunner.hpp
"${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
"${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+if (ENABLE_DISTRIBUTED)
+ add_executable(quickstep_cli_tests_DistributedCommandExecutorTest
+ DistributedCommandExecutorTest.cpp
+ DistributedCommandExecutorTestRunner.cpp
+ DistributedCommandExecutorTestRunner.hpp
+ "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+ "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_cli_tests_CommandExecutorTest
glog
@@ -49,3 +57,36 @@
quickstep_utility_TextBasedTestDriver
tmb
${LIBS})
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_cli_tests_DistributedCommandExecutorTest
+ glog
+ gtest
+ quickstep_catalog_CatalogTypedefs
+ quickstep_cli_CommandExecutorUtil
+ quickstep_cli_Constants
+ quickstep_cli_DropRelation
+ quickstep_cli_PrintToScreen
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_BlockLocatorUtil
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_Shiftboss
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_queryoptimizer_Optimizer
+ quickstep_queryoptimizer_OptimizerContext
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_tests_TestDatabaseLoader
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageManager
+ quickstep_utility_Macros
+ quickstep_utility_MemStream
+ quickstep_utility_SqlError
+ quickstep_utility_TextBasedTestDriver
+ tmb
+ ${GFLAGS_LIB_NAME}
+ ${LIBS})
+endif(ENABLE_DISTRIBUTED)
diff --git a/cli/tests/DistributedCommandExecutorTest.cpp b/cli/tests/DistributedCommandExecutorTest.cpp
new file mode 100644
index 0000000..b41a70f
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTest.cpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <iostream>
+#include <fstream>
+#include <memory>
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+using std::make_unique;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_COMMAND_EXECUTOR_TEST);
+
+int main(int argc, char** argv) {
+ google::InitGoogleLogging(argv[0]);
+ // Honor FLAGS_buffer_pool_slots in StorageManager.
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ if (argc < 4) {
+ LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1
+ << " are provided";
+ }
+
+ std::ifstream input_file(argv[1]);
+ CHECK(input_file.is_open()) << argv[1];
+
+ auto test_runner = make_unique<quickstep::DistributedCommandExecutorTestRunner>(argv[3]);
+ test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get());
+ test_driver->registerOption(
+ quickstep::DistributedCommandExecutorTestRunner::kResetOption);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ const int success = RUN_ALL_TESTS();
+ if (success != 0) {
+ test_driver->writeActualOutputToFile(argv[2]);
+ }
+
+ return success;
+}
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp
new file mode 100644
index 0000000..66d0767
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+
+#include <cstdio>
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::make_unique;
+using std::string;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace {
+
+void nop() {}
+
+} // namespace
+
+namespace C = cli;
+
+const char *DistributedCommandExecutorTestRunner::kResetOption =
+ "reset_before_execution";
+
+DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const string &storage_path)
+ : query_id_(0) {
+ bus_.Initialize();
+
+ cli_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+ bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+ block_locator_ = make_unique<BlockLocator>(&bus_);
+ block_locator_->start();
+
+ test_database_loader_ = make_unique<optimizer::TestDatabaseLoader>(
+ storage_path,
+ block_locator::getBlockDomain(
+ test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+ locator_client_id_,
+ &bus_);
+ DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+ test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+ test_database_loader_data_exchanger_.start();
+
+ test_database_loader_->createTestRelation(false /* allow_vchar */);
+ test_database_loader_->loadTestRelation();
+
+ // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+ // could receive a registration message from the latter.
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
+ test_database_loader_->catalog_database());
+
+ // We don't use the NUMA aware version of worker code.
+ const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
+ kAnyNUMANodeID);
+
+ bus_local_.Initialize();
+
+ worker_ = make_unique<Worker>(0 /* worker_thread_index */, &bus_local_);
+
+ const vector<tmb::client_id> worker_client_ids(1, worker_->getBusClientID());
+ worker_directory_ = make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes);
+
+ storage_manager_ = make_unique<StorageManager>(
+ storage_path,
+ block_locator::getBlockDomain(
+ data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+ locator_client_id_, &bus_);
+ DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+
+ data_exchanger_.set_storage_manager(storage_manager_.get());
+ shiftboss_ =
+ make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(),
+ storage_manager_->hdfs());
+
+ foreman_->start();
+
+ data_exchanger_.start();
+ shiftboss_->start();
+ worker_->start();
+}
+
+DistributedCommandExecutorTestRunner::~DistributedCommandExecutorTestRunner() {
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+ worker_->join();
+ shiftboss_->join();
+
+ foreman_->join();
+
+ test_database_loader_data_exchanger_.shutdown();
+ test_database_loader_.reset();
+ data_exchanger_.shutdown();
+ storage_manager_.reset();
+
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage)));
+
+ test_database_loader_data_exchanger_.join();
+ data_exchanger_.join();
+ block_locator_->join();
+}
+
+void DistributedCommandExecutorTestRunner::runTestCase(
+ const string &input, const std::set<string> &options, string *output) {
+ // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+ VLOG(4) << "Test SQL(s): " << input;
+
+ if (options.find(kResetOption) != options.end()) {
+ test_database_loader_->clear();
+ test_database_loader_->createTestRelation(false /* allow_vchar */);
+ test_database_loader_->loadTestRelation();
+ }
+
+ MemStream output_stream;
+ sql_parser_.feedNextBuffer(new string(input));
+
+ while (true) {
+ ParseResult result = sql_parser_.getNextStatement();
+ if (result.condition != ParseResult::kSuccess) {
+ if (result.condition == ParseResult::kError) {
+ *output = result.error_message;
+ }
+ break;
+ }
+
+ const ParseStatement &parse_statement = *result.parsed_statement;
+ std::printf("%s\n", parse_statement.toString().c_str());
+
+ try {
+ if (parse_statement.getStatementType() == ParseStatement::kCommand) {
+ const ParseCommand &command = static_cast<const ParseCommand &>(parse_statement);
+ const PtrVector<ParseString> &arguments = *(command.arguments());
+ const string &command_str = command.command()->value();
+
+ string command_response;
+ if (command_str == C::kDescribeDatabaseCommand) {
+ command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+ } else if (command_str == C::kDescribeTableCommand) {
+ if (arguments.empty()) {
+ command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+ } else {
+ command_response = C::ExecuteDescribeTable(arguments, *test_database_loader_->catalog_database());
+ }
+ } else {
+ THROW_SQL_ERROR_AT(command.command()) << "Unsupported command";
+ }
+
+ std::fprintf(output_stream.file(), "%s", command_response.c_str());
+ } else {
+ optimizer::OptimizerContext optimizer_context;
+ auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
+
+ optimizer_.generateQueryHandle(parse_statement,
+ test_database_loader_->catalog_database(),
+ &optimizer_context,
+ query_handle.get());
+ const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+
+ const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+ DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+ if (query_result_relation) {
+ PrintToScreen::PrintRelation(*query_result_relation,
+ test_database_loader_->storage_manager(),
+ output_stream.file());
+ DropRelation::Drop(*query_result_relation,
+ test_database_loader_->catalog_database(),
+ test_database_loader_->storage_manager());
+ }
+ }
+ } catch (const SqlError &error) {
+ *output = error.formatMessage(input);
+ break;
+ }
+ }
+
+ if (output->empty()) {
+ *output = output_stream.str();
+ }
+}
+
+} // namespace quickstep
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.hpp b/cli/tests/DistributedCommandExecutorTestRunner.hpp
new file mode 100644
index 0000000..0427a85
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.hpp
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/**
+ * @brief TextBasedTestRunner for testing the CommandExecutor in the
+ * distributed version.
+ */
+class DistributedCommandExecutorTestRunner : public TextBasedTestRunner {
+ public:
+ /**
+ * @brief If this option is enabled, recreate the entire database and
+ * repopulate the data before every test.
+ */
+ static const char *kResetOption;
+
+ /**
+ * @brief Constructor.
+ */
+ explicit DistributedCommandExecutorTestRunner(const std::string &storage_path);
+
+ ~DistributedCommandExecutorTestRunner();
+
+ void runTestCase(const std::string &input,
+ const std::set<std::string> &options,
+ std::string *output) override;
+
+ private:
+ std::size_t query_id_;
+
+ SqlParserWrapper sql_parser_;
+ std::unique_ptr<optimizer::TestDatabaseLoader> test_database_loader_;
+ DataExchangerAsync test_database_loader_data_exchanger_;
+ optimizer::Optimizer optimizer_;
+
+ MessageBusImpl bus_;
+ tmb::client_id cli_id_, locator_client_id_;
+
+ std::unique_ptr<BlockLocator> block_locator_;
+
+ std::unique_ptr<ForemanDistributed> foreman_;
+
+ MessageBusImpl bus_local_;
+ std::unique_ptr<Worker> worker_;
+ std::unique_ptr<WorkerDirectory> worker_directory_;
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;
+ std::unique_ptr<Shiftboss> shiftboss_;
+
+ DISALLOW_COPY_AND_ASSIGN(DistributedCommandExecutorTestRunner);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt
index 9cf1869..e62d954 100644
--- a/cli/tests/command_executor/CMakeLists.txt
+++ b/cli/tests/command_executor/CMakeLists.txt
@@ -26,7 +26,25 @@
"${CMAKE_CURRENT_BINARY_DIR}/Dt.test"
"${CMAKE_CURRENT_BINARY_DIR}/Dt/")
+if (ENABLE_DISTRIBUTED)
+ add_test(quickstep_cli_tests_commandexecutor_d_distributed
+ "../quickstep_cli_tests_DistributedCommandExecutorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/D.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DDistributed.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DDistributed/")
+ add_test(quickstep_cli_tests_commandexecutor_dt_distributed
+ "../quickstep_cli_tests_DistributedCommandExecutorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Dt.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed/")
+endif(ENABLE_DISTRIBUTED)
+
# Create the folders where the unit tests will store their data blocks for the
# duration of their test.
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/D)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Dt)
+
+if (ENABLE_DISTRIBUTED)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DDistributed)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DtDistributed)
+endif(ENABLE_DISTRIBUTED)
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 68f286d..47246d8 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,6 +81,10 @@
required uint64 shiftboss_index = 1;
}
+message CommandMessage {
+ required string command = 1;
+}
+
message SqlQueryMessage {
required string sql_query = 1;
}
@@ -134,6 +138,10 @@
required uint64 shiftboss_index = 4;
}
+message CommandResponseMessage {
+ required string command_response = 1;
+}
+
message QueryExecutionSuccessMessage {
optional CatalogRelationSchema result_relation = 1;
}
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 994bd60..0fd0bdf 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -89,7 +89,11 @@
// Shiftboss to Worker.
kDistributedCliRegistrationMessage, // From CLI to Conductor.
kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
- kSqlQueryMessage, // From CLI to Conductor.
+
+ // From CLI to Conductor.
+ kCommandMessage,
+ kSqlQueryMessage,
+
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
@@ -101,8 +105,10 @@
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
+ kQueryExecutionSuccessMessage, // From Foreman to CLI.
+
// From Foreman / Conductor to CLI.
- kQueryExecutionSuccessMessage,
+ kCommandResponseMessage,
kQueryExecutionErrorMessage,
kQueryResultTeardownMessage, // From CLI to Conductor.