Added the initial distributed prototype.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 882d1da..2b76e06 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -708,3 +708,88 @@
# Link against other required system and third-party libraries.
target_link_libraries(quickstep_cli_shell ${LIBS})
+
+if (ENABLE_NATIVENET)  # The distributed executables below all link against tmb and gRPC.
+ include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
+
+ # Build the quickstep_distributed_cli_shell executable (the interactive Distributed CLI).
+ add_executable (quickstep_distributed_cli_shell cli/QuickstepCliDistributed.cpp)
+ # Link against direct deps (will transitively pull in everything needed).
+ target_link_libraries(quickstep_distributed_cli_shell
+ gflags_nothreads-static
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_Catalog_proto
+ quickstep_cli_LineReader
+ quickstep_cli_PrintToScreen
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_utility_Macros
+ tmb
+ ${GRPCPLUSPLUS_LIBRARIES})
+ if (ENABLE_HDFS)  # The source includes storage/FileManagerHdfs.hpp when HDFS support is compiled in.
+ target_link_libraries(quickstep_distributed_cli_shell
+ quickstep_storage_FileManagerHdfs)
+ endif()
+
+ # Link against the remaining system and third-party libraries collected in ${LIBS}.
+ target_link_libraries(quickstep_distributed_cli_shell
+ ${LIBS})
+
+ # Build the quickstep_distributed_conductor executable (receives SQL queries and drives ForemanDistributed).
+ add_executable (quickstep_distributed_conductor cli/ConductorDistributed.cpp)
+ # Link against direct deps (will transitively pull in everything needed).
+ target_link_libraries(quickstep_distributed_conductor
+ gflags_nothreads-static
+ glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_Catalog_proto
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
+ quickstep_utility_SqlError
+ tmb
+ ${GRPCPLUSPLUS_LIBRARIES})
+ if (ENABLE_HDFS)  # Same optional HDFS file manager as for the CLI shell.
+ target_link_libraries(quickstep_distributed_conductor
+ quickstep_storage_FileManagerHdfs)
+ endif()
+
+ # Link against the remaining system and third-party libraries collected in ${LIBS}.
+ target_link_libraries(quickstep_distributed_conductor ${LIBS})
+
+ # Build the quickstep_distributed_worker executable (hosts the Shiftboss and the Worker threads).
+ add_executable (quickstep_distributed_worker cli/WorkerDistributed.cpp)
+ # Link against direct deps (will transitively pull in everything needed).
+ target_link_libraries(quickstep_distributed_worker
+ gflags_nothreads-static
+ glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_cli_InputParserUtil
+ quickstep_queryexecution_Shiftboss
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_storage_PreloaderThread
+ quickstep_storage_StorageManager
+ quickstep_utility_PtrVector
+ tmb
+ ${GRPCPLUSPLUS_LIBRARIES})
+ if (ENABLE_HDFS)  # Same optional HDFS file manager as for the CLI shell.
+ target_link_libraries(quickstep_distributed_worker
+ quickstep_storage_FileManagerHdfs)
+ endif()
+
+ # Link against the remaining system and third-party libraries collected in ${LIBS}.
+ target_link_libraries(quickstep_distributed_worker ${LIBS})
+endif()  # ENABLE_NATIVENET
diff --git a/cli/ConductorDistributed.cpp b/cli/ConductorDistributed.cpp
new file mode 100644
index 0000000..111a9a0
--- /dev/null
+++ b/cli/ConductorDistributed.cpp
@@ -0,0 +1,282 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep Conductor */
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <exception>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "utility/SqlError.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+
+using quickstep::QueryExecutionUtil;
+using quickstep::QueryProcessor;
+using quickstep::kDistributedCliRegistrationResponseMessage;
+using quickstep::kDistributedCliRegistrationMessage;
+using quickstep::kQueryExecutionErrorMessage;
+using quickstep::kQueryExecutionSuccessMessage;
+using quickstep::kSqlQueryMessage;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+// Platform-specific path separator and default storage directory.
+#ifndef QUICKSTEP_OS_WINDOWS
+static constexpr char kPathSeparator = '/';
+static constexpr char kDefaultStoragePath[] = "qsstor/";
+#else
+static constexpr char kPathSeparator = '\\';
+static constexpr char kDefaultStoragePath[] = "qsstor\\";
+#endif
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+// gflags validator for --tmb_server_port: accepts values in [1, 65535] and
+// reports anything else on stderr.
+static bool ValidateTmbServerPort(const char *flagname, std::int32_t value) {
+  const bool in_range = (value >= 1) && (value <= 65535);
+  if (!in_range) {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+  }
+  return in_range;
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy =
+    gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+DEFINE_string(storage_path, kDefaultStoragePath, "Filesystem path for quickstep database storage.");
+
+}  // namespace quickstep
+
+// Entry point of the Conductor. Sets up a QueryProcessor on the catalog under
+// --storage_path, connects to the TMB server as a client, and then loops
+// forever serving requests from Distributed CLIs: registration requests, and
+// SQL queries, which are parsed and executed through ForemanDistributed.
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  grpc_init();
+
+  quickstep::SqlParserWrapper parser_wrapper;
+
+  // Setup the paths used by StorageManager.
+  string fixed_storage_path(quickstep::FLAGS_storage_path);
+  if (!fixed_storage_path.empty()
+      && (fixed_storage_path.back() != quickstep::kPathSeparator)) {
+    fixed_storage_path.push_back(quickstep::kPathSeparator);
+  }
+
+  string catalog_path(fixed_storage_path);
+  catalog_path.append("catalog.pb.bin");
+
+  // Setup QueryProcessor, including CatalogDatabase and StorageManager.
+  std::unique_ptr<QueryProcessor> query_processor;
+  try {
+    query_processor.reset(new QueryProcessor(catalog_path, fixed_storage_path));
+  } catch (const std::exception &e) {
+    LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what();
+  } catch (...) {
+    LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
+  }
+
+  tmb::NativeNetClientMessageBus bus;
+  bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
+  bus.Initialize();
+  bus.ResetBus();
+
+  const client_id conductor_id = bus.Connect();
+  LOG(INFO) << "Conductor TMB Client ID: " << conductor_id;
+
+  // The Conductor receives registration requests and SQL queries, and sends
+  // registration responses and query execution results.
+  bus.RegisterClientAsReceiver(conductor_id, kDistributedCliRegistrationMessage);
+  bus.RegisterClientAsSender(conductor_id, kDistributedCliRegistrationResponseMessage);
+
+  bus.RegisterClientAsReceiver(conductor_id, kSqlQueryMessage);
+  bus.RegisterClientAsSender(conductor_id, kQueryExecutionSuccessMessage);
+  bus.RegisterClientAsSender(conductor_id, kQueryExecutionErrorMessage);
+
+  quickstep::ForemanDistributed foreman(&bus, query_processor->getDefaultDatabase());
+
+  // Replies to 'receiver' with a QueryExecutionErrorMessage carrying
+  // 'error_message'. 'origin' names the failing component and only appears in
+  // the log line.
+  const auto send_error = [&bus, conductor_id](const client_id receiver,
+                                               const string &error_message,
+                                               const char *origin) {
+    quickstep::serialization::QueryExecutionErrorMessage proto;
+    proto.set_error_message(error_message);
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryExecutionErrorMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Conductor (on behalf of " << origin
+              << ") sent QueryExecutionErrorMessage (typed '" << kQueryExecutionErrorMessage
+              << "') to Distributed CLI " << receiver;
+    QueryExecutionUtil::SendTMBMessage(&bus, conductor_id, receiver, move(message));
+  };
+
+  // Serve requests forever; no message type asks the Conductor to shut down.
+  for (;;) {
+    AnnotatedMessage annotated_message(bus.Receive(conductor_id, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+
+    LOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
+              << "' message from client " << sender;
+    switch (tagged_message.message_type()) {
+      case kDistributedCliRegistrationMessage: {
+        // Acknowledge the registration with an empty response.
+        quickstep::serialization::EmptyMessage proto;
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage message(static_cast<const void*>(proto_bytes),
+                              proto_length,
+                              kDistributedCliRegistrationResponseMessage);
+        free(proto_bytes);
+
+        LOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
+                  << kDistributedCliRegistrationResponseMessage
+                  << "') to Distributed CLI " << sender;
+        QueryExecutionUtil::SendTMBMessage(&bus, conductor_id, sender, move(message));
+        break;
+      }
+      case kSqlQueryMessage: {
+        quickstep::serialization::SqlQueryMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        LOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
+
+        // NOTE(review): the parser presumably takes ownership of this buffer
+        // (it is never deleted here) -- confirm against SqlParserWrapper.
+        string *command_string = new string(proto.sql_query());
+        parser_wrapper.feedNextBuffer(command_string);
+
+        quickstep::ParseResult parse_result = parser_wrapper.getNextStatement();
+        if (parse_result.condition != quickstep::ParseResult::kSuccess) {
+          // Nothing was parsed, so 'parsed_statement' must not be dereferenced;
+          // reply with the parser's error so that the CLI is not left waiting.
+          send_error(sender, parse_result.error_message, "Parser");
+          break;
+        }
+
+        unique_ptr<quickstep::QueryHandle> query_handle;
+        try {
+          query_handle.reset(query_processor->generateQueryHandle(*parse_result.parsed_statement));
+        } catch (const quickstep::SqlError &sql_error) {
+          send_error(sender, sql_error.formatMessage(*command_string), "Optimizer");
+          break;
+        }
+
+        // Execute the query and reply with the result relation, if any.
+        try {
+          foreman.setQueryHandle(query_handle.get());
+          foreman.start();
+          foreman.join();
+
+          quickstep::serialization::QueryExecutionSuccessMessage proto;
+
+          const quickstep::CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+          if (query_result_relation) {
+            proto.mutable_query_result_relation()->MergeFrom(query_result_relation->getProto());
+          }
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+          TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                proto_length,
+                                kQueryExecutionSuccessMessage);
+          free(proto_bytes);
+
+          LOG(INFO) << "Conductor sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+                    << "') to Distributed CLI " << sender;
+          QueryExecutionUtil::SendTMBMessage(&bus, conductor_id, sender, move(message));
+
+          // Drop the query result relation from the default database once the
+          // reply has been sent.
+          if (query_result_relation) {
+            const quickstep::relation_id query_result_relation_id = query_result_relation->getID();
+            DCHECK(query_processor->getDefaultDatabase()->hasRelationWithId(query_result_relation_id));
+            query_processor->getDefaultDatabase()->dropRelationById(query_result_relation_id);
+          }
+
+          query_processor->saveCatalog();
+        } catch (const std::exception &e) {
+          string error_message("QUERY EXECUTION ERROR: ");
+          error_message.append(e.what());
+          error_message.push_back('\n');
+          send_error(sender, error_message, "Executor");
+        }
+        break;
+      }
+      default: {
+        LOG(FATAL) << "Unknown TMB message type";
+      }
+    }
+  }
+
+  // Not reached: the loop above never exits.
+  return 0;
+}
diff --git a/cli/QuickstepCliDistributed.cpp b/cli/QuickstepCliDistributed.cpp
new file mode 100644
index 0000000..5bc6ccf
--- /dev/null
+++ b/cli/QuickstepCliDistributed.cpp
@@ -0,0 +1,293 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+/* A standalone command-line interface to distributed QuickStep */
+
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE.
+#include "cli/LineReader.hpp"
+
+#ifdef QUICKSTEP_USE_LINENOISE
+#include "cli/LineReaderLineNoise.hpp"
+typedef quickstep::LineReaderLineNoise LineReaderImpl;
+#else
+#include "cli/LineReaderDumb.hpp"
+typedef quickstep::LineReaderDumb LineReaderImpl;
+#endif
+
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include "storage/FileManagerHdfs.hpp"
+#endif
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::fprintf;
+using std::free;
+using std::malloc;
+using std::move;
+using std::printf;
+using std::size_t;
+using std::string;
+
+using quickstep::ParseResult;
+using quickstep::QueryExecutionUtil;
+using quickstep::block_id;
+using quickstep::kDistributedCliRegistrationResponseMessage;
+using quickstep::kDistributedCliRegistrationMessage;
+using quickstep::kQueryExecutionErrorMessage;
+using quickstep::kQueryExecutionSuccessMessage;
+using quickstep::kSqlQueryMessage;
+
+using tmb::Address;
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+// Platform-specific path separator and default storage directory.
+#ifndef QUICKSTEP_OS_WINDOWS
+static constexpr char kPathSeparator = '/';
+static constexpr char kDefaultStoragePath[] = "qsstor/";
+#else
+static constexpr char kPathSeparator = '\\';
+static constexpr char kDefaultStoragePath[] = "qsstor\\";
+#endif
+
+DEFINE_string(storage_path, kDefaultStoragePath, "Filesystem path for quickstep database storage.");
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+// gflags validator for --tmb_server_port: accepts values in [1, 65535] and
+// reports anything else on stderr.
+static bool ValidateTmbServerPort(const char *flagname, std::int32_t value) {
+  const bool in_range = (value >= 1) && (value <= 65535);
+  if (!in_range) {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+  }
+  return in_range;
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy =
+    gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+}  // namespace quickstep
+
+// Entry point of the Distributed CLI. Registers with the Conductor over TMB,
+// then reads SQL commands, sends each successfully parsed statement to the
+// Conductor, and prints the query result or the error that comes back.
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  grpc_init();
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (quickstep::FLAGS_use_hdfs) {
+    LOG(INFO) << "Using HDFS as the default persistent storage, with namenode at "
+              << quickstep::FLAGS_hdfs_namenode_host << ":"
+              << quickstep::FLAGS_hdfs_namenode_port << " and block replication factor "
+              << quickstep::FLAGS_hdfs_num_replications << "\n";
+  }
+#endif
+
+  // Setup the paths used by StorageManager.
+  string fixed_storage_path(quickstep::FLAGS_storage_path);
+  if (!fixed_storage_path.empty()
+      && (fixed_storage_path.back() != quickstep::kPathSeparator)) {
+    fixed_storage_path.push_back(quickstep::kPathSeparator);
+  }
+
+  quickstep::StorageManager storage_manager(fixed_storage_path);
+
+  tmb::NativeNetClientMessageBus bus;
+  bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
+  bus.Initialize();
+
+  const client_id cli_id = bus.Connect();
+  LOG(INFO) << "CLI TMB Client ID: " << cli_id;
+
+  bus.RegisterClientAsSender(cli_id, kDistributedCliRegistrationMessage);
+  bus.RegisterClientAsReceiver(cli_id, kDistributedCliRegistrationResponseMessage);
+
+  LOG(INFO) << "Distributed CLI sent DistributedCliRegistrationMessage (typed '"
+            << kDistributedCliRegistrationMessage
+            << "') to all";
+
+  Address all_addresses;
+  all_addresses.All(true);
+
+  // NOTE(zuyu): The singleton Conductor would need one copy of the message.
+  tmb::MessageStyle style;
+
+  quickstep::serialization::EmptyMessage proto;
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage cli_reg_message(static_cast<const void*>(proto_bytes), proto_length, kDistributedCliRegistrationMessage);
+  free(proto_bytes);
+
+  // CHECK rather than DCHECK: a failed send must also be caught in release
+  // builds; otherwise the Receive() below would keep waiting for a reply.
+  const MessageBus::SendStatus send_status =
+      bus.Send(cli_id, all_addresses, style, move(cli_reg_message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+
+  // Wait for the Conductor to respond.
+  AnnotatedMessage cli_reg_response_message(bus.Receive(cli_id, 0, true));
+  CHECK_EQ(kDistributedCliRegistrationResponseMessage,
+           cli_reg_response_message.tagged_message.message_type());
+  const client_id conductor_id = cli_reg_response_message.sender;
+
+  LOG(INFO) << "Distributed CLI received typed '" << kDistributedCliRegistrationResponseMessage
+            << "' message from client " << conductor_id;
+
+  // Prepare for executing a query.
+  bus.RegisterClientAsSender(cli_id, kSqlQueryMessage);
+  bus.RegisterClientAsReceiver(cli_id, kQueryExecutionSuccessMessage);
+  bus.RegisterClientAsReceiver(cli_id, kQueryExecutionErrorMessage);
+
+  LineReaderImpl line_reader("distributed_quickstep> ",
+                             " ...> ");
+  quickstep::SqlParserWrapper parser_wrapper;
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
+
+  for (;;) {
+    string *command_string = new string(line_reader.getNextCommand());
+    if (command_string->size() == 0) {
+      delete command_string;
+      break;
+    }
+
+    parser_wrapper.feedNextBuffer(command_string);
+
+    bool quitting = false;
+    for (;;) {
+      ParseResult result = parser_wrapper.getNextStatement();
+      if (result.condition == ParseResult::kSuccess) {
+        if (result.parsed_statement->getStatementType() == quickstep::ParseStatement::kQuit) {
+          quitting = true;
+          break;
+        }
+
+        LOG(INFO) << "Distributed CLI sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                  << "') to Foreman";
+        // NOTE(review): the whole command buffer is sent for every statement
+        // parsed from it, not just the text of the current statement.
+        quickstep::serialization::SqlQueryMessage proto;
+        proto.set_sql_query(*command_string);
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+                                        proto_length,
+                                        kSqlQueryMessage);
+        free(proto_bytes);
+
+        QueryExecutionUtil::SendTMBMessage(&bus, cli_id, conductor_id, move(sql_query_message));
+
+        start = std::chrono::steady_clock::now();
+
+        AnnotatedMessage annotated_message(bus.Receive(cli_id, 0, true));
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+        LOG(INFO) << "Distributed CLI received typed '" << tagged_message.message_type()
+                  << "' message from client " << annotated_message.sender;
+        switch (tagged_message.message_type()) {
+          case kQueryExecutionSuccessMessage: {
+            end = std::chrono::steady_clock::now();
+
+            quickstep::serialization::QueryExecutionSuccessMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            if (proto.has_query_result_relation()) {
+              quickstep::CatalogRelation query_result_relation(proto.query_result_relation());
+
+              quickstep::PrintToScreen::PrintRelation(query_result_relation, &storage_manager, stdout);
+
+              // Delete the block files of the result relation once it is printed.
+              const std::vector<block_id> blocks(query_result_relation.getBlocksSnapshot());
+              for (const block_id block : blocks) {
+                storage_manager.deleteBlockOrBlobFile(block);
+              }
+            }
+
+            printf("Execution time: %g seconds\nQuery Complete\n", std::chrono::duration<double>(end - start).count());
+            break;
+          }
+          case kQueryExecutionErrorMessage: {
+            quickstep::serialization::QueryExecutionErrorMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            fprintf(stderr, "%s", proto.error_message().c_str());
+            break;
+          }
+          default: {
+            LOG(ERROR) << "Unknown TMB message type";
+          }
+        }
+      } else {
+        if (result.condition == ParseResult::kError) {
+          fprintf(stderr, "%s", result.error_message.c_str());
+        }
+        break;
+      }
+    }
+
+    if (quitting) {
+      break;
+    }
+  }
+
+  bus.Disconnect(cli_id);
+
+  return 0;
+}
diff --git a/cli/WorkerDistributed.cpp b/cli/WorkerDistributed.cpp
new file mode 100644
index 0000000..ff49aec
--- /dev/null
+++ b/cli/WorkerDistributed.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep Worker */
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "cli/InputParserUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include "storage/FileManagerHdfs.hpp"
+#endif
+
+#include "storage/PreloaderThread.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/PtrVector.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/native_net_client_message_bus.h"
+
+using std::fflush;
+using std::fprintf;
+using std::printf;
+using std::size_t;
+using std::string;
+using std::vector;
+
+using quickstep::InputParserUtil;
+using quickstep::Worker;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+// Platform-specific path separator and default storage directory.
+#ifndef QUICKSTEP_OS_WINDOWS
+static constexpr char kPathSeparator = '/';
+static constexpr char kDefaultStoragePath[] = "qsstor/";
+#else
+static constexpr char kPathSeparator = '\\';
+static constexpr char kDefaultStoragePath[] = "qsstor\\";
+#endif
+
+DEFINE_int32(num_workers, 1, "Number of worker threads");
+
+DEFINE_bool(preload_buffer_pool, false,
+            "If true, pre-load all known blocks into buffer pool before "
+            "accepting queries (should also set --buffer_pool_slots to be "
+            "large enough to accomodate the entire database).");
+
+DEFINE_string(storage_path, kDefaultStoragePath, "Filesystem path for quickstep database storage.");
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+// gflags validator for --tmb_server_port: accepts values in [1, 65535] and
+// reports anything else on stderr.
+static bool ValidateTmbServerPort(const char *flagname, std::int32_t value) {
+  const bool in_range = (value >= 1) && (value <= 65535);
+  if (!in_range) {
+    fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+  }
+  return in_range;
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy =
+    gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+DEFINE_string(worker_affinities, "",
+              "A comma-separated list of CPU IDs to pin worker threads to "
+              "(leaving this empty will cause all worker threads to inherit "
+              "the affinity mask of the Quickstep process, which typically "
+              "means that they will all be runable on any CPU according to "
+              "the kernel's own scheduling policy).");
+
+}  // namespace quickstep
+
+// Entry point of a Distributed Quickstep worker process. Sets up the TMB
+// client bus, creates --num_workers Worker threads plus a Shiftboss that is
+// given their directory, and then runs until all of those threads exit.
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  grpc_init();
+
+  if (quickstep::FLAGS_num_workers > 0) {
+    printf("Starting Quickstep with %d worker thread(s)\n",
+           quickstep::FLAGS_num_workers);
+  } else {
+    LOG(FATAL) << "Quickstep needs at least one worker thread";
+  }
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (quickstep::FLAGS_use_hdfs) {
+    LOG(INFO) << "Using HDFS as the default persistent storage, with namenode at "
+              << quickstep::FLAGS_hdfs_namenode_host << ":"
+              << quickstep::FLAGS_hdfs_namenode_port << " and block replication factor "
+              << quickstep::FLAGS_hdfs_num_replications << "\n";
+  }
+#endif
+
+  // Setup the paths used by StorageManager.
+  string fixed_storage_path(quickstep::FLAGS_storage_path);
+  if (!fixed_storage_path.empty()
+      && (fixed_storage_path.back() != quickstep::kPathSeparator)) {
+    fixed_storage_path.push_back(quickstep::kPathSeparator);
+  }
+
+  quickstep::StorageManager storage_manager(fixed_storage_path);
+
+  // Set up the client side of the TMB bus shared by Workers and Shiftboss.
+  tmb::NativeNetClientMessageBus bus;
+  bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
+  bus.Initialize();
+
+  // Parse the CPU affinities for workers and the preloader thread, if enabled
+  // to warm up the buffer pool.
+  const vector<int> worker_cpu_affinities =
+      InputParserUtil::ParseWorkerAffinities(quickstep::FLAGS_num_workers,
+                                             quickstep::FLAGS_worker_affinities);
+
+  if (quickstep::FLAGS_preload_buffer_pool) {
+    // TODO(quickstep-team): Enable Preloading by reconstructing CatalogDatabase
+    // from Catalog in Conductor.
+    quickstep::CatalogDatabase database(nullptr, "default");
+    quickstep::PreloaderThread preloader(database,
+                                         &storage_manager,
+                                         worker_cpu_affinities.front());
+    printf("Preloading buffer pool... ");
+    fflush(stdout);
+    preloader.start();
+    preloader.join();
+    printf("DONE\n");
+  }
+
+  // Get the NUMA affinities for workers.
+  vector<int> cpu_numa_nodes = InputParserUtil::GetNUMANodesForCPUs();
+  if (cpu_numa_nodes.empty()) {
+    // libnuma is not present. Assign -1 as the NUMA node for every worker.
+    cpu_numa_nodes.assign(worker_cpu_affinities.size(), -1);
+  }
+
+  vector<int> worker_numa_nodes;
+  quickstep::PtrVector<Worker> workers;
+  vector<client_id> worker_client_ids;
+
+  // Initialize the worker threads.
+  DCHECK_EQ(static_cast<size_t>(quickstep::FLAGS_num_workers),
+            worker_cpu_affinities.size());
+  for (size_t worker_idx = 0;
+       worker_idx < worker_cpu_affinities.size();
+       ++worker_idx) {
+    // A worker gets a NUMA node only when it is pinned to a CPU for which a
+    // node is known. The bounds check matters when libnuma is absent: the
+    // fallback vector above is sized by worker count, not indexed by CPU ID.
+    const int cpu_id = worker_cpu_affinities[worker_idx];
+    const bool has_numa_node =
+        cpu_id >= 0 && static_cast<size_t>(cpu_id) < cpu_numa_nodes.size();
+    worker_numa_nodes.push_back(has_numa_node ? cpu_numa_nodes[cpu_id] : -1);
+
+    workers.push_back(new Worker(worker_idx, &bus, cpu_id));
+    worker_client_ids.push_back(workers.back().getBusClientID());
+  }
+
+  // TODO(zuyu): Move WorkerDirectory within Shiftboss once the latter is added.
+  quickstep::WorkerDirectory worker_directory(worker_cpu_affinities.size(), worker_client_ids, worker_numa_nodes);
+
+  quickstep::Shiftboss shiftboss(&bus, &storage_manager, &worker_directory);
+  shiftboss.start();
+
+  // Start the worker threads.
+  for (Worker &worker : workers) {
+    worker.start();
+  }
+
+  // Block until every worker thread and the Shiftboss have finished.
+  for (Worker &worker : workers) {
+    worker.join();
+  }
+  shiftboss.join();
+
+  return 0;
+}
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 95807d4..b9a964c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,6 +20,10 @@
import "query_execution/QueryContext.proto";
import "relational_operators/WorkOrder.proto";
+// A message with no fields, used as the payload of any message type that carries no data.
+message EmptyMessage {
+}
+
message ShiftbossRegistrationMessage {
// The total Work Order processing capacity in Shiftboss, which equals to the
// sum of the capacity of each worker managed by Shiftboss.
@@ -87,7 +91,7 @@
}
message QueryExecutionSuccessMessage {
- optional CatalogRelation query_result_relation = 1;
+ optional CatalogRelationSchema query_result_relation = 1;
}
message QueryExecutionErrorMessage {