blob: 9f48eccda81433aa30893bd69dd3e9df3dafa21e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "cli/distributed/Cli.hpp"
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE.
#include "cli/Constants.hpp"
#include "cli/Flags.hpp"
#ifdef QUICKSTEP_USE_LINENOISE
#include "cli/LineReaderLineNoise.hpp"
typedef quickstep::LineReaderLineNoise LineReaderImpl;
#else
#include "cli/LineReaderDumb.hpp"
typedef quickstep::LineReaderDumb LineReaderImpl;
#endif
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/SqlError.hpp"
#include "utility/StringUtil.hpp"
#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/native_net_client_message_bus.h"
#include "tmb/tagged_message.h"
#include "glog/logging.h"
using std::fprintf;
using std::free;
using std::make_unique;
using std::malloc;
using std::move;
using std::printf;
using std::size_t;
using std::string;
using std::vector;
using tmb::AnnotatedMessage;
using tmb::TaggedMessage;
using tmb::client_id;
namespace quickstep {
namespace C = cli;
namespace S = serialization;
void Cli::init() {
cli_id_ = bus_.Connect();
DLOG(INFO) << "DistributedCli TMB Client ID: " << cli_id_;
bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage);
DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage to all";
tmb::Address all_addresses;
all_addresses.All(true);
// NOTE(zuyu): The singleton Conductor would need one copy of the message.
tmb::MessageStyle style;
TaggedMessage cli_reg_message(kDistributedCliRegistrationMessage);
CHECK(tmb::MessageBus::SendStatus::kOK ==
bus_.Send(cli_id_, all_addresses, style, move(cli_reg_message)));
// Wait for Conductor to response.
const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true));
DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
cli_reg_response_message.tagged_message.message_type());
conductor_client_id_ = cli_reg_response_message.sender;
DLOG(INFO) << "DistributedCli received DistributedCliRegistrationResponseMessage from Conductor with Client "
<< conductor_client_id_;
// Setup StorageManager.
bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
client_id locator_client_id;
storage_manager_ = make_unique<StorageManager>(
FLAGS_storage_path,
block_locator::getBlockDomain(data_exchanger_.network_address(), cli_id_, &locator_client_id, &bus_),
locator_client_id, &bus_);
data_exchanger_.set_storage_manager(storage_manager_.get());
data_exchanger_.start();
// Prepare for submitting a query or a command.
bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
}
void Cli::run() {
LineReaderImpl line_reader("distributed_quickstep> ",
" ...> ");
auto parser_wrapper = make_unique<SqlParserWrapper>();
std::chrono::time_point<std::chrono::steady_clock> start, end;
for (;;) {
string *command_string = new string();
*command_string = line_reader.getNextCommand();
if (command_string->size() == 0) {
delete command_string;
break;
}
parser_wrapper->feedNextBuffer(command_string);
bool quitting = false;
// A parse error should reset the parser. This is because the thrown quickstep
// SqlError does not do the proper reset work of the YYABORT macro.
bool reset_parser = false;
for (;;) {
ParseResult result = parser_wrapper->getNextStatement();
const ParseStatement &statement = *result.parsed_statement;
if (result.condition == ParseResult::kSuccess) {
if (statement.getStatementType() == ParseStatement::kQuit) {
quitting = true;
break;
}
if (statement.getStatementType() == ParseStatement::kCommand) {
const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
const std::string &command = parse_command.command()->value();
try {
if (command != C::kAnalyzeCommand &&
command != C::kDescribeDatabaseCommand &&
command != C::kDescribeTableCommand) {
THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
}
} catch (const SqlError &error) {
fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
reset_parser = true;
break;
}
}
DLOG(INFO) << "DistributedCli sent SqlQueryMessage to Conductor";
S::SqlQueryMessage proto;
proto.set_sql_query(*command_string);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
free(proto_bytes);
QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
start = std::chrono::steady_clock::now();
const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const tmb::message_type_id message_type = tagged_message.message_type();
DLOG(INFO) << "DistributedCli received " << QueryExecutionUtil::MessageTypeToString(message_type)
<< " from Client " << annotated_message.sender;
switch (message_type) {
case kCommandResponseMessage: {
S::CommandResponseMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
printf("%s", proto.command_response().c_str());
break;
}
case kQueryExecutionSuccessMessage: {
end = std::chrono::steady_clock::now();
S::QueryExecutionSuccessMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
if (proto.has_result_relation()) {
const CatalogRelation result_relation(proto.result_relation());
PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
PrintToScreen::PrintOutputSize(result_relation, storage_manager_.get(), stdout);
const vector<block_id> blocks(result_relation.getBlocksSnapshot());
for (const block_id block : blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
// Notify Conductor to remove the temp query result relation in the Catalog.
S::QueryResultTeardownMessage proto_response;
proto_response.set_relation_id(result_relation.getID());
const size_t proto_response_length = proto_response.ByteSize();
char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
TaggedMessage response_message(static_cast<const void*>(proto_response_bytes),
proto_response_length,
kQueryResultTeardownMessage);
free(proto_response_bytes);
QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(response_message));
}
std::chrono::duration<double, std::milli> time_in_ms = end - start;
printf("Time: %s ms\n", DoubleToStringWithSignificantDigits(time_in_ms.count(), 3).c_str());
break;
}
case kQueryExecutionErrorMessage: {
S::QueryExecutionErrorMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
fprintf(stderr, "%s", proto.error_message().c_str());
break;
}
default: {
LOG(ERROR) << "Unknown TMB message type";
}
}
} else {
if (result.condition == ParseResult::kError) {
fprintf(stderr, "%s", result.error_message.c_str());
}
reset_parser = true;
break;
}
}
if (quitting) {
break;
} else if (reset_parser) {
parser_wrapper = make_unique<SqlParserWrapper>();
reset_parser = false;
}
}
bus_.Disconnect(cli_id_);
}
} // namespace quickstep