blob: 6829d34d4adb8a4dbce8247e5c4b0b0048fca0f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FormatFile.h"
#include <Core/Settings.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/SubstraitSource/JSONFormatFile.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Common/GlutenConfig.h>
#include <Common/GlutenStringUtils.h>
#include <Common/logger_useful.h>
#if USE_PARQUET
#include <Storages/SubstraitSource/ParquetFormatFile.h>
#endif
#if USE_ORC
#include <Storages/SubstraitSource/ORCFormatFile.h>
#endif
#if USE_HIVE
#include <Storages/SubstraitSource/ExcelTextFormatFile.h>
#include <Storages/SubstraitSource/TextFormatFile.h>
#endif
/// Forward declaration of the error code thrown by FormatFileUtil::createFile below;
/// the definition lives in another translation unit.
namespace DB::ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
namespace local_engine
{
using namespace DB;
// Initialize the static variable outside the class definition
std::map<std::string, std::function<Field(const std::string &)>> FileMetaColumns::BASE_METADATA_EXTRACTORS
= {{FILE_PATH, [](const std::string & metadata) { return metadata; }},
{FILE_NAME, [](const std::string & metadata) { return metadata; }},
{FILE_SIZE, [](const std::string & value) { return std::strtoll(value.c_str(), nullptr, 10); }},
{FILE_BLOCK_START, [](const std::string & value) { return std::strtoll(value.c_str(), nullptr, 10); }},
{FILE_BLOCK_LENGTH, [](const std::string & value) { return std::strtoll(value.c_str(), nullptr, 10); }},
{FILE_MODIFICATION_TIME,
[](const std::string & metadata)
{
DB::ReadBufferFromString in(metadata);
DateTime64 time = 0;
readDateTime64Text(time, 6, in, DateLUT::instance("UTC"));
return DecimalField(time, 6);
}}};
/// Out-of-class definition of the static table of columns computed from the SubstraitInputFile
/// message itself (its URI, start offset and length) rather than from its metadata_columns list.
/// The INPUT_FILE_NAME entry yields file.uri_file() as-is, without decoding it.
std::map<std::string, std::function<DB::Field(const SubstraitInputFile &)>> FileMetaColumns::INPUT_FUNCTION_EXTRACTORS = {
    {INPUT_FILE_NAME, [](const SubstraitInputFile & input) -> DB::Field { return input.uri_file(); }},
    {INPUT_FILE_BLOCK_START, [](const SubstraitInputFile & input) -> DB::Field { return input.start(); }},
    {INPUT_FILE_BLOCK_LENGTH, [](const SubstraitInputFile & input) -> DB::Field { return input.length(); }},
};
/// Collects the values of all supported metadata columns for one input file.
///
/// Entries of file.metadata_columns() are converted with BASE_METADATA_EXTRACTORS. Keys without an
/// extractor are silently skipped, and a repeated key overwrites the earlier value. Afterwards every
/// INPUT_FUNCTION_EXTRACTORS entry is evaluated against `file`, so those columns are always present
/// in metadata_columns_map.
FileMetaColumns::FileMetaColumns(const SubstraitInputFile & file)
{
    for (const auto & column : file.metadata_columns())
    {
        // Single lookup instead of contains() + operator[]; unknown keys are ignored, not errors.
        const auto extractor = BASE_METADATA_EXTRACTORS.find(column.key());
        if (extractor == BASE_METADATA_EXTRACTORS.end())
            continue;
        metadata_columns_map[column.key()] = extractor->second(column.value());
    }
    for (const auto & [column_name, extract] : INPUT_FUNCTION_EXTRACTORS)
    {
        // Input-function columns must not collide with a metadata column collected above.
        assert(!metadata_columns_map.contains(column_name));
        metadata_columns_map[column_name] = extract(file);
    }
}
/// Builds a column of `rows` rows of `type`, each row holding the value collected for `columnName`.
///
/// Columns listed in INPUT_FILE_COLUMNS_SET are returned as a constant column. All other metadata
/// columns are materialized as a full column with the value repeated `rows` times.
/// `columnName` must have been collected by the constructor. The assert checks this in debug builds;
/// otherwise at() reports the miss (NOTE(review): assumes a standard associative container, which throws).
DB::ColumnPtr FileMetaColumns::createMetaColumn(const String & columnName, const DB::DataTypePtr & type, size_t rows) const
{
    assert(metadata_columns_map.contains(columnName));
    // Bind by reference: the Field may hold a string (e.g. the file path), and copying it would allocate.
    const auto & field = metadata_columns_map.at(columnName);
    if (INPUT_FILE_COLUMNS_SET.contains(columnName))
    {
        /// copied from InputFileNameParser::addInputFileColumnsToChunk()
        /// TODO: check whether using const column is correct or not.
        return type->createColumnConst(rows, field);
    }
    auto mutable_column = type->createColumn();
    mutable_column->insertMany(field, rows);
    return mutable_column;
}
/// Stores the read context, file description and read-buffer builder, and collects the file's
/// metadata columns. It also indexes the partition values by their original key and by their
/// lower-cased key, and logs which file and range are being read.
FormatFile::FormatFile(DB::ContextPtr context_, const SubstraitInputFile & file_info_, const ReadBufferBuilderPtr & read_buffer_builder_)
    : context(context_), file_info(file_info_), read_buffer_builder(read_buffer_builder_), meta_columns(file_info_)
{
    /// Escaping rules for file_info:
    /// 1. file_info.partition_columns() and file_info.partition_values() have already been decoded in the JVM,
    ///    so they must not be decoded again in C++.
    ///    e.g. The raw partition value is "%s", and the decoded value is "%s".
    ///
    /// 2. The file path in file_info.uri_file() and file_info.metadata_columns() is not decoded.
    ///    e.g. The raw partition value is "%s" and the raw file path is "file:/spark-warehouse/sales/year=%25s/part-00000".
    ///    The file path in file_info.uri_file() and file_info.metadata_columns() is encoded from the raw file path:
    ///    "file:/spark-warehouse/sales/year=%2525s/part-00000".
    ///    To get the raw file path, file_info.uri_file() must be decoded. Currently this is done in [[ReadBufferBuilder::build()]].
    ///
    /// 3. input_file_name() returns file_info.uri_file() directly instead of decoding it.

    // Range-for avoids mixing size_t with the protobuf int size/index; an empty list simply skips the loop.
    for (const auto & partition_column : file_info.partition_columns())
    {
        partition_values[partition_column.key()] = partition_column.value();
        // Lower-cased copy of the key, presumably for case-insensitive partition lookups (confirm with callers).
        normalized_partition_values[boost::to_lower_copy(partition_column.key())] = partition_column.value();
    }
    LOG_INFO(
        &Poco::Logger::get("FormatFile"),
        "Reading File path: {}, format: {}, range: {}-{}, partition_index: {}, partition_values: {}",
        file_info.uri_file(),
        file_info.file_format_case(),
        file_info.start(),
        file_info.start() + file_info.length(),
        file_info.partition_index(),
        GlutenStringUtils::mkString(partition_values));
}
/// Factory that picks the FormatFile implementation matching the format set on `file`.
///
/// Candidates are checked in a fixed order: Parquet, ORC, text, JSON. Parquet, ORC and text are only
/// compiled in when USE_PARQUET, USE_ORC and USE_HIVE are enabled, respectively. Iceberg files are
/// dispatched on their underlying Parquet/ORC format. Text files use ExcelTextFormatFile when
/// ExcelTextFormatFile::useThis(context) returns true, and TextFormatFile otherwise.
/// @throws DB::Exception with NOT_IMPLEMENTED for any other (or compiled-out) format.
FormatFilePtr FormatFileUtil::createFile(
    DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const substrait::ReadRel::LocalFiles::FileOrFiles & file)
{
#if USE_PARQUET
    const bool is_parquet = file.has_parquet() || (file.has_iceberg() && file.iceberg().has_parquet());
    if (is_parquet)
    {
        const auto executor_config = ExecutorConfig::loadFromContext(context);
        return std::make_shared<ParquetFormatFile>(context, file, read_buffer_builder, executor_config.use_local_format);
    }
#endif
#if USE_ORC
    const bool is_orc = file.has_orc() || (file.has_iceberg() && file.iceberg().has_orc());
    if (is_orc)
        return std::make_shared<ORCFormatFile>(context, file, read_buffer_builder);
#endif
#if USE_HIVE
    if (file.has_text())
    {
        if (!ExcelTextFormatFile::useThis(context))
            return std::make_shared<TextFormatFile>(context, file, read_buffer_builder);
        return std::make_shared<ExcelTextFormatFile>(context, file, read_buffer_builder);
    }
#endif
    if (file.has_json())
        return std::make_shared<JSONFormatFile>(context, file, read_buffer_builder);
    throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Format not supported:{}", file.DebugString());
}
}