blob: feafec1d0de2b61ba96a7dfaa42063853006032f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "cli/CommandExecutor.hpp"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <vector>
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "parser/ParseString.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "types/Type.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "utility/PtrVector.hpp"
#include "utility/SqlError.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
using std::fprintf;
using std::fputc;
using std::fputs;
using std::size_t;
using std::string;
using std::vector;
namespace tmb { class MessageBus; }
namespace quickstep {
namespace cli {
namespace {
namespace C = ::quickstep::cli;
void executeDescribeDatabase(
const PtrVector<ParseString> *arguments,
const CatalogDatabase &catalog_database,
StorageManager *storage_manager,
FILE *out) {
// Column width initialized to 6 to take into account the header name
// and the column value table
int max_column_width = C::kInitMaxColumnWidth;
vector<std::size_t> num_blocks;
const CatalogRelation *relation = nullptr;
if (arguments->size() == 0) {
for (const CatalogRelation &rel : catalog_database) {
max_column_width =
std::max(static_cast<int>(rel.getName().length()), max_column_width);
num_blocks.push_back(rel.size_blocks());
}
} else {
const ParseString &table_name = arguments->front();
const std::string &table_name_val = table_name.value();
relation = catalog_database.getRelationByName(table_name_val);
if (relation == nullptr) {
THROW_SQL_ERROR_AT(&(arguments->front())) << " Unrecognized relation " << table_name_val;
}
max_column_width = std::max(static_cast<int>(relation->getName().length()),
max_column_width);
num_blocks.push_back(relation->size_blocks());
}
// Only if we have relations work on the printing logic.
if (catalog_database.size() > 0) {
const std::size_t max_num_blocks = *std::max_element(num_blocks.begin(), num_blocks.end());
const int max_num_blocks_digits = std::max(PrintToScreen::GetNumberOfDigits(max_num_blocks),
C::kInitMaxColumnWidth+2);
vector<int> column_widths;
column_widths.push_back(max_column_width +1);
column_widths.push_back(C::kInitMaxColumnWidth + 1);
column_widths.push_back(max_num_blocks_digits + 1);
fputs(" List of relations\n\n", out);
fprintf(out, "%-*s |", max_column_width+1, " Name");
fprintf(out, "%-*s |", C::kInitMaxColumnWidth, " Type");
fprintf(out, "%-*s\n", max_num_blocks_digits, " Blocks");
PrintToScreen::printHBar(column_widths, out);
// If there are no argument print the entire list of tables
// else print the particular table only.
vector<std::size_t>::const_iterator num_blocks_it = num_blocks.begin();
if (arguments->size() == 0) {
for (const CatalogRelation &rel : catalog_database) {
fprintf(out, " %-*s |", max_column_width, rel.getName().c_str());
fprintf(out, " %-*s |", C::kInitMaxColumnWidth - 1, "table");
fprintf(out, " %-*lu\n", max_num_blocks_digits - 1, *num_blocks_it);
++num_blocks_it;
}
} else {
fprintf(out, " %-*s |", max_column_width, relation->getName().c_str());
fprintf(out, " %-*s |", C::kInitMaxColumnWidth -1, "table");
fprintf(out, " %-*lu\n", max_num_blocks_digits - 1, *num_blocks_it);
++num_blocks_it;
}
fputc('\n', out);
}
}
void executeDescribeTable(
const PtrVector<ParseString> *arguments,
const CatalogDatabase &catalog_database, FILE *out) {
const ParseString &table_name = arguments->front();
const std::string &table_name_val = table_name.value();
const CatalogRelation *relation =
catalog_database.getRelationByName(table_name_val);
if (relation == nullptr) {
THROW_SQL_ERROR_AT(&(arguments->front())) << " Unrecognized relation " << table_name_val;
}
vector<int> column_widths;
int max_attr_column_width = C::kInitMaxColumnWidth;
int max_type_column_width = C::kInitMaxColumnWidth;
for (const CatalogAttribute &attr : *relation) {
// Printed column needs to be wide enough to print:
// 1. The attribute name (in the printed "header").
// 2. Any value of the attribute's Type.
max_attr_column_width =
std::max(max_attr_column_width,
static_cast<int>(attr.getDisplayName().length()));
max_type_column_width =
std::max(max_type_column_width,
static_cast<int>(attr.getType().getName().length()));
}
// Add room for one extra character to allow spacing between the column ending and the vertical bar
column_widths.push_back(max_attr_column_width+1);
column_widths.push_back(max_type_column_width+1);
fprintf(out, "%*s \"%s\"\n", C::kInitMaxColumnWidth, "Table", table_name_val.c_str());
fprintf(out, "%-*s |", max_attr_column_width+1, " Column");
fprintf(out, "%-*s\n", max_type_column_width+1, " Type");
PrintToScreen::printHBar(column_widths, out);
for (const CatalogAttribute &attr : *relation) {
fprintf(out, " %-*s |", max_attr_column_width,
attr.getDisplayName().c_str());
fprintf(out, " %-*s\n", max_type_column_width,
attr.getType().getName().c_str());
}
// TODO(rogers): Add handlers for partitioning information.
if (relation->hasIndexScheme()) {
fprintf(out, "%*s\n", C::kInitMaxColumnWidth+2, " Indexes");
const quickstep::IndexScheme &index_scheme = relation->getIndexScheme();
for (auto index_it = index_scheme.begin(); index_it != index_scheme.end();
++index_it) {
fprintf(out, " \"%-*s\" %s", static_cast<int>(index_it->first.length()),
index_it->first.c_str(),
index_it->second.IndexSubBlockType_Name(
index_it->second.sub_block_type()).c_str());
fputc(' ', out);
fputc('(', out);
fprintf(out, "%s", relation->getAttributeById(index_it->second.indexed_attribute_ids(0))
->getDisplayName().c_str());
for (std::size_t i = 1; i < static_cast<std::size_t>(index_it->second.indexed_attribute_ids_size()); ++i) {
const char *attribute_display_name = relation->getAttributeById(
index_it->second.indexed_attribute_ids(i))
->getDisplayName().c_str();
fprintf(out, ", %s", attribute_display_name);
}
fputc(')', out);
fputc('\n', out);
}
}
}
inline std::vector<TypedValue> executeQueryForSingleRow(
const tmb::client_id main_thread_client_id,
const tmb::client_id foreman_client_id,
const std::string &query_string,
tmb::MessageBus *bus,
StorageManager *storage_manager,
QueryProcessor *query_processor,
SqlParserWrapper *parser_wrapper) {
parser_wrapper->feedNextBuffer(new std::string(query_string));
ParseResult result = parser_wrapper->getNextStatement();
DCHECK(result.condition == ParseResult::kSuccess);
const ParseStatement &statement = *result.parsed_statement;
// Generate the query plan.
std::unique_ptr<QueryHandle> query_handle(
std::make_unique<QueryHandle>(query_processor->query_id(),
main_thread_client_id,
statement.getPriority()));
query_processor->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
// Use foreman to execute the query plan.
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id, foreman_client_id, query_handle.get(), bus);
QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id, bus);
// Retrieve the scalar result from the result relation.
const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
DCHECK(query_result_relation != nullptr);
std::vector<TypedValue> values;
{
std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
DCHECK_EQ(1u, blocks.size());
BlockReference block = storage_manager->getBlock(blocks[0], *query_result_relation);
const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
DCHECK_EQ(1, tuple_store.numTuples());
const std::size_t num_columns = tuple_store.getRelation().size();
if (tuple_store.isPacked()) {
for (std::size_t i = 0; i < num_columns; ++i) {
values.emplace_back(tuple_store.getAttributeValueTyped(0, i));
values[i].ensureNotReference();
}
} else {
std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
for (std::size_t i = 0; i < num_columns; ++i) {
values.emplace_back(
tuple_store.getAttributeValueTyped(*existence_map->begin(), i));
values[i].ensureNotReference();
}
}
}
// Drop the result relation.
DropRelation::Drop(*query_result_relation,
query_processor->getDefaultDatabase(),
query_processor->getStorageManager());
return values;
}
/**
* @brief A helper function that executes a SQL query to obtain a scalar result.
*/
inline TypedValue executeQueryForSingleResult(
const tmb::client_id main_thread_client_id,
const tmb::client_id foreman_client_id,
const std::string &query_string,
tmb::MessageBus *bus,
StorageManager *storage_manager,
QueryProcessor *query_processor,
SqlParserWrapper *parser_wrapper) {
std::vector<TypedValue> results =
executeQueryForSingleRow(
main_thread_client_id,
foreman_client_id,
query_string,
bus,
storage_manager,
query_processor,
parser_wrapper);
DCHECK_EQ(1u, results.size());
return results[0];
}
void executeAnalyze(const PtrVector<ParseString> *arguments,
const tmb::client_id main_thread_client_id,
const tmb::client_id foreman_client_id,
MessageBus *bus,
QueryProcessor *query_processor,
FILE *out) {
const CatalogDatabase &database = *query_processor->getDefaultDatabase();
StorageManager *storage_manager = query_processor->getStorageManager();
std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
std::vector<std::reference_wrapper<const CatalogRelation>> relations;
if (arguments->size() == 0) {
relations.insert(relations.begin(), database.begin(), database.end());
} else {
for (const auto &rel_name : *arguments) {
const CatalogRelation *rel = database.getRelationByName(rel_name.value());
if (rel == nullptr) {
THROW_SQL_ERROR_AT(&rel_name) << "Table does not exist";
} else {
relations.emplace_back(*rel);
}
}
}
// Analyze each relation in the database.
for (const CatalogRelation &relation : relations) {
fprintf(out, "Analyzing %s ... ", relation.getName().c_str());
fflush(out);
CatalogRelation *mutable_relation =
query_processor->getDefaultDatabase()->getRelationByIdMutable(relation.getID());
// Get the number of distinct values for each column.
for (const CatalogAttribute &attribute : relation) {
const Type &attr_type = attribute.getType();
bool is_min_max_applicable =
(attr_type.getSuperTypeID() == Type::SuperTypeID::kNumeric);
std::string query_string = "SELECT COUNT(DISTINCT ";
query_string.append(attribute.getName());
query_string.append(")");
if (is_min_max_applicable) {
query_string.append(", MIN(");
query_string.append(attribute.getName());
query_string.append("), MAX(");
query_string.append(attribute.getName());
query_string.append(")");
}
query_string.append(" FROM ");
query_string.append(relation.getName());
query_string.append(";");
std::vector<TypedValue> results =
executeQueryForSingleRow(main_thread_client_id,
foreman_client_id,
query_string,
bus,
storage_manager,
query_processor,
parser_wrapper.get());
auto *stat = mutable_relation->getStatisticsMutable();
const attribute_id attr_id = attribute.getID();
DCHECK(results[0].getTypeID() == TypeID::kLong);
stat->setNumDistinctValues(attr_id,
results[0].getLiteral<std::int64_t>());
if (is_min_max_applicable) {
DCHECK_GE(results.size(), 3u);
DCHECK(results[1].getTypeID() == attr_type.getTypeID());
DCHECK(results[2].getTypeID() == attr_type.getTypeID());
stat->setMinValue(attr_id, results[1]);
stat->setMaxValue(attr_id, results[2]);
}
}
// Get the number of tuples for the relation.
std::string query_string = "SELECT COUNT(*) FROM ";
query_string.append(relation.getName());
query_string.append(";");
TypedValue num_tuples =
executeQueryForSingleResult(main_thread_client_id,
foreman_client_id,
query_string,
bus,
storage_manager,
query_processor,
parser_wrapper.get());
DCHECK(num_tuples.getTypeID() == TypeID::kLong);
mutable_relation->getStatisticsMutable()->setNumTuples(
num_tuples.getLiteral<std::int64_t>());
fprintf(out, "done\n");
fflush(out);
}
query_processor->markCatalogAltered();
query_processor->saveCatalog();
}
} // namespace
void executeCommand(const ParseStatement &statement,
const CatalogDatabase &catalog_database,
const tmb::client_id main_thread_client_id,
const tmb::client_id foreman_client_id,
MessageBus *bus,
StorageManager *storage_manager,
QueryProcessor *query_processor,
FILE *out) {
const ParseCommand &command = static_cast<const ParseCommand &>(statement);
const PtrVector<ParseString> *arguments = command.arguments();
const std::string &command_str = command.command()->value();
if (command_str == C::kDescribeDatabaseCommand) {
executeDescribeDatabase(arguments, catalog_database, storage_manager, out);
} else if (command_str == C::kDescribeTableCommand) {
if (arguments->size() == 0) {
executeDescribeDatabase(arguments, catalog_database, storage_manager, out);
} else {
executeDescribeTable(arguments, catalog_database, out);
}
} else if (command_str == C::kAnalyzeCommand) {
executeAnalyze(arguments,
main_thread_client_id,
foreman_client_id,
bus,
query_processor, out);
} else {
THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
}
}
} // namespace cli
} // namespace quickstep