// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/file-metadata-utils.h"
#include "exec/hdfs-scan-node-base.h"
#include "exec/scanner-context.h"
#include "exec/text-converter.inline.h"
#include "runtime/descriptors.h"
#include "runtime/tuple.h"
#include "util/flat_buffer.h"
#include "util/string-parser.h"
#include "common/names.h"
namespace impala {
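
// Stores the runtime state and the descriptor of the file currently being processed.
// Must be called before the other methods, which rely on 'file_desc_'.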
void FileMetadataUtils::SetFile(RuntimeState* state, const HdfsFileDesc* file_desc) {
DCHECK(state != nullptr);
DCHECK(file_desc != nullptr);
state_ = state;
file_desc_ = file_desc;
}
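
// Returns the template tuple for 'partition_id': a deep copy (into 'mem_pool') of the
// scan node's template tuple for that partition, extended with file-level virtual
// columns and, for Iceberg tables, with Iceberg partition and virtual columns. Slots
// written for Iceberg columns are recorded in 'slot_descs_written'. Can return NULL
// when there is no template tuple to materialize.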
Tuple* FileMetadataUtils::CreateTemplateTuple(int64_t partition_id, MemPool* mem_pool,
std::map<const SlotId, const SlotDescriptor*>* slot_descs_written) {
DCHECK(file_desc_ != nullptr);
// Initialize the template tuple; it is copied from the template tuple map in the
// HdfsScanNodeBase.
Tuple* template_tuple = scan_node_->GetTemplateTupleForPartitionId(partition_id);
if (template_tuple != nullptr) {
template_tuple = template_tuple->DeepCopy(*scan_node_->tuple_desc(), mem_pool);
}
if (UNLIKELY(!scan_node_->virtual_column_slots().empty())) {
AddFileLevelVirtualColumns(mem_pool, template_tuple);
}
if (scan_node_->hdfs_table()->IsIcebergTable()) {
AddIcebergColumns(mem_pool, &template_tuple, slot_descs_written);
}
return template_tuple;
}
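
// Fills file-level virtual column slots of 'template_tuple': INPUT_FILE_NAME gets a
// copy of the file name allocated from 'mem_pool', and ICEBERG_DATA_SEQUENCE_NUMBER
// gets the data sequence number from the file's Iceberg metadata (NULL if unset).
// No-op if 'template_tuple' is NULL.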
void FileMetadataUtils::AddFileLevelVirtualColumns(MemPool* mem_pool,
Tuple* template_tuple) {
if (template_tuple == nullptr) return;
for (int i = 0; i < scan_node_->virtual_column_slots().size(); ++i) {
const SlotDescriptor* slot_desc = scan_node_->virtual_column_slots()[i];
if (slot_desc->virtual_column_type() == TVirtualColumnType::INPUT_FILE_NAME) {
StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset());
const char* filename = file_desc_->filename.c_str();
int len = strlen(filename);
char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
Ubsan::MemCpy(filename_copy, filename, len);
slot->Assign(filename_copy, len);
template_tuple->SetNotNull(slot_desc->null_indicator_offset());
} else if (slot_desc->virtual_column_type() ==
TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) {
using namespace org::apache::impala::fb;
const FbIcebergMetadata* ice_metadata =
file_desc_->file_metadata->iceberg_metadata();
DCHECK(ice_metadata != nullptr);
int64_t data_seq_num = ice_metadata->data_sequence_number();
if (data_seq_num > -1) {
int64_t* slot = template_tuple->GetBigIntSlot(slot_desc->tuple_offset());
*slot = data_seq_num;
template_tuple->SetNotNull(slot_desc->null_indicator_offset());
} else {
template_tuple->SetNull(slot_desc->null_indicator_offset());
}
}
}
}
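
// Returns the partition transforms (partition keys) stored in the file's Iceberg
// metadata flatbuffer. Must only be called for Iceberg tables.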
auto FileMetadataUtils::IcebergPartitionTransforms() const {
DCHECK(file_desc_ != nullptr);
DCHECK(scan_node_->hdfs_table()->IsIcebergTable());
using namespace org::apache::impala::fb;
const FbFileMetadata* file_metadata = file_desc_->file_metadata;
DCHECK(file_metadata != nullptr);
const FbIcebergMetadata* ice_metadata = file_metadata->iceberg_metadata();
DCHECK(ice_metadata != nullptr);
return ice_metadata->partition_keys();
}
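
// Materializes Iceberg-related slots into '*template_tuple', creating the tuple from
// 'mem_pool' if it is still NULL. Virtual slots are delegated to
// AddVirtualIcebergColumn(). Top-level columns partitioned by the IDENTITY transform
// get their value parsed from the partition metadata; DATE values that fail text
// conversion are retried as raw INTs, since Iceberg stores dates as INTs. Slots that
// were written are recorded in 'slot_descs_written'; parse failures are logged.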
void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tuple,
std::map<const SlotId, const SlotDescriptor*>* slot_descs_written) {
using namespace org::apache::impala::fb;
TextConverter text_converter(/* escape_char = */ '\\',
scan_node_->hdfs_table()->null_partition_key_value(),
/* check_null = */ true, /* strict_mode = */ true);
const FbFileMetadata* file_metadata = file_desc_->file_metadata;
const FbIcebergMetadata* ice_metadata = file_metadata->iceberg_metadata();
auto transforms = ice_metadata->partition_keys();
const TupleDescriptor* tuple_desc = scan_node_->tuple_desc();
if (*template_tuple == nullptr) {
*template_tuple = Tuple::Create(tuple_desc->byte_size(), mem_pool);
}
for (const SlotDescriptor* slot_desc : scan_node_->tuple_desc()->slots()) {
if (slot_desc->IsVirtual()) {
AddVirtualIcebergColumn(mem_pool, *template_tuple, *ice_metadata, slot_desc);
continue;
}
if (transforms == nullptr) continue;
const SchemaPath& path = slot_desc->col_path();
if (path.size() != 1) continue;
const ColumnDescriptor& col_desc =
scan_node_->hdfs_table()->col_descs()[path.front()];
int field_id = col_desc.field_id();
for (int i = 0; i < transforms->size(); ++i) {
auto transform = transforms->Get(i);
if (transform->transform_type() !=
FbIcebergTransformType::FbIcebergTransformType_IDENTITY) {
continue;
}
if (field_id != transform->source_id()) continue;
if (!text_converter.WriteSlot(slot_desc, *template_tuple,
reinterpret_cast<const char*>(transform->transform_value()->data()),
transform->transform_value()->size(),
true /* copy the string value into 'mem_pool' */,
false /* the value needs no unescaping */,
mem_pool)) {
ErrorMsg error_msg(TErrorCode::GENERAL,
Substitute("Could not parse partition value for "
"column '$0' in file '$1'. Partition string is '$2' "
"NULL Partition key value is '$3'",
col_desc.name(), file_desc_->filename,
transform->transform_value()->data(),
scan_node_->hdfs_table()->null_partition_key_value()));
// Dates are stored as INTs in the partition data in Iceberg, so let's try
// to parse them as INTs.
if (col_desc.type().type == PrimitiveType::TYPE_DATE) {
int32_t* slot = (*template_tuple)->GetIntSlot(slot_desc->tuple_offset());
StringParser::ParseResult parse_result;
*slot = StringParser::StringToInt<int32_t>(
reinterpret_cast<const char*>(transform->transform_value()->data()),
transform->transform_value()->size(),
&parse_result);
if (parse_result == StringParser::ParseResult::PARSE_SUCCESS) {
(*template_tuple)->SetNotNull(slot_desc->null_indicator_offset());
slot_descs_written->insert({slot_desc->id(), slot_desc});
} else {
state_->LogError(error_msg);
}
} else {
state_->LogError(error_msg);
}
} else {
slot_descs_written->insert({slot_desc->id(), slot_desc});
}
}
}
}
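
// Writes a single virtual Iceberg slot of 'template_tuple': PARTITION_SPEC_ID is
// filled from the spec id in 'ice_metadata', while ICEBERG_PARTITION_SERIALIZED is
// filled with the '.'-separated, Base64-encoded partition values (VOID transforms are
// skipped), copied into memory allocated from 'mem_pool'.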
void FileMetadataUtils::AddVirtualIcebergColumn(MemPool* mem_pool, Tuple* template_tuple,
const org::apache::impala::fb::FbIcebergMetadata& ice_metadata,
const SlotDescriptor* slot_desc) {
DCHECK(slot_desc->IsVirtual());
if (slot_desc->virtual_column_type() == TVirtualColumnType::PARTITION_SPEC_ID) {
int32_t* slot = template_tuple->GetIntSlot(slot_desc->tuple_offset());
*slot = ice_metadata.spec_id();
} else if (slot_desc->virtual_column_type() ==
TVirtualColumnType::ICEBERG_PARTITION_SERIALIZED) {
auto transforms = ice_metadata.partition_keys();
if (transforms == nullptr) return;
string partitions;
for (int i = 0; i < transforms->size(); ++i) {
using namespace org::apache::impala::fb;
auto transform = transforms->Get(i);
if (transform->transform_type() ==
FbIcebergTransformType::FbIcebergTransformType_VOID) {
continue;
}
stringstream part_ss;
Base64Encode(reinterpret_cast<const char*>(transform->transform_value()->data()),
transform->transform_value()->size(), &part_ss);
string part_value(part_ss.str());
if (partitions.empty()) {
partitions = part_value;
} else {
partitions += '.' + part_value;
}
}
StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset());
int len = partitions.length();
char* partition_serialized_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
Ubsan::MemCpy(partition_serialized_copy, partitions.c_str(), len);
slot->Assign(partition_serialized_copy, len);
template_tuple->SetNotNull(slot_desc->null_indicator_offset());
}
}
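
// Returns true if 'slot_desc' is a value-based partition column, i.e. a non-virtual
// partition key column, or, for Iceberg tables, a top-level column whose value comes
// from an IDENTITY partition transform.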
bool FileMetadataUtils::IsValuePartitionCol(const SlotDescriptor* slot_desc) {
DCHECK(file_desc_ != nullptr);
if (slot_desc->col_pos() < scan_node_->num_partition_keys() &&
!slot_desc->IsVirtual()) {
return true;
}
if (!scan_node_->hdfs_table()->IsIcebergTable()) return false;
using namespace org::apache::impala::fb;
const SchemaPath& path = slot_desc->col_path();
if (path.size() != 1) return false;
int field_id = scan_node_->hdfs_table()->col_descs()[path.front()].field_id();
auto transforms = IcebergPartitionTransforms();
if (transforms == nullptr) return false;
for (int i = 0; i < transforms->size(); ++i) {
auto transform = transforms->Get(i);
if (transform->source_id() == field_id &&
transform->transform_type() ==
FbIcebergTransformType::FbIcebergTransformType_IDENTITY) {
return true;
}
}
return false;
}
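
// Data files of tables migrated to Iceberg may not contain the partition columns, so
// field ids calculated from the file schema alone can be shifted relative to the table
// schema. Verifies that all partition transforms are IDENTITY and that their source
// ids are consecutive, then shifts '*fieldID' past the partition columns when they are
// missing from the data file. Returns an error for unexpected schema or partitioning.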
Status FileMetadataUtils::AdjustFieldIdForMigratedPartitionedTables(int* fieldID) const {
DCHECK(file_desc_ != nullptr);
DCHECK(scan_node_->hdfs_table()->IsIcebergTable());
DCHECK(fieldID != nullptr);
using namespace org::apache::impala::fb;
auto transforms = IcebergPartitionTransforms();
if (transforms == nullptr || transforms->size() == 0) return Status::OK();
vector<int> sourceIDs;
sourceIDs.reserve(transforms->size());
for (auto transform : *transforms) {
if (transform->transform_type() !=
FbIcebergTransformType::FbIcebergTransformType_IDENTITY) {
return Status(Substitute("$0 has invalid partition transform: $1",
file_desc_->filename, transform->transform_type()));
}
sourceIDs.push_back(transform->source_id());
}
DCHECK_EQ(sourceIDs.size(), transforms->size());
sort(sourceIDs.begin(), sourceIDs.end());
string errorMsg = Substitute(
"Migrated file $0 has unexpected schema or partitioning.", file_desc_->filename);
// Field IDs must be consecutive.
for (int i = 0; i < sourceIDs.size() - 1; ++i) {
if (sourceIDs[i+1] - sourceIDs[i] != 1) {
return Status(errorMsg);
}
}
int fileFieldID = *fieldID;
if (sourceIDs.front() == fileFieldID) {
// Partition columns are not stored in the data file (as expected).
// In this case we need to adjust fieldID (as it was calculated based on
// the file schema only).
*fieldID += sourceIDs.size();
} else if (sourceIDs.back() == fileFieldID - 1) {
// Partition columns are stored in the data file, no need to adjust fieldID.
} else {
// Some partition columns are stored in the data file while others are not;
// raise an error for this inconsistent layout.
return Status(errorMsg);
}
return Status::OK();
}
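
// Returns true if the slot must be read from the data file, i.e. it is neither a
// value-based partition column nor a virtual column.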
bool FileMetadataUtils::NeedDataInFile(const SlotDescriptor* slot_desc) {
if (IsValuePartitionCol(slot_desc)) return false;
if (slot_desc->IsVirtual()) return false;
return true;
}

} // namespace impala