blob: ca8f0f1acc17819c44871e14fac28c557e3d8c92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FillingDeltaInternalRowDeletedStep.h"
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Parquet/ParquetMeta.h>
#include <Storages/SubstraitSource/Delta/DeltaMeta.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <Common/BlockTypeUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace local_engine
{
static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits{
{
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}};
}
FillingDeltaInternalRowDeletedStep::FillingDeltaInternalRowDeletedStep(const DB::SharedHeader & input_header, const MergeTreeTableInstance & _merge_tree_table, const DB::ContextPtr _context)
: ITransformingStep(input_header, input_header, getTraits()), merge_tree_table(_merge_tree_table), context(_context)
{
}
DB::Block FillingDeltaInternalRowDeletedStep::transformHeader(const DB::Block & input)
{
DB::Block output;
for (int i = 0; i < input.columns(); i++)
{
output.insert(input.getByPosition(i));
}
return output;
}
void FillingDeltaInternalRowDeletedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/)
{
pipeline.addSimpleTransform([&](const DB::SharedHeader & header) { return std::make_shared<FillingDeltaInternalRowDeletedTransform>(header, merge_tree_table, context); });
}
void FillingDeltaInternalRowDeletedStep::updateOutputHeader()
{
output_header = toShared(transformHeader(*input_headers.front()));
}
FillingDeltaInternalRowDeletedTransform::FillingDeltaInternalRowDeletedTransform(const DB::SharedHeader & input_header_, const MergeTreeTableInstance & merge_tree_table, const DB::ContextPtr context)
: ISimpleTransform(input_header_, input_header_, true), read_header(*input_header_)
{
for (const auto part : merge_tree_table.parts)
{
if (!part.row_index_filter_type.empty() && !part.row_index_filter_id_encoded.empty())
{
std::shared_ptr<DeltaVirtualMeta::DeltaDVBitmapConfig> bitmap_config =
DeltaVirtualMeta::DeltaDVBitmapConfig::parse_config(part.row_index_filter_id_encoded);
std::unique_ptr<DeltaDVRoaringBitmapArray> bitmap_array = std::make_unique<DeltaDVRoaringBitmapArray>();
bitmap_array->rb_read(bitmap_config->path_or_inline_dv, bitmap_config->offset, bitmap_config->size_in_bytes, context);
std::string part_path_key;
part_path_key.append(merge_tree_table.absolute_path).append("/").append(part.name);
dv_map.emplace(part_path_key, std::move(bitmap_array));
}
}
}
void FillingDeltaInternalRowDeletedTransform::transform(DB::Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();
size_t deleted_row_pos = read_header.getPositionByName(DeltaVirtualMeta::DELTA_INTERNAL_IS_ROW_DELETED);
size_t part_path_key_pos = read_header.getPositionByName(FileMetaColumns::FILE_PATH);
size_t row_index_pos = read_header.getPositionByName(ParquetVirtualMeta::TMP_ROWINDEX);
const auto & input_columns = chunk.getColumns();
auto deleted_row_column_nest = DB::ColumnUInt8::create(num_rows);
auto & vec = deleted_row_column_nest->getData();
const DB::ColumnPtr & part_path_key_column = input_columns[part_path_key_pos];
const auto & part_path_key_column_data = assert_cast<const DB::ColumnLowCardinality&>(*part_path_key_column);
const DB::ColumnPtr & row_index_column = input_columns[row_index_pos];
const auto & row_index_column_data = assert_cast<const DB::ColumnUInt64&>(*row_index_column);
for (size_t i = 0; i < num_rows; ++i)
{
std::string part_name = part_path_key_column_data.getDataAt(i).toString();
if (dv_map.contains(part_name))
{
vec[i] = dv_map.at(part_name)->rb_contains(row_index_column_data.get64(i));
}
else
{
vec[i] = false;
}
}
auto nullmap_column = DB::ColumnUInt8::create(deleted_row_column_nest->size(), 0);
DB::ColumnPtr deleted_row_column = DB::ColumnNullable::create(std::move(deleted_row_column_nest), std::move(nullmap_column));
DB::Columns output_columns = input_columns;
output_columns[deleted_row_pos] = std::move(deleted_row_column);
chunk.setColumns(std::move(output_columns), num_rows);
}
}