blob: 0edc1bab53b593eb4a1cae599d9e5c99fe8e7cc7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <limits>
#include <memory>
#include <unordered_map>
#include "runtime/descriptors.h"
#include "runtime/tuple.h"
#include "runtime/tuple-row.h"
namespace impala {
/// Helper class for collecting position delete records and efficiently
/// serializing them to outbound row batches. The serialization algorithm
/// is the following:
///   for each entry in 'file_to_positions_':
///     write out the file path
///     for each position for the current file path:
///       write out a tuple: (StringValue whose pointer field holds the byte offset of
///       the file path within the serialized tuple data, position)
/// E.g.:
/// +----------------+-------------+--------+-------------+--------+-----+
/// | File path | StringValue | BigInt | StringValue | BigInt | ... |
/// +----------------+-------------+--------+-------------+--------+-----+
/// | /.../a.parquet | ptr, len | 42 | ptr, len | 43 | ... |
/// +----------------+-------------+--------+-------------+--------+-----+
///
/// IcebergPositionDeleteCollector tracks the memory of the file paths via its MemPool,
/// but the positions are stored in untracked std::vector<int64_t> objects. Since the
/// collector only stores records up to a KrpcDataStreamSender::Channel's capacity, their
/// memory consumption should not be significant.
class IcebergPositionDeleteCollector {
public:
IcebergPositionDeleteCollector(TupleDescriptor* desc) {
desc_ = desc;
DCHECK_EQ(desc_->slots().size(), 2);
SlotDescriptor* file_path_desc = desc_->slots()[0];
SlotDescriptor* position_desc = desc_->slots()[1];
DCHECK(file_path_desc->type().IsVarLenStringType());
DCHECK(position_desc->type().type == TYPE_BIGINT);
file_path_offset_ = file_path_desc->tuple_offset();
pos_offset_ = position_desc->tuple_offset();
row_count_ = 0;
insert_it_ = file_to_positions_.end();
}
void Init(MemTracker* parent_mem_tracker) {
pool_ = std::make_unique<MemPool>(parent_mem_tracker);
}
Status AddRow(TupleRow* row) {
Tuple* tuple = row->GetTuple(0);
StringValue* filename_value = tuple->GetStringSlot(file_path_offset_);
int64_t pos = *tuple->GetBigIntSlot(pos_offset_);
RETURN_IF_ERROR(SetInsertIterator(filename_value));
insert_it_->second.push_back(pos);
++row_count_;
return Status::OK();
}
int RowCount() const { return row_count_; }
void Close() {
Reset();
pool_->FreeAll();
}
/// Serializes the collected position delete records into 'dest',
/// then resets the internal structures of 'this'.
Status Serialize(OutboundRowBatch* dest) {
dest->tuple_offsets_.clear();
int64_t tuple_data_size = TupleDataSize();
dest->tuple_data_.resize(tuple_data_size);
char* tuple_data = const_cast<char*>(dest->tuple_data_.data());
Ubsan::MemSet(tuple_data, 0, tuple_data_size);
int offset = 0;
for (const auto& [path, positions] : file_to_positions_) {
int path_start = offset;
int path_len = path.Len();
Ubsan::MemCpy(tuple_data + offset, path.Ptr(), path_len);
offset += path_len;
for (int64_t pos : positions) {
dest->tuple_offsets_.push_back(offset);
Tuple* t = reinterpret_cast<Tuple*>(tuple_data + offset);
StringValue* sv = t->GetStringSlot(file_path_offset_);
sv->Assign(reinterpret_cast<char*>(path_start), path_len);
int64_t* pos_slot = t->GetBigIntSlot(pos_offset_);
DCHECK_GE(pos, 0);
*pos_slot = pos;
offset += desc_->byte_size();
}
}
DCHECK_EQ(offset, tuple_data_size);
Reset();
return Status::OK();
}
private:
void Reset() {
file_to_positions_.clear();
insert_it_ = file_to_positions_.end();
pool_->Clear();
row_count_ = 0;
}
int64_t TupleDataSize() const {
int64_t total_size = 0;
for (const auto& [filename, positions] : file_to_positions_) {
total_size += filename.Len();
total_size += positions.size() * desc_->byte_size();
}
return total_size;
}
Status SetInsertIterator(StringValue* filename_value) {
if (insert_it_ != file_to_positions_.end() &&
insert_it_->first == *filename_value) {
return Status::OK();
}
insert_it_ = file_to_positions_.find(*filename_value);
if (insert_it_ != file_to_positions_.end()) {
return Status::OK();
}
StringValue::SimpleString ss = filename_value->ToSimpleString();
char* ptr = reinterpret_cast<char*>(pool_->TryAllocate(ss.len));
if (UNLIKELY(ptr == nullptr)) {
return Status(strings::Substitute(
"Could not allocate $0 bytes in IcebergPositionDeleteChannel.", ss.len));
}
Ubsan::MemCpy(ptr, ss.ptr, ss.len);
StringValue sv(ptr, ss.len);
insert_it_ = file_to_positions_.insert(insert_it_, {sv, {}});
return Status::OK();
}
std::unique_ptr<MemPool> pool_;
TupleDescriptor* desc_;
int file_path_offset_;
int pos_offset_;
int row_count_;
using FileToPositionsMap = std::unordered_map<StringValue, vector<int64_t>>;
FileToPositionsMap file_to_positions_;
FileToPositionsMap::iterator insert_it_;
};
}