blob: 8b55872017335c05ab211cfddd6495da7fc60e88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MergeTreeRelParser.h"
#include <Core/Settings.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Operator/FillingDeltaInternalRowDeletedStep.h>
#include <Parser/ExpressionParser.h>
#include <Parser/FunctionParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <google/protobuf/wrappers.pb.h>
#include <Poco/StringTokenizer.h>
#include <Common/BlockTypeUtils.h>
#include <Common/GlutenSettings.h>
#include <Common/PlanUtil.h>
namespace DB
{
namespace Setting
{
extern const SettingsUInt64 max_block_size;
}
namespace ErrorCodes
{
extern const int NO_SUCH_DATA_PART;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FUNCTION;
extern const int UNKNOWN_TYPE;
}
}
namespace local_engine
{
using namespace DB;
void replaceFilePathNodeCommon(
const String & alias_name, DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
{
auto concat_func = DB::FunctionFactory::instance().get("concat", context);
DB::ActionsDAG::NodeRawConstPtrs args;
const auto string_type = std::make_shared<DB::DataTypeString>();
const auto * path_node = &actions_dag.addColumn(
DB::ColumnWithTypeAndName(string_type->createColumnConst(1, merge_tree_table.absolute_path + "/"), string_type, "path"));
args.emplace_back(path_node);
const auto & part_name = actions_dag.findInOutputs(MergeTreeRelParser::VIRTUAL_COLUMN_PART);
args.emplace_back(&part_name);
actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func, args, alias_name));
}
void replaceInputFileNameNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
{
replaceFilePathNodeCommon(FileMetaColumns::INPUT_FILE_NAME, actions_dag, merge_tree_table, context);
}
void replaceFilePathNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
{
replaceFilePathNodeCommon(FileMetaColumns::FILE_PATH, actions_dag, merge_tree_table, context);
}
void replaceFileNameNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance &, DB::ContextPtr)
{
const auto & part_name = actions_dag.findInOutputs(MergeTreeRelParser::VIRTUAL_COLUMN_PART);
const auto & alias = actions_dag.addAlias(part_name, FileMetaColumns::FILE_NAME);
actions_dag.addOrReplaceInOutputs(alias);
}
void replaceFileSizeNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
{
const auto int64_type = std::make_shared<DB::DataTypeInt64>();
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::FILE_SIZE)));
}
void replaceFileModificationTimeNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
{
const auto decimal64_type = std::make_shared<DB::DataTypeDateTime64>(6);
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(decimal64_type->createColumnConst(1, DecimalField<DateTime64>(0, 6)), decimal64_type, FileMetaColumns::FILE_MODIFICATION_TIME)));
}
void replaceDeltaInternalRowDeletedNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
{
const auto data_type = std::make_shared<DB::DataTypeNullable>(std::make_shared<DB::DataTypeInt8>());
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(data_type->createColumn(), data_type, DeltaVirtualMeta::DELTA_INTERNAL_IS_ROW_DELETED)));
}
void replaceFileBlockStartNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance &, DB::ContextPtr)
{
const auto int64_type = std::make_shared<DB::DataTypeInt64>();
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::FILE_BLOCK_START)));
}
void replaceFileBlockLengthNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance &, DB::ContextPtr)
{
const auto int64_type = std::make_shared<DB::DataTypeInt64>();
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::FILE_BLOCK_LENGTH)));
}
void replaceInputFileBlockStartNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance &, DB::ContextPtr)
{
const auto int64_type = std::make_shared<DB::DataTypeInt64>();
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
}
void replaceInputFileBlockLengthNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance &, DB::ContextPtr)
{
const auto int64_type = std::make_shared<DB::DataTypeInt64>();
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
}
void replaceTmpRowIndexNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance &, DB::ContextPtr)
{
const auto & tmp_metadata_row_index = actions_dag.findInOutputs(DB::BlockOffsetColumn::name);
const auto & alias = actions_dag.addAlias(tmp_metadata_row_index, ParquetVirtualMeta::TMP_ROWINDEX);
actions_dag.addOrReplaceInOutputs(alias);
}
/// Find minimal position of the column in primary key.
static Int64 findMinPosition(const NameSet & condition_table_columns, const NameToIndexMap & primary_key_positions)
{
Int64 min_position = std::numeric_limits<Int64>::max() - 1;
for (const auto & column : condition_table_columns)
{
auto it = primary_key_positions.find(column);
if (it != primary_key_positions.end())
min_position = std::min(min_position, static_cast<Int64>(it->second));
}
return min_position;
}
/// Initialize DELTA_META_COLUMN_MAP only upon its first use to avoid static initialization order fiasco.
const std::unordered_map<String, std::tuple<std::optional<String>, DB::DataTypePtr, ReplaceDeltaNodeFunc>> & getDeltaMetaColumnMap()
{
static const std::unordered_map<String, std::tuple<std::optional<String>, DB::DataTypePtr, ReplaceDeltaNodeFunc>> DELTA_META_COLUMN_MAP
= {{FileMetaColumns::INPUT_FILE_NAME, std::tuple("_part", std::make_shared<DB::DataTypeString>(), replaceInputFileNameNode)},
{FileMetaColumns::INPUT_FILE_BLOCK_START,
std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), replaceInputFileBlockStartNode)},
{FileMetaColumns::INPUT_FILE_BLOCK_LENGTH,
std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), replaceInputFileBlockLengthNode)},
{ParquetVirtualMeta::TMP_ROWINDEX,
std::tuple(DB::BlockOffsetColumn::name, std::make_shared<DB::DataTypeUInt64>(), replaceTmpRowIndexNode)},
{FileMetaColumns::FILE_PATH, std::tuple("_part", std::make_shared<DB::DataTypeString>(), replaceFilePathNode)},
{FileMetaColumns::FILE_NAME, std::tuple("_part", std::make_shared<DB::DataTypeString>(), replaceFileNameNode)},
{FileMetaColumns::FILE_BLOCK_START,
std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), replaceFileBlockStartNode)},
{FileMetaColumns::FILE_BLOCK_LENGTH,
std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), replaceFileBlockLengthNode)},
{FileMetaColumns::FILE_SIZE, std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), replaceFileSizeNode)},
{FileMetaColumns::FILE_MODIFICATION_TIME, std::tuple(std::nullopt, std::make_shared<DB::DataTypeDateTime64>(6), replaceFileModificationTimeNode)},
{DeltaVirtualMeta::DELTA_INTERNAL_IS_ROW_DELETED,
std::tuple("_part", std::make_shared<DB::DataTypeNullable>(std::make_shared<DB::DataTypeInt8>()), replaceDeltaInternalRowDeletedNode)} // make sure there is a '_part' column
};
return DELTA_META_COLUMN_MAP;
}
DB::Block MergeTreeRelParser::parseMergeTreeOutput(const substrait::ReadRel & rel, SparkStorageMergeTreePtr storage)
{
if (rel.has_base_schema() && rel.base_schema().names_size())
return TypeParser::buildBlockFromNamedStruct(rel.base_schema());
NamesAndTypesList one_column_name_type;
one_column_name_type.push_back(storage->getInMemoryMetadataPtr()->getColumns().getAll().front());
LOG_DEBUG(getLogger("SerializedPlanParser"), "Try to read ({}) instead of empty header", one_column_name_type.front().dump());
return toSampleBlock(one_column_name_type);
}
DB::Block MergeTreeRelParser::replaceDeltaNameIfNeeded(const DB::Block & output)
{
DB::ColumnsWithTypeAndName read_block;
NameSet names;
for (const auto & column : output)
{
if (getDeltaMetaColumnMap().contains(column.name))
{
if (auto tuple = getDeltaMetaColumnMap().at(column.name); std::get<0>(tuple).has_value())
{
if (!names.contains(std::get<0>(tuple).value()))
{
read_block.emplace_back(ColumnWithTypeAndName(std::get<1>(tuple), std::get<0>(tuple).value()));
names.insert(std::get<0>(tuple).value());
}
}
}
else
{
read_block.emplace_back(column);
names.insert(column.name);
}
}
return DB::Block(std::move(read_block));
}
void MergeTreeRelParser::recoverDeltaNameIfNeeded(
DB::QueryPlan & plan, const DB::Block & output, const MergeTreeTableInstance & merge_tree_table)
{
const auto & header = *plan.getCurrentHeader();
DB::ActionsDAG actions_dag(header.getNamesAndTypesList());
// Use 'Names' to make sure the orders of the output
Names names;
names.reserve(output.getColumns().size());
bool need_recover = false;
for (const auto & column : output)
{
if (getDeltaMetaColumnMap().contains(column.name))
{
need_recover = true;
auto tuple = getDeltaMetaColumnMap().at(column.name);
ReplaceDeltaNodeFunc func = std::get<2>(tuple);
func(actions_dag, merge_tree_table, context);
}
names.push_back(column.name);
}
if (!need_recover)
return;
actions_dag.removeUnusedActions(names);
auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), std::move(actions_dag));
step->setStepDescription("Recover virtual columns");
steps.emplace_back(step.get());
plan.addStep(std::move(step));
}
void MergeTreeRelParser::replaceNodeWithCaseSensitive(DB::Block & read_block, SparkStorageMergeTreePtr storage)
{
// case_insensitive_matching
if (spark_sql_config.caseSensitive)
return;
auto all = storage->getInMemoryMetadataPtr()->getColumns().getNamesOfPhysical();
std::ranges::for_each(
read_block,
[&all](ColumnWithTypeAndName & column)
{
const auto found
= std::ranges::find_if(all, [&column](const auto & name) -> bool { return boost::iequals(column.name, name); });
if (found != all.end())
column.name = *found;
});
}
void MergeTreeRelParser::recoverNodeWithCaseSensitive(DB::QueryPlan & query_plan, const DB::Block & output)
{
if (spark_sql_config.caseSensitive)
return;
const auto & read_Header = *query_plan.getCurrentHeader();
NameToNameMap names;
names.reserve(output.columns());
for (const auto & elem : output.getColumnsWithTypeAndName())
names[Poco::toLower(elem.name)] = elem.name;
DB::NamesWithAliases aliases;
aliases.reserve(read_Header.columns());
bool need_alias = false;
for (const auto & elem : read_Header)
{
if (auto lower_name = Poco::toLower(elem.name); names.contains(lower_name))
{
if (!need_alias && !boost::equals(elem.name, names[lower_name]))
need_alias = true;
aliases.emplace_back(DB::NameWithAlias(elem.name, names[lower_name]));
}
else
{
aliases.emplace_back(DB::NameWithAlias(elem.name, elem.name));
}
}
if (!need_alias)
return;
DB::ActionsDAG actions_dag{blockToRowType(*query_plan.getCurrentHeader())};
actions_dag.project(aliases);
auto expression_step = std::make_unique<DB::ExpressionStep>(query_plan.getCurrentHeader(), std::move(actions_dag));
expression_step->setStepDescription("Rename MergeTree Output(Cause: case sensitive)");
steps.emplace_back(expression_step.get());
query_plan.addStep(std::move(expression_step));
}
DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const substrait::ReadRel::ExtensionTable & extension_table)
{
MergeTreeTableInstance merge_tree_table(extension_table);
// ignore snapshot id for a query
merge_tree_table.snapshot_id = "";
auto storage = merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
const DB::Block output = parseMergeTreeOutput(rel, storage);
const bool has_delta_internal_is_row_deleted = DeltaVirtualMeta::hasMetaColumns(output);
DB::Block read_block = replaceDeltaNameIfNeeded(output);
replaceNodeWithCaseSensitive(read_block, storage);
std::vector<DataPartPtr> selected_parts = StorageMergeTreeFactory::getDataPartsByNames(
storage->getStorageID(), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());
for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;
auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
auto names_and_types_list = read_block.getNamesAndTypesList();
auto query_info = buildQueryInfo(names_and_types_list);
std::set<String> non_nullable_columns;
if (rel.has_filter())
{
NonNullableColumnsResolver non_nullable_columns_resolver(read_block, parser_context, rel.filter());
non_nullable_columns = non_nullable_columns_resolver.resolve();
query_info->prewhere_info = parsePreWhereInfo(rel.filter(), read_block);
}
auto read_step = storage->reader.readFromParts(
RangesInDataParts({selected_parts}),
storage->getMutationsSnapshot({}),
names_and_types_list.getNames(),
storage_snapshot,
*query_info,
context,
context->getSettingsRef()[Setting::max_block_size],
1);
auto * source_step_with_filter = static_cast<SourceStepWithFilterBase *>(read_step.get());
if (const auto & storage_preWhere_info = query_info->prewhere_info)
{
source_step_with_filter->addFilter(storage_preWhere_info->prewhere_actions.clone(), storage_preWhere_info->prewhere_column_name);
source_step_with_filter->applyFilters();
}
auto ranges = merge_tree_table.extractRange(selected_parts);
if (settingsEqual(context->getSettingsRef(), "enabled_driver_filter_mergetree_index", "true"))
SparkStorageMergeTree::analysisPartsByRanges(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
else
SparkStorageMergeTree::wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
if (!non_nullable_columns.empty())
{
const auto & input_header = *query_plan->getCurrentHeader();
std::erase_if(non_nullable_columns, [input_header](auto item) -> bool { return !input_header.has(item); });
if (auto * remove_null_step = PlanUtil::addRemoveNullableStep(*query_plan, parser_context->queryContext(), non_nullable_columns))
steps.emplace_back(remove_null_step);
}
recoverNodeWithCaseSensitive(*query_plan, output);
recoverDeltaNameIfNeeded(*query_plan, output, merge_tree_table);
// set '_delta_internal_is_row_deleted' values
if (has_delta_internal_is_row_deleted)
{
auto filling_row_deleted_step = std::make_unique<FillingDeltaInternalRowDeletedStep>(query_plan->getCurrentHeader(), merge_tree_table, context);
filling_row_deleted_step->setStepDescription("FillingDeltaInternalRowDeleted");
query_plan->addStep(std::move(filling_row_deleted_step));
}
return query_plan;
}
PrewhereInfoPtr MergeTreeRelParser::parsePreWhereInfo(const substrait::Expression & rel, const Block & input)
{
std::string filter_name;
auto prewhere_info = std::make_shared<PrewhereInfo>();
prewhere_info->prewhere_actions = optimizePrewhereAction(rel, filter_name, input);
prewhere_info->prewhere_column_name = filter_name;
prewhere_info->need_filter = true;
prewhere_info->remove_prewhere_column = true;
for (const auto & name : input.getNames())
prewhere_info->prewhere_actions.tryRestoreColumn(name);
return prewhere_info;
}
DB::ActionsDAG MergeTreeRelParser::optimizePrewhereAction(const substrait::Expression & rel, std::string & filter_name, const Block & block)
{
Conditions res;
std::set<Int64> pk_positions;
analyzeExpressions(res, rel, pk_positions, block);
Int64 min_valid_pk_pos = -1;
for (auto pk_pos : pk_positions)
{
if (pk_pos != min_valid_pk_pos + 1)
break;
min_valid_pk_pos = pk_pos;
}
// TODO need to test
for (auto & cond : res)
if (cond.min_position_in_primary_key > min_valid_pk_pos)
cond.min_position_in_primary_key = std::numeric_limits<Int64>::max() - 1;
// filter less size column first
res.sort();
ActionsDAG filter_action{block.getNamesAndTypesList()};
if (res.size() == 1)
{
parseToAction(filter_action, res.back().node, filter_name);
}
else
{
DB::ActionsDAG::NodeRawConstPtrs args;
for (const Condition & cond : res)
{
String ignore;
parseToAction(filter_action, cond.node, ignore);
args.emplace_back(&filter_action.getNodes().back());
}
auto function_builder = FunctionFactory::instance().get("and", context);
std::string args_name = join(args, ',');
filter_name = +"and(" + args_name + ")";
const auto * and_function = &filter_action.addFunction(function_builder, args, filter_name);
filter_action.addOrReplaceInOutputs(*and_function);
}
filter_action.removeUnusedActions(Names{filter_name}, false, true);
return filter_action;
}
void MergeTreeRelParser::parseToAction(ActionsDAG & filter_action, const substrait::Expression & rel, std::string & filter_name) const
{
if (rel.has_scalar_function())
{
const auto * node = expression_parser->parseFunction(rel.scalar_function(), filter_action, true);
filter_name = node->result_name;
}
else
{
const auto * in_node = parseExpression(filter_action, rel);
filter_action.addOrReplaceInOutputs(*in_node);
filter_name = in_node->result_name;
}
}
void MergeTreeRelParser::analyzeExpressions(
Conditions & res, const substrait::Expression & rel, std::set<Int64> & pk_positions, const Block & block)
{
if (rel.has_scalar_function() && getCHFunctionName(rel.scalar_function()) == "and")
{
int arguments_size = rel.scalar_function().arguments_size();
for (int i = 0; i < arguments_size; ++i)
{
auto argument = rel.scalar_function().arguments(i);
analyzeExpressions(res, argument.value(), pk_positions, block);
}
}
else
{
Condition cond(rel);
collectColumns(rel, cond.table_columns, block);
cond.columns_size = getColumnsSize(cond.table_columns);
// TODO: get primary_key_names
const NameToIndexMap primary_key_names_positions;
cond.min_position_in_primary_key = findMinPosition(cond.table_columns, primary_key_names_positions);
pk_positions.emplace(cond.min_position_in_primary_key);
res.emplace_back(std::move(cond));
}
}
UInt64 MergeTreeRelParser::getColumnsSize(const NameSet & columns)
{
UInt64 size = 0;
for (const auto & column : columns)
if (column_sizes.contains(column))
size += column_sizes[column];
return size;
}
void MergeTreeRelParser::collectColumns(const substrait::Expression & rel, NameSet & columns, const Block & block)
{
switch (rel.rex_type_case())
{
case substrait::Expression::RexTypeCase::kLiteral: {
return;
}
case substrait::Expression::RexTypeCase::kSelection: {
auto idx = SubstraitParserUtils::getStructFieldIndex(rel);
if (!idx)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Selection node must have direct reference.");
if (const Names names = block.getNames(); names.size() > *idx)
columns.insert(names[*idx]);
return;
}
case substrait::Expression::RexTypeCase::kCast: {
const auto & input = rel.cast().input();
collectColumns(input, columns, block);
return;
}
case substrait::Expression::RexTypeCase::kIfThen: {
const auto & if_then = rel.if_then();
auto condition_nums = if_then.ifs_size();
for (int i = 0; i < condition_nums; ++i)
{
const auto & ifs = if_then.ifs(i);
collectColumns(ifs.if_(), columns, block);
collectColumns(ifs.then(), columns, block);
}
return;
}
case substrait::Expression::RexTypeCase::kScalarFunction: {
for (const auto & arg : rel.scalar_function().arguments())
collectColumns(arg.value(), columns, block);
return;
}
case substrait::Expression::RexTypeCase::kSingularOrList: {
const auto & options = rel.singular_or_list().options();
/// options is empty always return false
if (options.empty())
return;
collectColumns(rel.singular_or_list().value(), columns, block);
return;
}
default:
throw Exception(
ErrorCodes::UNKNOWN_TYPE,
"Unsupported spark expression type {} : {}",
magic_enum::enum_name(rel.rex_type_case()),
rel.DebugString());
}
}
String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) const
{
return expression_parser->getFunctionName(substrait_func);
}
String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_rel)
{
MergeTreeTableInstance merge_tree_table(read_rel.advanced_extension().enhancement());
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
auto storage = merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
auto names_and_types_list = input.getNamesAndTypesList();
auto query_info = buildQueryInfo(names_and_types_list);
query_info->prewhere_info = parsePreWhereInfo(read_rel.filter(), input);
std::vector<DataPartPtr> selected_parts = StorageMergeTreeFactory::getDataPartsByNames(
StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());
auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
if (selected_parts.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
auto read_step = storage->reader.readFromParts(
RangesInDataParts({selected_parts}),
/* alter_conversions = */
storage->getMutationsSnapshot({}),
names_and_types_list.getNames(),
storage_snapshot,
*query_info,
context,
context->getSettingsRef()[Setting::max_block_size],
10); // TODO: Expect use driver cores.
auto * read_from_mergetree = static_cast<ReadFromMergeTree *>(read_step.get());
if (const auto & storage_prewhere_info = query_info->prewhere_info)
{
ActionDAGNodes filter_nodes;
filter_nodes.nodes.emplace_back(
&storage_prewhere_info->prewhere_actions.findInOutputs(storage_prewhere_info->prewhere_column_name));
read_from_mergetree->applyFilters(std::move(filter_nodes));
}
auto analysis = read_from_mergetree->getAnalysisResult();
rapidjson::StringBuffer result;
rapidjson::Writer<rapidjson::StringBuffer> writer(result);
writer.StartArray();
for (auto & parts_with_range : analysis.parts_with_ranges)
{
MarkRanges final_ranges;
for (auto & range : parts_with_range.ranges)
{
writer.StartObject();
writer.Key("part_name");
writer.String(parts_with_range.data_part->name.c_str());
writer.Key("begin");
writer.Uint(range.begin);
writer.Key("end");
writer.Uint(range.end);
writer.EndObject();
}
}
writer.EndArray();
return result.GetString();
}
}