blob: 63705a0a9350c1f0e8af4a38e56f9bbd169e8cfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SparkMergeTreeMeta.h"
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/ExpressionActions.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <google/protobuf/util/json_util.h>
#include <rapidjson/document.h>
#include <write_optimization.pb.h>
#include <Poco/StringTokenizer.h>
#include <Common/DebugUtils.h>
using namespace DB;
using namespace local_engine;
namespace
{
// set skip index for each column if specified
void setSecondaryIndex(
const DB::NamesAndTypesList & columns,
const ContextPtr & context,
const MergeTreeTable & table,
std::shared_ptr<DB::StorageInMemoryMetadata> metadata)
{
std::unordered_set<std::string> minmax_index_cols;
std::unordered_set<std::string> bf_index_cols;
std::unordered_set<std::string> set_index_cols;
{
Poco::StringTokenizer tokenizer(table.minmax_index_key, ",");
for (const auto & token : tokenizer)
minmax_index_cols.insert(token);
}
{
Poco::StringTokenizer tokenizer(table.bf_index_key, ",");
for (const auto & token : tokenizer)
bf_index_cols.insert(token);
}
{
Poco::StringTokenizer tokenizer(table.set_index_key, ",");
for (const auto & token : tokenizer)
set_index_cols.insert(token);
}
std::stringstream ss;
bool first = true;
for (const auto & column : columns)
{
if (minmax_index_cols.contains(column.name))
{
if (!first)
ss << ", ";
else
first = false;
ss << "_minmax_" << column.name << " " << column.name << " TYPE minmax GRANULARITY 1";
}
if (bf_index_cols.contains(column.name))
{
if (!first)
ss << ", ";
else
first = false;
ss << "_bloomfilter_" << column.name << " " << column.name << " TYPE bloom_filter GRANULARITY 1";
}
if (set_index_cols.contains(column.name))
{
if (!first)
ss << ", ";
else
first = false;
ss << "_set_" << column.name << " " << column.name << " TYPE set(0) GRANULARITY 1";
}
}
metadata->setSecondaryIndices(IndicesDescription::parse(ss.str(), metadata->getColumns(), context));
}
void parseTableConfig(MergeTreeTableSettings & settings, const String & config_json)
{
rapidjson::Document doc;
doc.Parse(config_json.c_str());
if (doc.HasMember("storage_policy"))
settings.storage_policy = doc["storage_policy"].GetString();
}
std::shared_ptr<DB::StorageInMemoryMetadata>
doBuildMetadata(const DB::NamesAndTypesList & columns, const ContextPtr & context, const MergeTreeTable & table)
{
std::shared_ptr<DB::StorageInMemoryMetadata> metadata = std::make_shared<DB::StorageInMemoryMetadata>();
ColumnsDescription columns_description;
for (const auto & item : columns)
columns_description.add(ColumnDescription(item.name, item.type));
metadata->setColumns(std::move(columns_description));
setSecondaryIndex(columns, context, table, metadata);
metadata->partition_key = KeyDescription::buildEmptyKey();
metadata->sorting_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context, true);
if (table.primary_key.empty())
if (table.order_by_key != MergeTreeTable::TUPLE)
metadata->primary_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context, true);
else
metadata->primary_key.expression = std::make_shared<ExpressionActions>(ActionsDAG{});
else
metadata->primary_key = KeyDescription::parse(table.primary_key, metadata->getColumns(), context, true);
return metadata;
}
void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & in)
{
assertString("MergeTree;", in);
readString(table.database, in);
assertChar('\n', in);
readString(table.table, in);
assertChar('\n', in);
readString(table.snapshot_id, in);
assertChar('\n', in);
String schema;
readString(schema, in);
table.schema = JsonStringToMessage<substrait::NamedStruct>(schema);
assertChar('\n', in);
readString(table.order_by_key, in);
assertChar('\n', in);
readString(table.primary_key, in);
assertChar('\n', in);
readString(table.low_card_key, in);
assertChar('\n', in);
readString(table.minmax_index_key, in);
assertChar('\n', in);
readString(table.bf_index_key, in);
assertChar('\n', in);
readString(table.set_index_key, in);
assertChar('\n', in);
readString(table.relative_path, in);
assertChar('\n', in);
readString(table.absolute_path, in);
assertChar('\n', in);
String json;
readString(json, in);
parseTableConfig(table.table_configs, json);
assertChar('\n', in);
}
}
namespace local_engine
{
SparkStorageMergeTreePtr MergeTreeTable::getStorage(ContextMutablePtr context) const
{
const DB::Block header = TypeParser::buildBlockFromNamedStruct(schema, low_card_key);
const auto metadata = buildMetaData(header, context);
return StorageMergeTreeFactory::getStorage(
StorageID(database, table),
snapshot_id,
*this,
[&]() -> SparkStorageMergeTreePtr
{
auto custom_storage_merge_tree = std::make_shared<SparkWriteStorageMergeTree>(*this, *metadata, context);
return custom_storage_merge_tree;
});
}
SparkStorageMergeTreePtr MergeTreeTable::copyToDefaultPolicyStorage(const ContextMutablePtr & context) const
{
MergeTreeTable merge_tree_table{*this};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
merge_tree_table.snapshot_id = "";
merge_tree_table.table_configs.storage_policy = "";
merge_tree_table.relative_path = merge_tree_table.relative_path + "_" + temp_uuid_str;
return merge_tree_table.getStorage(context);
}
SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const ContextMutablePtr & context) const
{
MergeTreeTable merge_tree_table{*this};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
merge_tree_table.snapshot_id = "";
return merge_tree_table.getStorage(context);
}
MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info) : MergeTreeTable()
{
ReadBufferFromString in(info);
doParseMergeTreeTableString(*this, in);
while (!in.eof())
{
MergeTreePart part;
readString(part.name, in);
assertChar('\n', in);
readIntText(part.begin, in);
assertChar('\n', in);
readIntText(part.end, in);
assertChar('\n', in);
readString(part.row_index_filter_type, in);
assertChar('\n', in);
readString(part.row_index_filter_id_encoded, in);
assertChar('\n', in);
parts.emplace_back(part);
}
}
MergeTreeTableInstance::MergeTreeTableInstance(const google::protobuf::Any & any) : MergeTreeTableInstance(toString(any))
{
}
MergeTreeTableInstance::MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table)
: MergeTreeTableInstance(extension_table.detail())
{
debug::dumpMessage(extension_table, "merge_tree_table");
}
SparkStorageMergeTreePtr MergeTreeTableInstance::restoreStorage(const ContextMutablePtr & context) const
{
auto result = getStorage(context);
restoreMetaData(result, *this, *context);
return result;
}
std::shared_ptr<DB::StorageInMemoryMetadata> MergeTreeTable::buildMetaData(const DB::Block & header, const ContextPtr & context) const
{
return doBuildMetadata(header.getNamesAndTypesList(), context, *this);
}
MergeTreeTable::MergeTreeTable(const local_engine::Write & write, const substrait::NamedStruct & table_schema)
{
assert(write.has_mergetree());
const Write_MergeTreeWrite & merge_tree_write = write.mergetree();
database = merge_tree_write.database();
table = merge_tree_write.table();
snapshot_id = merge_tree_write.snapshot_id();
schema = table_schema;
order_by_key = merge_tree_write.order_by_key();
low_card_key = merge_tree_write.low_card_key();
minmax_index_key = merge_tree_write.minmax_index_key();
bf_index_key = merge_tree_write.bf_index_key();
set_index_key = merge_tree_write.set_index_key();
primary_key = merge_tree_write.primary_key();
relative_path = merge_tree_write.relative_path();
absolute_path = merge_tree_write.absolute_path(); // always empty, see createNativeWrite in java
table_configs.storage_policy = merge_tree_write.storage_policy();
}
std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList & names_and_types_list)
{
std::unique_ptr<SelectQueryInfo> query_info = std::make_unique<SelectQueryInfo>();
query_info->query = std::make_shared<ASTSelectQuery>();
auto syntax_analyzer_result = std::make_shared<TreeRewriterResult>(names_and_types_list);
syntax_analyzer_result->analyzed_join = std::make_shared<TableJoin>();
query_info->syntax_analyzer_result = syntax_analyzer_result;
return query_info;
}
std::unordered_set<String> MergeTreeTableInstance::getPartNames() const
{
std::unordered_set<String> names;
for (const auto & part : parts)
names.emplace(part.name);
return names;
}
RangesInDataParts MergeTreeTableInstance::extractRange(DataPartsVector parts_vector) const
{
std::unordered_map<String, DataPartPtr> name_index;
std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) { name_index.emplace(part->name, part); });
RangesInDataParts ranges_in_data_parts;
std::ranges::transform(
parts,
std::inserter(ranges_in_data_parts, ranges_in_data_parts.end()),
[&](const MergeTreePart & part)
{
RangesInDataPart ranges_in_data_part{name_index.at(part.name),nullptr, 0, 0, {MarkRange(part.begin, part.end)}};
return ranges_in_data_part;
});
return ranges_in_data_parts;
}
bool sameColumns(const substrait::NamedStruct & left, const substrait::NamedStruct & right)
{
if (left.names_size() != right.names_size())
return false;
std::unordered_map<String, substrait::Type::KindCase> map;
for (size_t i = 0; i < left.names_size(); i++)
map.emplace(left.names(i), left.struct_().types(i).kind_case());
for (size_t i = 0; i < right.names_size(); i++)
if (!map.contains(right.names(i)) || map[right.names(i)] != right.struct_().types(i).kind_case())
return false;
return true;
}
bool MergeTreeTable::sameTable(const MergeTreeTable & other) const
{
return database == other.database && table == other.table && snapshot_id == other.snapshot_id && sameColumns(schema, other.schema)
&& order_by_key == other.order_by_key && low_card_key == other.low_card_key && minmax_index_key == other.minmax_index_key
&& bf_index_key == other.bf_index_key && set_index_key == other.set_index_key && primary_key == other.primary_key
&& relative_path == other.relative_path && absolute_path == other.absolute_path
&& table_configs.storage_policy == other.table_configs.storage_policy;
}
}