blob: f8e3a3c914187960c181a470703cd354450b46bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ParquetFormatFile.h"
#if USE_PARQUET
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Storages/Parquet/VectorizedParquetRecordReader.h>
#include <Storages/Parquet/VirtualColumnRowIndexReader.h>
#include <Storages/SubstraitSource/Delta/DeltaMeta.h>
#include <Common/BlockTypeUtils.h>
namespace DB
{
namespace Setting
{
extern const SettingsMaxThreads max_download_threads;
extern const SettingsMaxThreads max_parsing_threads;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TYPE;
}
}
using namespace DB;
namespace local_engine
{
struct ShouldIncludeRowGroup
{
const SubstraitInputFile & file_info;
explicit ShouldIncludeRowGroup(const SubstraitInputFile & file_info) : file_info(file_info) { }
bool operator()(UInt64 midpoint_offset) const
{
return file_info.start() <= midpoint_offset && midpoint_offset < file_info.start() + file_info.length();
}
};
namespace
{
/// Parse the parquet footer from `read_buffer` and record only the row groups
/// whose midpoint lies inside the byte range assigned to this split.
ParquetMetaBuilder collectRequiredRowGroups(ReadBuffer & read_buffer, const SubstraitInputFile & file_info)
{
    ParquetMetaBuilder builder;
    builder.build(read_buffer, ShouldIncludeRowGroup{file_info});
    return builder;
}
}
using namespace ParquetVirtualMeta;
/// Wraps an underlying parquet input format and, when the output header carries
/// extra virtual meta columns (more columns than the read header), splices a
/// generated row-index column (TMP_ROWINDEX) into each produced chunk.
class ParquetInputFormat : public FormatFile::InputFormat
{
// Header of the columns actually read from the parquet file.
const Block readHeader;
// Header expected by the consumer; may additionally contain virtual meta columns.
const Block outputHeader;
// NOTE: declaration order matters — rowRangesProvider must be initialized
// before row_index_reader, which dereferences it in the member-init list below.
std::unique_ptr<ColumnIndexRowRangesProvider> rowRangesProvider;
// Present only when outputHeader has more columns than readHeader.
std::optional<VirtualColumnRowIndexReader> row_index_reader;
public:
/// @param read_buffer_  owned buffer backing `input_`
/// @param input_        the wrapped parquet input format
/// @param provider      row-range provider for the row-index reader (may be null
///                      when no virtual column is requested)
/// @param readHeader_   columns read from the file
/// @param outputHeader_ columns delivered to the caller
ParquetInputFormat(
std::unique_ptr<ReadBuffer> read_buffer_,
const InputFormatPtr & input_,
std::unique_ptr<ColumnIndexRowRangesProvider> provider,
const Block & readHeader_,
const Block & outputHeader_)
: InputFormat(std::move(read_buffer_), input_)
, readHeader(readHeader_)
, outputHeader(outputHeader_)
, rowRangesProvider(std::move(provider))
, row_index_reader(
outputHeader.columns() > readHeader.columns()
? std::make_optional<VirtualColumnRowIndexReader>(*rowRangesProvider, getMetaColumnType(outputHeader))
: std::nullopt)
{
}
Chunk generate() override
{
// Degenerate case: only virtual column(s) are requested, nothing is read
// from the file — emit batches consisting solely of row indexes.
if (readHeader.columns() == 0)
{
assert(outputHeader.columns());
assert(row_index_reader);
// TODO: format_settings_.parquet.max_block_size
Columns cols{row_index_reader->readBatch(8192)};
size_t rows = cols[0]->size();
return Chunk(std::move(cols), rows);
}
auto chunk = input->generate();
size_t num_rows = chunk.getNumRows();
if (row_index_reader && num_rows)
{
// Generate exactly one row index per data row of this chunk.
auto row_index_column = row_index_reader->readBatch(num_rows);
assert(outputHeader.columns() > readHeader.columns());
// Insert the virtual column at its position in the output header,
// or append it when that position is past the chunk's current width.
size_t column_pos = outputHeader.getPositionByName(TMP_ROWINDEX);
if (column_pos < chunk.getNumColumns())
chunk.addColumn(column_pos, std::move(row_index_column));
else
chunk.addColumn(std::move(row_index_column));
}
return chunk;
}
};
/// Construct a parquet-backed FormatFile; `use_local_format_` selects the
/// vectorized page-index reader path in createInputFormat.
ParquetFormatFile::ParquetFormatFile(
    const ContextPtr & context_,
    const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_,
    const ReadBufferBuilderPtr & read_buffer_builder_,
    bool use_local_format_)
    : FormatFile(context_, file_info_, read_buffer_builder_)
    , use_pageindex_reader(use_local_format_)
{
}
/// Store the column-index filter and open the file. For iceberg tables the
/// physical file schema is collected eagerly from the parquet footer.
void ParquetFormatFile::initialize(const ColumnIndexFilterPtr & filter)
{
    column_index_filter_ = filter;
    read_buffer_ = read_buffer_builder->build(file_info);
    if (!file_info.has_iceberg())
        return;
    file_schema = ParquetMetaBuilder::collectFileSchema(context, *read_buffer_);
}
/// Build an input format reading the row groups of this file that intersect the
/// split. Returns nullptr when no row group falls inside the split. Chooses
/// between the local vectorized page-index reader and the generic
/// ParquetBlockInputFormat; either is wrapped in ParquetInputFormat so virtual
/// row-index columns can be spliced in when `header` requests them.
/// Fix: dropped the unused local `const Settings & settings` — it was never
/// read (the one consumer fetched context->getSettingsRef() directly).
FormatFile::InputFormatPtr
ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
{
    assert(read_buffer_);
    bool readRowIndex = hasMetaColumns(header);
    // The vectorized reader only supports flat (non-nested) columns.
    bool usePageIndexReader = (use_pageindex_reader || readRowIndex) && onlyHasFlatType(header);
    auto format_settings = getFormatSettings(context);
    auto read_header = toShared(DeltaVirtualMeta::removeMetaColumns(removeMetaColumns(header)));
    // Page-index metadata is needed both for the vectorized reader and for
    // reconstructing row indexes.
    const bool needPageIndex = usePageIndexReader || readRowIndex;
    ParquetMetaBuilder metaBuilder{
        .format_settings = format_settings,
        .collectPageIndex = needPageIndex,
        .collectSkipRowGroup = !usePageIndexReader,
        .case_insensitive = format_settings.parquet.case_insensitive_column_matching,
        .allow_missing_columns = format_settings.parquet.allow_missing_columns};
    ShouldIncludeRowGroup should_include_row_group{file_info};
    if (auto * seekable_in = dynamic_cast<SeekableReadBuffer *>(read_buffer_.get()))
    {
        // Reuse the already-open read_buffer to avoid opening the file twice;
        // opening an HDFS file in particular is expensive.
        metaBuilder.build(*seekable_in, *read_header, column_index_filter_.get(), should_include_row_group);
        seekable_in->seek(0, SEEK_SET);
    }
    else
    {
        // Non-seekable stream: read the footer through a separate throw-away buffer.
        const auto in = read_buffer_builder->build(file_info);
        metaBuilder.build(*in, *read_header, column_index_filter_.get(), should_include_row_group);
    }
    // The filter was only needed for footer/page-index pruning above.
    column_index_filter_.reset();
    if (metaBuilder.readRowGroups.empty())
        return nullptr;
    auto provider = needPageIndex ? std::make_unique<ColumnIndexRowRangesProvider>(metaBuilder) : nullptr;
    auto createVectorizedFormat = [&]() -> InputFormatPtr
    {
        auto input = std::make_shared<VectorizedParquetBlockInputFormat>(*read_buffer_, read_header, *provider, format_settings);
        return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header);
    };
    auto createParquetBlockInputFormat = [&]() -> InputFormatPtr
    {
        format_settings.parquet.skip_row_groups
            = std::unordered_set<int>(metaBuilder.skipRowGroups.begin(), metaBuilder.skipRowGroups.end());
        if (readRowIndex)
        {
            assert(provider);
            // In the case of readRowIndex, we need to preserve the order of the rows
            format_settings.parquet.preserve_order = true;
            // TODO: enable filter push down again
            // We need to disable fiter push down and read all row groups, so that we can get correct row index.
            format_settings.parquet.filter_push_down = false;
        }
        auto parser_group = std::make_shared<FormatFilterInfo>(filter_actions_dag, context, nullptr);
        auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), /*num_streams_=*/1);
        auto input = std::make_shared<ParquetBlockInputFormat>(
            *read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, 8192);
        return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header);
    };
    return usePageIndexReader ? createVectorizedFormat() : createParquetBlockInputFormat();
}
/// Total number of rows in the row groups belonging to this split.
/// The value is computed once and cached under `mutex`.
std::optional<size_t> ParquetFormatFile::getTotalRows()
{
    {
        std::lock_guard lock(mutex);
        if (total_rows)
            return total_rows;
    }
    // Not cached yet: parse the footer (outside the lock) and sum the
    // row counts of the row groups that fall into this split.
    auto in = read_buffer_builder->build(file_info);
    const auto meta = collectRequiredRowGroups(*in, file_info);
    size_t rows = 0;
    for (const auto & row_group : meta.readRowGroups)
        rows += row_group.num_rows;
    {
        std::lock_guard lock(mutex);
        total_rows = rows;
        return total_rows;
    }
}
bool ParquetFormatFile::onlyHasFlatType(const Block & header)
{
return std::ranges::all_of(
header,
[](ColumnWithTypeAndName const & col)
{
const DataTypePtr type_not_nullable = removeNullable(col.type);
const WhichDataType which(type_not_nullable);
return !isArray(which) && !isMap(which) && !isTuple(which);
});
}
}
#endif