blob: c14c24ad52a0daa4e390208aab26674bf2c0bb72 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "exec/iceberg-delete-sink-base.h"
#include "exec/output-partition.h"
#include "exec/table-sink-base.h"
#include "runtime/string-value.h"

#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
namespace impala {
/// Forward declaration: within this header MemPool is only referred to through
/// std::unique_ptr<MemPool> (see 'buffered_delete_pool_').
class MemPool;
/// IcebergBufferedDeleteSink buffers the Iceberg position delete records in its
/// internal data structures. In FlushFinal() it sorts the buffered records (per
/// partition) and writes them out to position delete files (writing one partition
/// at a time). So the main difference between IcebergBufferedDeleteSink and
/// IcebergDeleteSink is that IcebergBufferedDeleteSink doesn't assume a sorted
/// input of delete records.
class IcebergBufferedDeleteSink : public IcebergDeleteSinkBase {
public:
IcebergBufferedDeleteSink(TDataSinkId sink_id,
const IcebergDeleteSinkConfig& sink_config,
RuntimeState* state);
/// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
/// Opens output_exprs and partition_key_exprs.
Status Open(RuntimeState* state) override;
/// Buffers incoming row batches in 'current_partition_'.
Status Send(RuntimeState* state, RowBatch* batch) override;
/// Writes the buffered records to position delete files in the correct order.
Status FlushFinal(RuntimeState* state) override;
/// Releases buffers and merges the member 'dml_exec_state_' to
/// 'state->dml_exec_state()'.
void Close(RuntimeState* state) override;
std::string DebugString() const override;
private:
/// Contains partition spec id and the encoded partition values.
typedef std::pair<int32_t, std::string> PartitionInfo;
/// Contains the filepaths and the corresponding file positions of deleted records.
/// It's necessary to use a std::map so we can get back the file paths in order.
typedef std::map<StringValue, std::vector<int64_t>> FilePositions;
/// Nested iterator class to conveniently iterate over a FilePositions object.
class FilePositionsIterator;
/// Retreives partition information from 'row'.
PartitionInfo GetPartitionInfo(TupleRow* row);
/// Retreives delete record data from 'row'.
std::pair<impala_udf::StringVal, int64_t> GetDeleteRecord(TupleRow* row);
/// Iterates over 'batch' and stores delete records in 'partitions_to_file_positions_'.
Status BufferDeleteRecords(RowBatch* batch);
/// Sorts the buffered records.
void SortBufferedRecords();
/// Logs the buffered records at VLOG_ROW level.
void VLogBufferedRecords();
/// Verifies that there are no duplicates in the buffered delete records. It assumes
/// that SortBufferedRecords() has been called already.
Status VerifyBufferedRecords();
/// Writes all buffered delete records to position delete files.
Status FlushBufferedRecords(RuntimeState* state);
/// Registers the referenced data files in dml_exec_state_
void RegisterDataFilesInDmlExecState();
/// Initializes an empty output batch.
Status InitializeOutputRowBatch(RowBatch* batch);
/// Uses 'iterator' to write delete records from 'partitions_to_file_positions_'
/// to 'batch'.
Status GetNextRowBatch(RowBatch* batch, FilePositionsIterator* iterator);
/// Writes a single delete record <filepah, offset> to 'row'.
void WriteRow(const StringValue& filepath, int64_t offset, TupleRow* row);
/// Tries to allocate a buffer with size 'buffer_size'. Returns error when cannot
/// serve the request.
Status TryAllocateUnalignedBuffer(int buffer_size, uint8_t** buffer);
/// Sets the 'current_partition_' based on 'spec_id' and 'partitions'.
Status SetCurrentPartition(RuntimeState* state, int32_t spec_id,
const std::string& partitions);
/// Timer for 'SortBufferedRecords()'.
RuntimeProfile::Counter* position_sort_timer_;
/// Timer for 'FlushBufferedRecords()'.
RuntimeProfile::Counter* flush_timer_;
/// Buffer pool to serve allocation requests.
std::unique_ptr<MemPool> buffered_delete_pool_;
/// The sink writes partitions one-by-one.
std::unique_ptr<OutputPartition> current_partition_;
/// We collect the delete records from the input row batches in this member.
std::unordered_map<PartitionInfo, FilePositions> partitions_to_file_positions_;
/// This sink has its own DmlExecState object because in the context of UPADTEs we
/// cannot modify the same DmlExecState object simultaneously (from the INSERT and
/// DELETE sinks). It is merged into state->dml_exec_state() in Close().
DmlExecState dml_exec_state_;
};
}