blob: a3ee72d409d08896081dc0fd3d112ea62cdb7fb0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FileReader.h"
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/ReadBufferFromString.h>
#include <Parser/SubstraitParserUtils.h>
#include <Storages/SubstraitSource/Delta/DeltaMeta.h>
#include <Storages/SubstraitSource/Delta/DeltaReader.h>
#include <Storages/SubstraitSource/Iceberg/IcebergReader.h>
#include <Storages/SubstraitSource/ParquetFormatFile.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/GlutenStringUtils.h>
/// Error codes are defined elsewhere in ClickHouse; they are only declared here for the exceptions thrown in this file.
namespace DB::ErrorCodes
{
extern const int UNKNOWN_TYPE;
extern const int LOGICAL_ERROR;
}
namespace local_engine
{
/// Assembles the output columns (in `getHeader()` order) for one chunk.
///
/// Each output column is resolved, in this order, from:
///   1. the file's normalized partition values (looked up by the lower-cased column name),
///   2. the file's metadata (virtual) columns,
///   3. the columns physically read from the file (positioned according to `readHeader`).
/// `dataChunk` is empty when nothing is physically read (ConstColumnsFileReader passes `{}`); the row count is then
/// taken from `rowNum`.
/// @throws DB::Exception(LOGICAL_ERROR) if an output column cannot be resolved from any of the three sources.
DB::Columns BaseReader::addVirtualColumn(DB::Chunk dataChunk, size_t rowNum) const
{
    // The row count must be taken before the columns are detached from the chunk.
    const size_t rows = dataChunk.empty() ? rowNum : dataChunk.getNumRows();
    assert(rows && "read 0 rows from file");

    auto physical_columns = dataChunk.detachColumns();
    const auto & output_columns = getHeader().getColumnsWithTypeAndName();
    const auto & partition_values = file->getFileNormalizedPartitionValues();

    auto resolve_column = [&](const auto & column) -> DB::ColumnPtr
    {
        const auto partition_it = partition_values.find(boost::to_lower_copy(column.name));
        if (partition_it != partition_values.end())
            return createPartitionColumn(partition_it->second, column.type, rows);

        if (file->fileMetaColumns().virtualColumn(column.name))
            return file->fileMetaColumns().createMetaColumn(column.name, column.type, rows);

        if (!readHeader.has(column.name))
            throw DB::Exception(
                DB::ErrorCodes::LOGICAL_ERROR, "Not found column = {} when reading file: {}.", column.name, file->getURIPath());

        return physical_columns[readHeader.getPositionByName(column.name)];
    };

    DB::Columns res_columns;
    res_columns.reserve(output_columns.size());
    for (const auto & column : output_columns)
        res_columns.push_back(resolve_column(column));
    return res_columns;
}
/// Builds a column of `rows` copies of `field` for `data_type`.
/// The constant is always created on the non-nullable type. When `data_type` is Nullable, the result is wrapped in a
/// ColumnNullable whose null map is all zeros (no NULLs).
DB::ColumnPtr BaseReader::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows)
{
    DB::ColumnPtr const_column = DB::removeNullable(data_type)->createColumnConst(rows, field);
    if (!data_type->isNullable())
        return const_column;
    return DB::ColumnNullable::create(const_column, DB::ColumnUInt8::create(rows, 0));
}
/// Materializes a partition column of `rows` rows from its textual partition `value`.
///
/// A NULL partition value (as judged by GlutenStringUtils::isNullPartitionValue) yields `rows` NULLs. This requires
/// `type` to be Nullable; otherwise DB::Exception(LOGICAL_ERROR) is thrown. Any other value is parsed with
/// buildFieldFromString and repeated `rows` times via createConstColumn.
DB::ColumnPtr BaseReader::createPartitionColumn(const String & value, const DB::DataTypePtr & type, size_t rows)
{
    if (!GlutenStringUtils::isNullPartitionValue(value))
        return createConstColumn(type, buildFieldFromString(value, type), rows);

    if (!type->isNullable())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Partition column is null value,but column data type is not nullable.");

    // Default values fill the nested column; the null map (all ones) marks every row as NULL.
    const auto & nullable_type = static_cast<const DB::DataTypeNullable &>(*type);
    auto default_column = nullable_type.getNestedType()->createColumnConstWithDefaultValue(rows);
    return DB::ColumnNullable::create(default_column, DB::ColumnUInt8::create(rows, 1));
}
/// Expands to a field-builder lambda that parses an integer of the given `type` from the read buffer.
/// The second (raw string) argument is ignored.
#define BUILD_INT_FIELD(type) \
    [](DB::ReadBuffer & in, const String &) -> DB::Field \
    { \
        type parsed{}; \
        DB::readIntText(parsed, in); \
        return DB::Field(parsed); \
    }
/// Expands to a field-builder lambda that parses a floating-point value of the given `type` from the read buffer.
/// The second (raw string) argument is ignored.
#define BUILD_FP_FIELD(type) \
    [](DB::ReadBuffer & in, const String &) -> DB::Field \
    { \
        type parsed{}; \
        DB::readFloatText(parsed, in); \
        return DB::Field(parsed); \
    }
namespace
{
/// Parses `str_value` with the DataTypeDecimal<DecimalT> that `nested_type` is statically cast to. The result is
/// wrapped in a DecimalField carrying that type's scale. The caller must have checked the type id first.
template <typename DecimalT>
DB::Field parseDecimalField(const DB::DataTypePtr & nested_type, const String & str_value)
{
    const auto & decimal_type = static_cast<const DB::DataTypeDecimal<DecimalT> &>(*nested_type);
    const DecimalT parsed = decimal_type.parseFromString(str_value);
    return DB::DecimalField<DecimalT>(parsed, decimal_type.getScale());
}
}

/// Converts the textual value `str_value` (e.g. a partition value) into a Field for `type`.
///
/// Nullable is stripped from `type` first. Supported targets:
///   - the types listed by exact name in the builder table below;
///   - Decimal32/64/128/256, dispatched on the type id.
/// @throws DB::Exception(UNKNOWN_TYPE) for any other type.
DB::Field BaseReader::buildFieldFromString(const String & str_value, DB::DataTypePtr type)
{
    using FieldBuilder = std::function<DB::Field(DB::ReadBuffer &, const String &)>;

    /// Builders keyed by the exact name of the non-nullable target type. Each one receives a buffer over `str_value`
    /// and `str_value` itself. The String and DateTime64(6) builders use the raw string instead of the buffer.
    static const std::map<std::string, FieldBuilder> field_builders = {
        {"Int8", BUILD_INT_FIELD(Int8)},
        {"Int16", BUILD_INT_FIELD(Int16)},
        {"Int32", BUILD_INT_FIELD(Int32)},
        {"Int64", BUILD_INT_FIELD(Int64)},
        {"Float32", BUILD_FP_FIELD(Float32)},
        {"Float64", BUILD_FP_FIELD(Float64)},
        {"String", [](DB::ReadBuffer &, const String & raw) { return DB::Field(raw); }},
        {"Date",
         [](DB::ReadBuffer & in, const String &)
         {
             DayNum day;
             readDateText(day, in);
             return DB::Field(day);
         }},
        {"Date32",
         [](DB::ReadBuffer & in, const String &)
         {
             ExtendedDayNum day;
             readDateText(day, in);
             return DB::Field(day.toUnderType());
         }},
        {"Bool",
         [](DB::ReadBuffer & in, const String &)
         {
             bool flag;
             readBoolTextWord(flag, in, true);
             return DB::Field(flag);
         }},
        {"DateTime64(6)",
         [](DB::ReadBuffer &, const String & raw)
         {
             // Spark URI-encodes the value, e.g. "2023-07-12 05%3A05%3A33.798" => "2023-07-12 05:05:33.798".
             std::string decoded;
             Poco::URI::decode(raw, decoded);
             // Some values have been seen encoded twice. If the once-decoded text is longer than 23 characters (the
             // length of "2023-07-12 05:05:33.798"), decode it again.
             // NOTE(review): this also re-decodes values with sub-millisecond digits, which is only harmless when
             // they contain no '%' — confirm.
             if (decoded.length() > 23)
             {
                 std::string decoded_twice;
                 Poco::URI::decode(decoded, decoded_twice);
                 decoded = std::move(decoded_twice);
             }
             DB::ReadBufferFromString timestamp_buffer(decoded);
             DB::DateTime64 timestamp;
             DB::readDateTime64Text(timestamp, 6, timestamp_buffer);
             return DB::Field(timestamp);
         }}};

    const auto nested_type = DB::removeNullable(type);
    const auto builder_it = field_builders.find(nested_type->getName());
    if (builder_it != field_builders.end())
    {
        DB::ReadBufferFromString read_buffer(str_value);
        return builder_it->second(read_buffer, str_value);
    }

    // Decimal types are not in the table above; dispatch them on the type id instead of the name.
    const DB::WhichDataType which(nested_type->getTypeId());
    if (which.isDecimal32())
        return parseDecimalField<DB::Decimal32>(nested_type, str_value);
    if (which.isDecimal64())
        return parseDecimalField<DB::Decimal64>(nested_type, str_value);
    if (which.isDecimal128())
        return parseDecimalField<DB::Decimal128>(nested_type, str_value);
    if (which.isDecimal256())
        return parseDecimalField<DB::Decimal256>(nested_type, str_value);

    throw DB::Exception(DB::ErrorCodes::UNKNOWN_TYPE, "Unsupported data type {}", nested_type->getName());
}
/// Reader that produces rows without reading any physical column: the read header passed to BaseReader is empty.
/// The file must report its total row count, because `.value()` is called on it. Rows are emitted in chunks of at
/// most `blockSize`.
ConstColumnsFileReader::ConstColumnsFileReader(const FormatFilePtr & file_, const DB::Block & header_, size_t blockSize)
    : BaseReader(file_, {}, header_)
    , remained_rows(file->getTotalRows().value())
    , block_size(blockSize)
{
}
/// Emits the next chunk of at most `block_size` rows, or returns false when cancelled or when no rows remain.
/// Output columns come only from partition values and file metadata (see addVirtualColumn). If the output header
/// has no columns, a row-count chunk is emitted so that the number of rows is still conveyed.
bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
{
    if (isCancelled() || remained_rows == 0)
        return false;

    const size_t to_read_rows = remained_rows < block_size ? remained_rows : block_size;
    remained_rows -= to_read_rows;

    DB::Columns res_columns;
    if (getHeader().columns() > 0)
        res_columns = addVirtualColumn({}, to_read_rows);
    else
        res_columns = BlockUtil::buildRowCountChunk(to_read_rows).detachColumns();

    chunk = DB::Chunk(std::move(res_columns), to_read_rows);
    return true;
}
/// Reader that pulls real data through `input_format_` and then appends partition/metadata columns.
/// `to_read_header_` lists the columns physically read; `output_header_` is the final output layout.
NormalFileReader::NormalFileReader(
    const FormatFilePtr & file_,
    const DB::Block & to_read_header_,
    const DB::Block & output_header_,
    const FormatFile::InputFormatPtr & input_format_)
    : BaseReader(file_, to_read_header_, output_header_)
    , input_format(input_format_)
{
    // createNormalFileReader in this file returns nullptr instead of constructing a reader without an input format.
    assert(input_format);
}
/// Pulls the next data chunk and completes it with virtual columns.
/// Returns false when cancelled or when doPull() yields a chunk with zero rows (end of input).
bool NormalFileReader::pull(DB::Chunk & chunk)
{
    if (isCancelled())
        return false;

    // Read the next chunk of real data from the input.
    DB::Chunk data_chunk = doPull();
    const size_t num_rows = data_chunk.getNumRows();
    if (num_rows == 0)
        return false;

    chunk = DB::Chunk(addVirtualColumn(std::move(data_chunk)), num_rows);
    return true;
}
/// Returns a copy of `header`, or BlockUtil's row-count header when `header` is empty.
DB::Block BaseReader::buildRowCountHeader(const DB::Block & header)
{
    if (header.empty())
        return BlockUtil::buildRowCountHeader();
    return header;
}
namespace
{
/// Factory method to create a reader for a normal file, an Iceberg file or a Delta file.
///
/// `file` is initialized with `column_index_filter` before any input format is created. Then:
///   - Iceberg files (`getFileInfo().has_iceberg()`) go to iceberg::IcebergReader::create. It receives a callback
///     that creates the input format for a given read header.
///   - Otherwise an input format is created for `to_read_header_`. If none can be created, nullptr is returned.
///   - If `to_read_header_` contains Delta virtual meta columns, a delta::DeltaReader is created. Its row-index filter
///     settings are taken from the file's `other_const_metadata_columns`.
///   - Otherwise a plain NormalFileReader is returned.
std::unique_ptr<NormalFileReader> createNormalFileReader(
    const FormatFilePtr & file,
    const DB::Block & to_read_header_,
    const DB::Block & output_header_,
    const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag = nullptr,
    const ColumnIndexFilterPtr & column_index_filter = nullptr)
{
    file->initialize(column_index_filter);

    // Capture the pointers by value, not by reference. `filter_actions_dag` may be bound to a temporary created from
    // the default argument, so a by-reference capture would dangle if the callee kept the callback beyond this call.
    // Whether IcebergReader keeps it is not visible here; the copy is cheap and removes the hazard.
    auto createInputFormat = [file, filter_actions_dag](const DB::Block & new_read_header_) -> FormatFile::InputFormatPtr
    { return file->createInputFormat(new_read_header_, filter_actions_dag); };

    if (file->getFileInfo().has_iceberg())
        return iceberg::IcebergReader::create(file, to_read_header_, output_header_, createInputFormat);

    auto input_format = createInputFormat(to_read_header_);
    if (!input_format)
        return nullptr;

    // When the read header contains Delta meta columns (e.g. '__delta_internal_is_row_deleted'), DeltaReader is
    // needed to read the data and add those columns.
    if (DeltaVirtualMeta::hasMetaColumns(to_read_header_))
    {
        String row_index_ids_encoded;
        String row_index_filter_type;
        // Iterating an empty list is a no-op, so no separate size check is needed.
        for (const auto & column : file->getFileInfo().other_const_metadata_columns())
        {
            if (column.key() == DeltaVirtualMeta::DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_ID_ENCODED)
                row_index_ids_encoded = toString(column.value());
            if (column.key() == DeltaVirtualMeta::DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_TYPE)
                row_index_filter_type = toString(column.value());
        }
        return delta::DeltaReader::create(
            file, to_read_header_, output_header_, input_format, row_index_ids_encoded, row_index_filter_type);
    }

    return std::make_unique<NormalFileReader>(file, to_read_header_, output_header_, input_format);
}
}
/// TODO Remove ColumnIndexFilterPtr
/// Chooses the reader implementation for `current_file`:
///   - `readHeader` not empty: a normal (or Iceberg/Delta) reader built with `filter_actions_dag` and
///     `column_index_filter`;
///   - `readHeader` empty and the file reports its total row count: a ConstColumnsFileReader, which reads no data;
///   - `readHeader` empty and the row count is unknown: a normal reader whose read/output headers fall back to the
///     row-count header. The filter and column index filter are not forwarded in this case.
std::unique_ptr<BaseReader> BaseReader::create(
    const FormatFilePtr & current_file,
    const DB::Block & readHeader,
    const DB::Block & outputHeader,
    const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
    const ColumnIndexFilterPtr & column_index_filter)
{
    if (!readHeader.empty())
        return createNormalFileReader(current_file, readHeader, outputHeader, filter_actions_dag, column_index_filter);

    if (const auto total_rows = current_file->getTotalRows())
    {
        // NOTE(review): the total row count is passed as ConstColumnsFileReader's `blockSize`, so the whole file is
        // emitted as a single chunk. Confirm this is intended for very large files.
        return std::make_unique<ConstColumnsFileReader>(current_file, outputHeader, *total_rows);
    }

    // The row count is not available from file metadata (e.g. text/json files), so read a dummy column to
    // indicate the number of rows.
    return createNormalFileReader(current_file, buildRowCountHeader(readHeader), buildRowCountHeader(outputHeader));
}
}