| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "FileReader.h" |
| |
| #include <Columns/ColumnNullable.h> |
| #include <DataTypes/DataTypeNullable.h> |
| #include <DataTypes/DataTypesDecimal.h> |
| #include <IO/ReadBufferFromString.h> |
| #include <Parser/SubstraitParserUtils.h> |
| #include <Storages/SubstraitSource/Delta/DeltaMeta.h> |
| #include <Storages/SubstraitSource/Delta/DeltaReader.h> |
| #include <Storages/SubstraitSource/Iceberg/IcebergReader.h> |
| #include <Storages/SubstraitSource/ParquetFormatFile.h> |
| #include <boost/algorithm/string/case_conv.hpp> |
| #include <Common/CHUtil.h> |
| #include <Common/Exception.h> |
| #include <Common/GlutenStringUtils.h> |
| |
| namespace DB |
| { |
| namespace ErrorCodes |
| { |
| extern const int UNKNOWN_TYPE; |
| extern const int LOGICAL_ERROR; |
| } |
| } |
| |
| namespace local_engine |
| { |
| DB::Columns BaseReader::addVirtualColumn(DB::Chunk dataChunk, size_t rowNum) const |
| { |
| // dataChunk may be empty |
| const size_t rows = dataChunk.empty() ? rowNum : dataChunk.getNumRows(); |
| assert(rows && "read 0 rows from file"); |
| |
| auto read_columns = dataChunk.detachColumns(); |
| const auto & columns = getHeader().getColumnsWithTypeAndName(); |
| const auto & normalized_partition_values = file->getFileNormalizedPartitionValues(); |
| |
| DB::Columns res_columns; |
| res_columns.reserve(columns.size()); |
| std::ranges::transform( |
| columns, |
| std::back_inserter(res_columns), |
| [&](const auto & column) -> DB::ColumnPtr |
| { |
| if (auto it = normalized_partition_values.find(boost::to_lower_copy(column.name)); it != normalized_partition_values.end()) |
| return createPartitionColumn(it->second, column.type, rows); |
| if (file->fileMetaColumns().virtualColumn(column.name)) |
| return file->fileMetaColumns().createMetaColumn(column.name, column.type, rows); |
| if (readHeader.has(column.name)) |
| return read_columns[readHeader.getPositionByName(column.name)]; |
| throw DB::Exception( |
| DB::ErrorCodes::LOGICAL_ERROR, "Not found column = {} when reading file: {}.", column.name, file->getURIPath()); |
| }); |
| return res_columns; |
| } |
| |
| DB::ColumnPtr BaseReader::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows) |
| { |
| auto nested_type = DB::removeNullable(data_type); |
| auto column = nested_type->createColumnConst(rows, field); |
| |
| if (data_type->isNullable()) |
| column = DB::ColumnNullable::create(column, DB::ColumnUInt8::create(rows, 0)); |
| return column; |
| } |
| |
| DB::ColumnPtr BaseReader::createPartitionColumn(const String & value, const DB::DataTypePtr & type, size_t rows) |
| { |
| if (GlutenStringUtils::isNullPartitionValue(value)) |
| { |
| if (!type->isNullable()) |
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Partition column is null value,but column data type is not nullable."); |
| auto nested_type = static_cast<const DB::DataTypeNullable &>(*type).getNestedType(); |
| auto column = nested_type->createColumnConstWithDefaultValue(rows); |
| return DB::ColumnNullable::create(column, DB::ColumnUInt8::create(rows, 1)); |
| } |
| else |
| { |
| auto field = buildFieldFromString(value, type); |
| return createConstColumn(type, field, rows); |
| } |
| } |
| |
| #define BUILD_INT_FIELD(type) \ |
| [](DB::ReadBuffer & in, const String &) \ |
| { \ |
| type value = 0; \ |
| DB::readIntText(value, in); \ |
| return DB::Field(value); \ |
| } |
| |
| #define BUILD_FP_FIELD(type) \ |
| [](DB::ReadBuffer & in, const String &) \ |
| { \ |
| type value = 0.0; \ |
| DB::readFloatText(value, in); \ |
| return DB::Field(value); \ |
| } |
| |
| DB::Field BaseReader::buildFieldFromString(const String & str_value, DB::DataTypePtr type) |
| { |
| using FieldBuilder = std::function<DB::Field(DB::ReadBuffer &, const String &)>; |
| static std::map<std::string, FieldBuilder> field_builders |
| = {{"Int8", BUILD_INT_FIELD(Int8)}, |
| {"Int16", BUILD_INT_FIELD(Int16)}, |
| {"Int32", BUILD_INT_FIELD(Int32)}, |
| {"Int64", BUILD_INT_FIELD(Int64)}, |
| {"Float32", BUILD_FP_FIELD(Float32)}, |
| {"Float64", BUILD_FP_FIELD(Float64)}, |
| {"String", [](DB::ReadBuffer &, const String & val) { return DB::Field(val); }}, |
| {"Date", |
| [](DB::ReadBuffer & in, const String &) |
| { |
| DayNum value; |
| readDateText(value, in); |
| return DB::Field(value); |
| }}, |
| {"Date32", |
| [](DB::ReadBuffer & in, const String &) |
| { |
| ExtendedDayNum value; |
| readDateText(value, in); |
| return DB::Field(value.toUnderType()); |
| }}, |
| {"Bool", |
| [](DB::ReadBuffer & in, const String &) |
| { |
| bool value; |
| readBoolTextWord(value, in, true); |
| return DB::Field(value); |
| }}, |
| {"DateTime64(6)", |
| [](DB::ReadBuffer &, const String & s) |
| { |
| std::string decoded; // s: "2023-07-12 05%3A05%3A33.798" (spark encoded it) => decoded: "2023-07-12 05:05:33.798" |
| Poco::URI::decode(s, decoded); |
| |
| std::string to_read; |
| if (decoded.length() > 23) // we see cases when spark mistakely? encode the URI twice, so we need to decode twice |
| Poco::URI::decode(decoded, to_read); |
| else |
| to_read = decoded; |
| |
| DB::ReadBufferFromString read_buffer(to_read); |
| DB::DateTime64 value; |
| DB::readDateTime64Text(value, 6, read_buffer); |
| return DB::Field(value); |
| }} |
| |
| }; |
| |
| auto nested_type = DB::removeNullable(type); |
| DB::ReadBufferFromString read_buffer(str_value); |
| auto it = field_builders.find(nested_type->getName()); |
| if (it == field_builders.end()) |
| { |
| DB::WhichDataType which(nested_type->getTypeId()); |
| if (which.isDecimal32()) |
| { |
| const auto & dataTypeDecimal = static_cast<const DB::DataTypeDecimal<DB::Decimal32> &>(*nested_type); |
| DB::Decimal32 value = dataTypeDecimal.parseFromString(str_value); |
| return DB::DecimalField<DB::Decimal32>(value, dataTypeDecimal.getScale()); |
| } |
| else if (which.isDecimal64()) |
| { |
| const auto & dataTypeDecimal = static_cast<const DB::DataTypeDecimal<DB::Decimal64> &>(*nested_type); |
| DB::Decimal64 value = dataTypeDecimal.parseFromString(str_value); |
| return DB::DecimalField<DB::Decimal64>(value, dataTypeDecimal.getScale()); |
| } |
| else if (which.isDecimal128()) |
| { |
| const auto & dataTypeDecimal = static_cast<const DB::DataTypeDecimal<DB::Decimal128> &>(*nested_type); |
| DB::Decimal128 value = dataTypeDecimal.parseFromString(str_value); |
| return DB::DecimalField<DB::Decimal128>(value, dataTypeDecimal.getScale()); |
| } |
| else if (which.isDecimal256()) |
| { |
| const auto & dataTypeDecimal = static_cast<const DB::DataTypeDecimal<DB::Decimal256> &>(*nested_type); |
| DB::Decimal256 value = dataTypeDecimal.parseFromString(str_value); |
| return DB::DecimalField<DB::Decimal256>(value, dataTypeDecimal.getScale()); |
| } |
| |
| throw DB::Exception(DB::ErrorCodes::UNKNOWN_TYPE, "Unsupported data type {}", nested_type->getName()); |
| } |
| return it->second(read_buffer, str_value); |
| } |
| |
| ConstColumnsFileReader::ConstColumnsFileReader(const FormatFilePtr & file_, const DB::Block & header_, size_t blockSize) |
| : BaseReader(file_, {}, header_), remained_rows(file->getTotalRows().value()), block_size(blockSize) |
| { |
| } |
| |
| bool ConstColumnsFileReader::pull(DB::Chunk & chunk) |
| { |
| if (isCancelled()) |
| return false; |
| |
| if (!remained_rows) |
| return false; |
| |
| size_t to_read_rows = 0; |
| if (remained_rows < block_size) |
| { |
| to_read_rows = remained_rows; |
| remained_rows = 0; |
| } |
| else |
| { |
| to_read_rows = block_size; |
| remained_rows -= block_size; |
| } |
| |
| /// If the original output header is empty, build a block to represent the row count. |
| DB::Columns res_columns |
| = getHeader().columns() > 0 ? addVirtualColumn({}, to_read_rows) : BlockUtil::buildRowCountChunk(to_read_rows).detachColumns(); |
| |
| chunk = DB::Chunk(std::move(res_columns), to_read_rows); |
| return true; |
| } |
| |
| |
| NormalFileReader::NormalFileReader( |
| const FormatFilePtr & file_, |
| const DB::Block & to_read_header_, |
| const DB::Block & output_header_, |
| const FormatFile::InputFormatPtr & input_format_) |
| : BaseReader(file_, to_read_header_, output_header_), input_format(input_format_) |
| { |
| assert(input_format); |
| } |
| |
| bool NormalFileReader::pull(DB::Chunk & chunk) |
| { |
| if (isCancelled()) |
| return false; |
| |
| /// read read real data chunk from input. |
| DB::Chunk dataChunk = doPull(); |
| const size_t rows = dataChunk.getNumRows(); |
| if (!rows) |
| return false; |
| |
| chunk = DB::Chunk(addVirtualColumn(std::move(dataChunk)), rows); |
| return true; |
| } |
| |
| DB::Block BaseReader::buildRowCountHeader(const DB::Block & header) |
| { |
| return !header.empty() ? header : BlockUtil::buildRowCountHeader(); |
| } |
| |
| namespace |
| { |
| /// Factory method to create a reader for normal file, iceberg file or delta file |
| /// |
| std::unique_ptr<NormalFileReader> createNormalFileReader( |
| const FormatFilePtr & file, |
| const DB::Block & to_read_header_, |
| const DB::Block & output_header_, |
| const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag = nullptr, |
| const ColumnIndexFilterPtr & column_index_filter = nullptr) |
| { |
| file->initialize(column_index_filter); |
| auto createInputFormat = [&](const DB::Block & new_read_header_) -> FormatFile::InputFormatPtr |
| { return file->createInputFormat(new_read_header_, filter_actions_dag); }; |
| |
| if (file->getFileInfo().has_iceberg()) |
| return iceberg::IcebergReader::create(file, to_read_header_, output_header_, createInputFormat); |
| |
| auto input_format = createInputFormat(to_read_header_); |
| |
| if (!input_format) |
| return nullptr; |
| |
| // when there is a '__delta_internal_is_row_deleted' column, it needs to use DeltaReader to read data and add column |
| if (DeltaVirtualMeta::hasMetaColumns(to_read_header_)) |
| { |
| String row_index_ids_encoded; |
| String row_index_filter_type; |
| if (file->getFileInfo().other_const_metadata_columns_size()) |
| { |
| for (const auto & column : file->getFileInfo().other_const_metadata_columns()) |
| { |
| if (column.key() == DeltaVirtualMeta::DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_ID_ENCODED) |
| row_index_ids_encoded = toString(column.value()); |
| if (column.key() == DeltaVirtualMeta::DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_TYPE) |
| row_index_filter_type = toString(column.value()); |
| } |
| } |
| return delta::DeltaReader::create( |
| file, to_read_header_, output_header_, input_format, row_index_ids_encoded, row_index_filter_type); |
| } |
| |
| return std::make_unique<NormalFileReader>(file, to_read_header_, output_header_, input_format); |
| } |
| } |
| |
| /// TODO Remove ColumnIndexFilterPtr |
| std::unique_ptr<BaseReader> BaseReader::create( |
| const FormatFilePtr & current_file, |
| const DB::Block & readHeader, |
| const DB::Block & outputHeader, |
| const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag, |
| const ColumnIndexFilterPtr & column_index_filter) |
| { |
| if (readHeader.empty()) |
| { |
| if (auto totalRows = current_file->getTotalRows()) |
| return std::make_unique<ConstColumnsFileReader>(current_file, outputHeader, *totalRows); |
| else |
| { |
| /// If we can't get total rows from file metadata (i.e. text/json format file), adding a dummy column to |
| /// indicate the number of rows. |
| return createNormalFileReader(current_file, buildRowCountHeader(readHeader), buildRowCountHeader(outputHeader)); |
| } |
| } |
| |
| return createNormalFileReader(current_file, readHeader, outputHeader, filter_actions_dag, column_index_filter); |
| } |
| |
| |
| } |