| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #include "cli/distributed/Cli.hpp" |
| |
| #include <chrono> |
| #include <cstddef> |
| #include <cstdio> |
| #include <cstdlib> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/CatalogRelation.hpp" |
| #include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE. |
| #include "cli/Constants.hpp" |
| #include "cli/Flags.hpp" |
| |
| #ifdef QUICKSTEP_USE_LINENOISE |
| #include "cli/LineReaderLineNoise.hpp" |
| typedef quickstep::LineReaderLineNoise LineReaderImpl; |
| #else |
| #include "cli/LineReaderDumb.hpp" |
| typedef quickstep::LineReaderDumb LineReaderImpl; |
| #endif |
| |
| #include "cli/PrintToScreen.hpp" |
| #include "parser/ParseStatement.hpp" |
| #include "parser/SqlParserWrapper.hpp" |
| #include "query_execution/BlockLocatorUtil.hpp" |
| #include "query_execution/QueryExecutionMessages.pb.h" |
| #include "query_execution/QueryExecutionTypedefs.hpp" |
| #include "query_execution/QueryExecutionUtil.hpp" |
| #include "storage/DataExchangerAsync.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "utility/SqlError.hpp" |
| #include "utility/StringUtil.hpp" |
| |
| #include "tmb/address.h" |
| #include "tmb/id_typedefs.h" |
| #include "tmb/message_bus.h" |
| #include "tmb/message_style.h" |
| #include "tmb/native_net_client_message_bus.h" |
| #include "tmb/tagged_message.h" |
| |
| #include "glog/logging.h" |
| |
| using std::fprintf; |
| using std::free; |
| using std::make_unique; |
| using std::malloc; |
| using std::move; |
| using std::printf; |
| using std::size_t; |
| using std::string; |
| using std::vector; |
| |
| using tmb::AnnotatedMessage; |
| using tmb::TaggedMessage; |
| using tmb::client_id; |
| |
| namespace quickstep { |
| |
| namespace C = cli; |
| namespace S = serialization; |
| |
| void Cli::init() { |
| cli_id_ = bus_.Connect(); |
| DLOG(INFO) << "DistributedCli TMB Client ID: " << cli_id_; |
| |
| bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage); |
| bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage); |
| |
| DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage to all"; |
| |
| tmb::Address all_addresses; |
| all_addresses.All(true); |
| // NOTE(zuyu): The singleton Conductor would need one copy of the message. |
| tmb::MessageStyle style; |
| |
| TaggedMessage cli_reg_message(kDistributedCliRegistrationMessage); |
| CHECK(tmb::MessageBus::SendStatus::kOK == |
| bus_.Send(cli_id_, all_addresses, style, move(cli_reg_message))); |
| |
| // Wait for Conductor to response. |
| const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true)); |
| DCHECK_EQ(kDistributedCliRegistrationResponseMessage, |
| cli_reg_response_message.tagged_message.message_type()); |
| conductor_client_id_ = cli_reg_response_message.sender; |
| |
| DLOG(INFO) << "DistributedCli received DistributedCliRegistrationResponseMessage from Conductor with Client " |
| << conductor_client_id_; |
| |
| // Setup StorageManager. |
| bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage); |
| bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage); |
| |
| client_id locator_client_id; |
| storage_manager_ = make_unique<StorageManager>( |
| FLAGS_storage_path, |
| block_locator::getBlockDomain(data_exchanger_.network_address(), cli_id_, &locator_client_id, &bus_), |
| locator_client_id, &bus_); |
| |
| data_exchanger_.set_storage_manager(storage_manager_.get()); |
| data_exchanger_.start(); |
| |
| // Prepare for submitting a query or a command. |
| bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage); |
| |
| bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage); |
| bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage); |
| |
| bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage); |
| |
| bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage); |
| } |
| |
| void Cli::run() { |
| LineReaderImpl line_reader("distributed_quickstep> ", |
| " ...> "); |
| auto parser_wrapper = make_unique<SqlParserWrapper>(); |
| std::chrono::time_point<std::chrono::steady_clock> start, end; |
| |
| for (;;) { |
| string *command_string = new string(); |
| *command_string = line_reader.getNextCommand(); |
| if (command_string->size() == 0) { |
| delete command_string; |
| break; |
| } |
| |
| parser_wrapper->feedNextBuffer(command_string); |
| |
| bool quitting = false; |
| // A parse error should reset the parser. This is because the thrown quickstep |
| // SqlError does not do the proper reset work of the YYABORT macro. |
| bool reset_parser = false; |
| for (;;) { |
| ParseResult result = parser_wrapper->getNextStatement(); |
| const ParseStatement &statement = *result.parsed_statement; |
| if (result.condition == ParseResult::kSuccess) { |
| if (statement.getStatementType() == ParseStatement::kQuit) { |
| quitting = true; |
| break; |
| } |
| |
| if (statement.getStatementType() == ParseStatement::kCommand) { |
| const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement); |
| const std::string &command = parse_command.command()->value(); |
| try { |
| if (command != C::kAnalyzeCommand && |
| command != C::kDescribeDatabaseCommand && |
| command != C::kDescribeTableCommand) { |
| THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command"; |
| } |
| } catch (const SqlError &error) { |
| fprintf(stderr, "%s", error.formatMessage(*command_string).c_str()); |
| reset_parser = true; |
| break; |
| } |
| } |
| |
| DLOG(INFO) << "DistributedCli sent SqlQueryMessage to Conductor"; |
| S::SqlQueryMessage proto; |
| proto.set_sql_query(*command_string); |
| |
| const size_t proto_length = proto.ByteSize(); |
| char *proto_bytes = static_cast<char*>(malloc(proto_length)); |
| CHECK(proto.SerializeToArray(proto_bytes, proto_length)); |
| |
| TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage); |
| free(proto_bytes); |
| |
| QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message)); |
| |
| start = std::chrono::steady_clock::now(); |
| |
| const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true)); |
| const TaggedMessage &tagged_message = annotated_message.tagged_message; |
| const tmb::message_type_id message_type = tagged_message.message_type(); |
| DLOG(INFO) << "DistributedCli received " << QueryExecutionUtil::MessageTypeToString(message_type) |
| << " from Client " << annotated_message.sender; |
| switch (message_type) { |
| case kCommandResponseMessage: { |
| S::CommandResponseMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| printf("%s", proto.command_response().c_str()); |
| break; |
| } |
| case kQueryExecutionSuccessMessage: { |
| end = std::chrono::steady_clock::now(); |
| |
| S::QueryExecutionSuccessMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| if (proto.has_result_relation()) { |
| const CatalogRelation result_relation(proto.result_relation()); |
| |
| PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout); |
| PrintToScreen::PrintOutputSize(result_relation, storage_manager_.get(), stdout); |
| |
| const vector<block_id> blocks(result_relation.getBlocksSnapshot()); |
| for (const block_id block : blocks) { |
| storage_manager_->deleteBlockOrBlobFile(block); |
| } |
| |
| // Notify Conductor to remove the temp query result relation in the Catalog. |
| S::QueryResultTeardownMessage proto_response; |
| proto_response.set_relation_id(result_relation.getID()); |
| |
| const size_t proto_response_length = proto_response.ByteSize(); |
| char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); |
| CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); |
| |
| TaggedMessage response_message(static_cast<const void*>(proto_response_bytes), |
| proto_response_length, |
| kQueryResultTeardownMessage); |
| free(proto_response_bytes); |
| |
| QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(response_message)); |
| } |
| |
| std::chrono::duration<double, std::milli> time_in_ms = end - start; |
| printf("Time: %s ms\n", DoubleToStringWithSignificantDigits(time_in_ms.count(), 3).c_str()); |
| break; |
| } |
| case kQueryExecutionErrorMessage: { |
| S::QueryExecutionErrorMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| fprintf(stderr, "%s", proto.error_message().c_str()); |
| break; |
| } |
| default: { |
| LOG(ERROR) << "Unknown TMB message type"; |
| } |
| } |
| } else { |
| if (result.condition == ParseResult::kError) { |
| fprintf(stderr, "%s", result.error_message.c_str()); |
| } |
| reset_parser = true; |
| break; |
| } |
| } |
| |
| if (quitting) { |
| break; |
| } else if (reset_parser) { |
| parser_wrapper = make_unique<SqlParserWrapper>(); |
| reset_parser = false; |
| } |
| } |
| |
| bus_.Disconnect(cli_id_); |
| } |
| |
| } // namespace quickstep |