blob: ecf59f9fea96cb0b88e6c90aa9e498b10ed6bdc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "WriteRelParser.h"
#include <Core/Settings.h>
#include <DataTypes/DataTypeTuple.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Parser/TypeParser.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <QueryPipeline/Chain.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/MergeTree/SparkMergeTreeSink.h>
#include <Storages/MergeTree/SparkStorageMergeTree.h>
#include <Storages/Output/NormalFileWriter.h>
#include <google/protobuf/wrappers.pb.h>
#include <substrait/algebra.pb.h>
#include <substrait/type.pb.h>
#include <write_optimization.pb.h>
#include <Common/GlutenSettings.h>
namespace DB::Setting
{
extern const SettingsUInt64 min_insert_block_size_rows;
extern const SettingsUInt64 min_insert_block_size_bytes;
}
using namespace local_engine;
using namespace DB;
namespace
{
DB::ProcessorPtr make_sink(
const DB::ContextPtr & context,
const DB::Names & partition_by,
const DB::SharedHeader & input_header,
const DB::SharedHeader & output_header,
const std::string & base_path,
const FileNameGenerator & generator,
const std::string & format_hint,
const std::shared_ptr<WriteStats> & stats)
{
bool no_bucketed = !SparkPartitionedBaseSink::isBucketedWrite(*input_header);
if (partition_by.empty() && no_bucketed)
{
return std::make_shared<SubstraitFileSink>(
context, base_path, "", false, generator.generate(), format_hint, input_header, stats, DeltaStats{input_header->columns()});
}
return std::make_shared<SubstraitPartitionedFileSink>(
context, partition_by, input_header, output_header, base_path, generator, format_hint, stats);
}
DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const DB::Block & output)
{
ActionsDAG actions_dag{blockToRowType(input)};
actions_dag.project(buildNamesWithAliases(input, output));
return std::make_shared<DB::ExpressionActions>(std::move(actions_dag));
}
DB::ExpressionActionsPtr create_project_action(const DB::Block & input, const DB::Block & output)
{
DB::ColumnsWithTypeAndName final_cols;
std::ranges::transform(
output,
std::back_inserter(final_cols),
[](const DB::ColumnWithTypeAndName & out_ocl)
{
const auto out_type = out_ocl.type;
return DB::ColumnWithTypeAndName(out_type->createColumn(), out_type, out_ocl.name);
});
assert(final_cols.size() == output.columns());
const auto & original_cols = input.getColumnsWithTypeAndName();
ActionsDAG final_project = ActionsDAG::makeConvertingActions(original_cols, final_cols, ActionsDAG::MatchColumnsMode::Position);
return std::make_shared<DB::ExpressionActions>(std::move(final_project));
}
void adjust_output(const DB::QueryPipelineBuilderPtr & builder, const DB::Block & output)
{
const auto input = builder->getHeader();
if (input.columns() != output.columns())
{
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"Mismatch result columns size, input size is {}, but output size is {}",
input.columns(),
output.columns());
}
DB::ExpressionActionsPtr convert_action;
if (sameType(input, output))
{
if (!sameName(input, output))
convert_action = create_rename_action(input, output); // name_is_different
}
else
convert_action = create_project_action(input, output); // type_is_different
if (!convert_action)
return;
builder->addSimpleTransform(
[&](const DB::SharedHeader & cur_header, const DB::QueryPipelineBuilder::StreamType stream_type) -> DB::ProcessorPtr
{
if (stream_type != DB::QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<DB::ConvertingTransform>(cur_header, convert_action);
});
}
void addMergeTreeSinkTransform(
const DB::ContextPtr & context,
const DB::QueryPipelineBuilderPtr & builder,
const MergeTreeTable & merge_tree_table,
const DB::SharedHeader & header,
const DB::Names & partition_by)
{
Chain chain;
//
auto stats = MergeTreeStats::create(header, partition_by);
chain.addSink(stats);
//
SparkMergeTreeWriteSettings write_settings{context};
auto sink = partition_by.empty()
? SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getQueryContext(), DeltaStats{header->columns()}, {stats})
: std::make_shared<SparkMergeTreePartitionedFileSink>(header, partition_by, merge_tree_table, write_settings, context, stats);
chain.addSource(sink);
builder->addChain(std::move(chain));
}
void addNormalFileWriterSinkTransform(
const DB::ContextPtr & context,
const DB::QueryPipelineBuilderPtr & builder,
const std::string & format_hint,
const DB::SharedHeader & output,
const DB::Names & partition_by)
{
GlutenWriteSettings write_settings = GlutenWriteSettings::get(context);
if (write_settings.task_write_tmp_dir.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Write Pipeline need inject temp directory.");
if (write_settings.task_write_filename_pattern.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Write Pipeline need inject file pattern.");
FileNameGenerator generator(write_settings.task_write_filename_pattern);
auto stats = WriteStats::create(output, partition_by);
builder->addSimpleTransform(
[&](const SharedHeader & cur_header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return make_sink(context, partition_by, cur_header, output, write_settings.task_write_tmp_dir, generator, format_hint, stats);
});
builder->addSimpleTransform(
[&](const SharedHeader &, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return stats;
});
}
}
namespace local_engine
{
IMPLEMENT_GLUTEN_SETTINGS(GlutenWriteSettings, WRITE_RELATED_SETTINGS)
void addSinkTransform(const DB::ContextPtr & context, const substrait::WriteRel & write_rel, const DB::QueryPipelineBuilderPtr & builder)
{
assert(write_rel.has_named_table());
const substrait::NamedObjectWrite & named_table = write_rel.named_table();
local_engine::Write write;
if (!named_table.advanced_extension().optimization().UnpackTo(&write))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to unpack write optimization with local_engine::Write.");
assert(write.has_common());
const substrait::NamedStruct & table_schema = write_rel.table_schema();
auto partition_indexes = write.common().partition_col_index();
if (write.has_mergetree())
{
MergeTreeTable merge_tree_table(write, table_schema);
auto output = toShared(TypeParser::buildBlockFromNamedStruct(table_schema, merge_tree_table.low_card_key));
adjust_output(builder, *output);
builder->addSimpleTransform(
[&](const SharedHeader & in_header) -> ProcessorPtr { return std::make_shared<MaterializingTransform>(in_header, false); });
const auto partition_by = collect_partition_cols(*output, table_schema, partition_indexes);
GlutenWriteSettings write_settings = GlutenWriteSettings::get(context);
if (write_settings.task_write_tmp_dir.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTree Write Pipeline need inject relative path.");
if (!merge_tree_table.relative_path.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Non empty relative path for MergeTree table in pipeline mode.");
merge_tree_table.relative_path = write_settings.task_write_tmp_dir;
addMergeTreeSinkTransform(context, builder, merge_tree_table, output, partition_by);
}
else
{
auto output = toShared(TypeParser::buildBlockFromNamedStruct(table_schema));
adjust_output(builder, *output);
const auto partition_by = collect_partition_cols(*output, table_schema, partition_indexes);
addNormalFileWriterSinkTransform(context, builder, write.common().format(), output, partition_by);
}
}
DB::Names collect_partition_cols(const DB::Block & header, const substrait::NamedStruct & struct_, const PartitionIndexes & partition_by)
{
if (partition_by.empty())
{
assert(std::ranges::all_of(
struct_.column_types(), [](const int32_t type) { return type != ::substrait::NamedStruct::PARTITION_COL; }));
return {};
}
assert(struct_.column_types_size() == header.columns());
assert(struct_.column_types_size() == struct_.struct_().types_size());
DB::Names result;
result.reserve(partition_by.size());
for (auto idx : partition_by)
{
assert(idx >= 0 && idx < header.columns());
assert(struct_.column_types(idx) == ::substrait::NamedStruct::PARTITION_COL);
result.emplace_back(header.getByPosition(idx).name);
}
return result;
}
}