blob: f3ffab59a6a8b32a5ddd89dc31b44de903aedbf4 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VeloxPlanConverter.h"
#include <filesystem>
#include "config/GlutenConfig.h"
#include "iceberg/IcebergPlanConverter.h"
#include "operators/plannodes/IteratorSplit.h"
namespace gluten {
using namespace facebook;
// Constructs the converter and the substraitVeloxPlanConverter_ it wraps.
//
// Every argument is forwarded to substraitVeloxPlanConverter_; veloxCfg and
// validationMode are additionally kept in veloxCfg_ and validationMode_.
// validationMode_ is later read by toVeloxPlan() to decide whether the local
// files are parsed into split infos.
//
// veloxCfg must not be null: VELOX_USER_CHECK_NOT_NULL rejects a null pointer.
VeloxPlanConverter::VeloxPlanConverter(
velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
VeloxConnectorIds connectorIds,
const std::optional<std::string> writeFilesTempPath,
const std::optional<std::string> writeFileName,
bool validationMode)
: validationMode_(validationMode),
veloxCfg_(veloxCfg),
substraitVeloxPlanConverter_(
veloxPool,
veloxCfg,
rowVectors,
std::move(connectorIds),
writeFilesTempPath,
writeFileName,
validationMode) {
// The check sits in the constructor body, so it runs only after the member
// initializers above have already been handed the pointer.
VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
}
namespace {
// Copies a range of entries exposing key()/value() accessors into a string map.
// A key that occurs more than once keeps the value of its last occurrence.
template <typename KeyValueEntries>
std::unordered_map<std::string, std::string> toStringMap(const KeyValueEntries& entries) {
  std::unordered_map<std::string, std::string> result;
  result.reserve(entries.size());
  for (const auto& entry : entries) {
    result[entry.key()] = entry.value();
  }
  return result;
}

// Builds a single table-scan SplitInfo from all the files in fileList.
//
// For every file, its path, start, length, partition columns, metadata columns and file
// properties are appended to the corresponding vectors of the SplitInfo, so entry i of each
// vector describes file i. partitionIndex is overwritten by every file. The file format is
// derived from the file's format case; for Iceberg files the SplitInfo is handed to
// IcebergPlanConverter::parseIcebergSplitInfo and replaced by whatever it returns.
//
// veloxCfg is dereferenced (to read kCaseSensitive) only when a file carries a schema and no
// table schema has been recorded yet.
//
// Returns the populated SplitInfo; for an empty fileList only leafType is set.
std::shared_ptr<SplitInfo> parseScanSplitInfo(
    const facebook::velox::config::ConfigBase* veloxCfg,
    const google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>& fileList) {
  using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;

  auto splitInfo = std::make_shared<SplitInfo>();
  splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;

  // One entry per file is appended to each of these vectors below.
  splitInfo->paths.reserve(fileList.size());
  splitInfo->starts.reserve(fileList.size());
  splitInfo->lengths.reserve(fileList.size());
  splitInfo->partitionColumns.reserve(fileList.size());
  splitInfo->properties.reserve(fileList.size());
  splitInfo->metadataColumns.reserve(fileList.size());

  for (const auto& file : fileList) {
    // Expect all partitions to share the same index.
    splitInfo->partitionIndex = file.partition_index();

    // The maps are temporaries, so they are moved into the vectors rather than copied.
    splitInfo->partitionColumns.emplace_back(toStringMap(file.partition_columns()));
    splitInfo->metadataColumns.emplace_back(toStringMap(file.metadata_columns()));

    splitInfo->paths.emplace_back(file.uri_file());
    splitInfo->starts.emplace_back(file.start());
    splitInfo->lengths.emplace_back(file.length());

    // Size and modification time are filled in only when the file carries properties.
    facebook::velox::FileProperties fileProps;
    if (file.has_properties()) {
      fileProps.fileSize = file.properties().filesize();
      fileProps.modificationTime = file.properties().modificationtime();
    }
    splitInfo->properties.emplace_back(fileProps);

    switch (file.file_format_case()) {
      case SubstraitFileFormatCase::kOrc:
        splitInfo->format = dwio::common::FileFormat::ORC;
        break;
      case SubstraitFileFormatCase::kDwrf:
        splitInfo->format = dwio::common::FileFormat::DWRF;
        break;
      case SubstraitFileFormatCase::kParquet:
        splitInfo->format = dwio::common::FileFormat::PARQUET;
        break;
      case SubstraitFileFormatCase::kText:
        splitInfo->format = dwio::common::FileFormat::TEXT;
        break;
      case SubstraitFileFormatCase::kIceberg:
        // The Iceberg converter takes over the SplitInfo built so far and returns its replacement.
        splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo));
        break;
      default:
        splitInfo->format = dwio::common::FileFormat::UNKNOWN;
        break;
    }

    // The schema in file represents the table schema. It is set when the TableScan requires the
    // table schema to be present, currently when the option is set to map columns by index rather
    // than by name in Parquet or ORC files. Since the table schema should be the same for all
    // files, we set it in the SplitInfo based on the first file we encounter with the schema set.
    if (!splitInfo->tableSchema && file.has_schema()) {
      const auto& schema = file.schema();
      // Names are lower-cased unless the configuration asks for case sensitivity.
      const bool asLowerCase = !veloxCfg->get<bool>(kCaseSensitive, false);

      std::vector<std::string> names;
      names.reserve(schema.names().size());
      for (const auto& name : schema.names()) {
        std::string fieldName = name;
        if (asLowerCase) {
          folly::toLowerAscii(fieldName);
        }
        names.emplace_back(std::move(fieldName));
      }

      auto types = SubstraitParser::parseNamedStruct(schema, asLowerCase);
      splitInfo->tableSchema = ROW(std::move(names), std::move(types));
    }
  }
  return splitInfo;
}
// Parses every entry of localFiles into a SplitInfo (one per entry, in order) and hands the
// resulting list to planConverter via setSplitInfos().
void parseLocalFileNodes(
    SubstraitToVeloxPlanConverter* planConverter,
    const facebook::velox::config::ConfigBase* veloxCfg,
    std::vector<::substrait::ReadRel_LocalFiles>& localFiles) {
  std::vector<std::shared_ptr<SplitInfo>> parsedSplits;
  parsedSplits.reserve(localFiles.size());

  for (auto it = localFiles.cbegin(); it != localFiles.cend(); ++it) {
    // Each ReadRel_LocalFiles contributes exactly one SplitInfo built from its items.
    parsedSplits.emplace_back(parseScanSplitInfo(veloxCfg, it->items()));
  }

  planConverter->setSplitInfos(std::move(parsedSplits));
}
} // namespace
// Converts substraitPlan into a Velox plan node using the wrapped converter.
//
// Outside validation mode, localFiles is first parsed into split infos that are registered on
// the wrapped converter. In validation mode localFiles is ignored.
std::shared_ptr<const facebook::velox::core::PlanNode> VeloxPlanConverter::toVeloxPlan(
    const ::substrait::Plan& substraitPlan,
    std::vector<::substrait::ReadRel_LocalFiles> localFiles) {
  if (validationMode_) {
    return substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
  }

  parseLocalFileNodes(&substraitVeloxPlanConverter_, veloxCfg_, localFiles);
  return substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
}
} // namespace gluten