blob: 111a9a0b1be51ffd7ee81a41b534ceeb5c2e5dae [file] [log] [blame]
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
/* A standalone command-line interface to Distributed QuickStep Conductor */
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "catalog/Catalog.pb.h"
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "utility/SqlError.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "grpc/grpc.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/native_net_client_message_bus.h"
#include "tmb/tagged_message.h"
using std::free;
using std::malloc;
using std::move;
using std::size_t;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using quickstep::QueryExecutionUtil;
using quickstep::QueryProcessor;
using quickstep::kDistributedCliRegistrationResponseMessage;
using quickstep::kDistributedCliRegistrationMessage;
using quickstep::kQueryExecutionErrorMessage;
using quickstep::kQueryExecutionSuccessMessage;
using quickstep::kSqlQueryMessage;
using tmb::AnnotatedMessage;
using tmb::TaggedMessage;
using tmb::client_id;
namespace quickstep {
#ifdef QUICKSTEP_OS_WINDOWS
static constexpr char kPathSeparator = '\\';
static constexpr char kDefaultStoragePath[] = "qsstor\\";
#else
static constexpr char kPathSeparator = '/';
static constexpr char kDefaultStoragePath[] = "qsstor/";
#endif
DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
static bool ValidateTmbServerPort(const char *flagname,
std::int32_t value) {
if (value > 0 && value < 65536) {
return true;
} else {
std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
return false;
}
}
DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
static const bool tmb_server_port_dummy
= gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
DEFINE_string(storage_path, kDefaultStoragePath,
"Filesystem path for quickstep database storage.");
} // namespace quickstep
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
grpc_init();
quickstep::SqlParserWrapper parser_wrapper;
// Setup the paths used by StorageManager.
string fixed_storage_path(quickstep::FLAGS_storage_path);
if (!fixed_storage_path.empty()
&& (fixed_storage_path.back() != quickstep::kPathSeparator)) {
fixed_storage_path.push_back(quickstep::kPathSeparator);
}
string catalog_path(fixed_storage_path);
catalog_path.append("catalog.pb.bin");
// Setup QueryProcessor, including CatalogDatabase and StorageManager.
std::unique_ptr<QueryProcessor> query_processor;
try {
query_processor.reset(new QueryProcessor(catalog_path, fixed_storage_path));
} catch (const std::exception &e) {
LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what();
} catch (...) {
LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
}
tmb::NativeNetClientMessageBus bus;
bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
bus.Initialize();
bus.ResetBus();
const client_id conductor_id = bus.Connect();
LOG(INFO) << "Conductor TMB Client ID: " << conductor_id;
bus.RegisterClientAsReceiver(conductor_id, kDistributedCliRegistrationMessage);
bus.RegisterClientAsSender(conductor_id, kDistributedCliRegistrationResponseMessage);
bus.RegisterClientAsReceiver(conductor_id, kSqlQueryMessage);
bus.RegisterClientAsSender(conductor_id, kQueryExecutionSuccessMessage);
bus.RegisterClientAsSender(conductor_id, kQueryExecutionErrorMessage);
quickstep::ForemanDistributed foreman(&bus, query_processor->getDefaultDatabase());
for (;;) {
AnnotatedMessage annotated_message(bus.Receive(conductor_id, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const client_id sender = annotated_message.sender;
LOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
<< "' message from client " << sender;
switch (tagged_message.message_type()) {
case kDistributedCliRegistrationMessage: {
quickstep::serialization::EmptyMessage proto;
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kDistributedCliRegistrationResponseMessage);
free(proto_bytes);
LOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
<< kDistributedCliRegistrationResponseMessage
<< "') to Distributed CLI " << sender;
QueryExecutionUtil::SendTMBMessage(&bus,
conductor_id,
sender,
move(message));
break;
}
case kSqlQueryMessage: {
quickstep::serialization::SqlQueryMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
LOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
string *command_string = new string(move(proto.sql_query()));
parser_wrapper.feedNextBuffer(command_string);
quickstep::ParseResult parse_result = parser_wrapper.getNextStatement();
unique_ptr<quickstep::QueryHandle> query_handle;
try {
query_handle.reset(query_processor->generateQueryHandle(*parse_result.parsed_statement));
} catch (const quickstep::SqlError &sql_error) {
// Set the query execution status along with the error message.
quickstep::serialization::QueryExecutionErrorMessage proto;
proto.set_error_message(sql_error.formatMessage(*command_string));
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kQueryExecutionErrorMessage);
free(proto_bytes);
LOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
<< kQueryExecutionErrorMessage
<< "') to Distributed CLI " << sender;
QueryExecutionUtil::SendTMBMessage(&bus,
conductor_id,
sender,
move(message));
break;
}
try {
foreman.setQueryHandle(query_handle.get());
foreman.start();
foreman.join();
quickstep::serialization::QueryExecutionSuccessMessage proto;
const quickstep::CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
if (query_result_relation) {
proto.mutable_query_result_relation()->MergeFrom(query_result_relation->getProto());
}
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kQueryExecutionSuccessMessage);
free(proto_bytes);
LOG(INFO) << "Conductor sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
<< "') to Distributed CLI " << sender;
QueryExecutionUtil::SendTMBMessage(&bus,
conductor_id,
sender,
move(message));
if (query_result_relation) {
const quickstep::relation_id query_result_relation_id = query_result_relation->getID();
DCHECK(query_processor->getDefaultDatabase()->hasRelationWithId(query_result_relation_id));
query_processor->getDefaultDatabase()->dropRelationById(query_result_relation_id);
}
query_processor->saveCatalog();
} catch (const std::exception &e) {
string error_message("QUERY EXECUTION ERROR: ");
error_message.append(e.what());
error_message.push_back('\n');
quickstep::serialization::QueryExecutionErrorMessage proto;
proto.set_error_message(error_message);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kQueryExecutionErrorMessage);
free(proto_bytes);
LOG(INFO) << "Conductor (on behalf of Executor) sent QueryExecutionErrorMessage (typed '"
<< kQueryExecutionErrorMessage
<< "') to Distributed CLI " << sender;
QueryExecutionUtil::SendTMBMessage(&bus,
conductor_id,
sender,
move(message));
}
break;
}
default: {
LOG(FATAL) << "Unknown TMB message type";
}
}
}
return 0;
}