#pragma once
#include "exec/data-sink.h"
#include "runtime/descriptors.h"
namespace impala {
class TupleRow;
class TableSinkBaseConfig : public DataSinkConfig {
void Close() override;
/// Expressions for computing the target partitions to which a row is written.
std::vector<ScalarExpr*> partition_key_exprs_;
~TableSinkBaseConfig() override {}
class TableSinkBase : public DataSink {
TableSinkBase(TDataSinkId sink_id, const TableSinkBaseConfig& sink_config,
const std::string& name, RuntimeState* state) :
DataSink(sink_id, sink_config, name, state),
partition_key_exprs_(sink_config.partition_key_exprs_) {}
virtual bool is_overwrite() const { return false; }
virtual bool is_result_sink() const { return false; }
virtual int64_t write_id() const { return -1; }
virtual std::string staging_dir() const { return ""; }
virtual int skip_header_line_count() const { return 0; }
virtual TSortingOrder::type sorting_order() const = 0;
virtual const vector<int32_t>& sort_columns() const {
static vector<int32_t> dummy;
return dummy;
virtual const std::map<string, int64_t>& GetParquetBloomFilterColumns() const {
static std::map<string, int64_t> dummy;
return dummy;
Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
Status Open(RuntimeState* state) override;
void Close(RuntimeState* state) override;
RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; }
RuntimeProfile::Counter* encode_timer() { return encode_timer_; }
RuntimeProfile::Counter* hdfs_write_timer() { return hdfs_write_timer_; }
RuntimeProfile::Counter* compress_timer() { return compress_timer_; }
virtual std::string DebugString() const = 0;
/// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by
/// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of the rows in
/// the current batch to insert into the partition. The PartitionPair owns the
/// OutputPartition via a unique_ptr so that the memory is freed as soon as the
/// PartitionPair is removed from the map. This is important, because the
/// PartitionPairs can have different lifetimes. For example, a clustered insert into a
/// partitioned table iterates over the partitions, so only one PartitionPairs is
/// in the map at any given time.
typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> PartitionPair;
typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;
/// Returns TRUE if the target table is transactional.
bool IsTransactional() const { return IsHiveAcid() || IsIceberg(); }
virtual bool IsHiveAcid() const { return false; }
/// Initialises the filenames of a given output partition, and opens the temporary file.
/// The caller of this function must already have filled partition-related information
/// in 'output_partition', such as 'iceberg_spec_id', 'partition_name',
/// 'raw_partition_names', 'external_partition_name' for table types where these fields
/// are applicable.
/// If the partition will not have any rows added to it, empty_partition must be true.
Status InitOutputPartition(RuntimeState* state,
const HdfsPartitionDescriptor& partition_descriptor,
OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT;
/// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
/// The Hdfs directory is created from the target table's base Hdfs dir,
/// the partition_key_names_ and the evaluated partition_key_exprs_.
/// The Hdfs file name is the unique_id_str_.
void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
OutputPartition* output);
/// Returns the ith partition name of the table.
std::string GetPartitionName(int i);
// Directory names containing partition-key values need to be UrlEncoded, in
// particular to avoid problems when '/' is part of the key value (which might
// occur, for example, with date strings). Hive will URL decode the value
// transparently when Impala's frontend asks the metastore for partition key values,
// which makes it particularly important that we use the same encoding as Hive. It's
// also not necessary to encode the values when writing partition metadata. You can
// check this with 'show partitions <tbl>' in Hive, followed by a select from a
// decoded partition key value.
std::string UrlEncodePartitionValue(const std::string& raw_str);
/// Add a temporary file to an output partition. Files are created in a
/// temporary directory and then moved to the real partition directory by the
/// coordinator in a finalization step. The temporary file's current location
/// and final destination are recorded in the state parameter.
/// If this function fails, the tmp file is cleaned up.
Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition)
/// Updates runtime stats of HDFS with rows written, then closes the file associated
/// with the partition by calling ClosePartitionFile()
Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition,
bool is_delete = false, DmlExecState* dml_exec_state = nullptr) WARN_UNUSED_RESULT;
/// Writes all rows in 'batch' referenced by the row index vector in 'indices' to the
/// partition's writer. If 'indices' is empty, then it writes all rows in 'batch'.
Status WriteRowsToPartition(
RuntimeState* state, RowBatch* batch, OutputPartition* partition,
const std::vector<int32_t>& indices = {})
/// Closes the hdfs file for this partition as well as the writer.
Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
// Returns TRUE if the staging step should be skipped for this partition. This allows
// for faster INSERT query completion time for the S3A filesystem as the coordinator
// does not have to copy the file(s) from the staging locaiton to the final location. We
// do not skip for INSERT OVERWRITEs because the coordinator will delete all files in
// the final location before moving the staged files there, so we cannot write directly
// to the final location and need to write to the temporary staging location.
bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);
/// Returns TRUE for Iceberg tables.
bool IsIceberg() const { return table_desc_->IsIcebergTable(); }
/// Returns TRUE if an external output directory was provided.
bool HasExternalOutputDir() { return !external_output_dir_.empty(); }
/// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,
/// evaluated against 'row'. The generated string is much shorter than the full Hdfs
/// file name.
void GetHashTblKey(const TupleRow* row,
const std::vector<ScalarExprEvaluator*>& evals, std::string* key);
/// Table id resolved in Prepare() to set tuple_desc_;
TableId table_id_;
/// string representation of the unique fragment instance id. Used for per-partition
/// Hdfs file names, and for tmp Hdfs directories. Set in Prepare();
std::string unique_id_str_;
/// Descriptor of target table. Set in Prepare().
const HdfsTableDescriptor* table_desc_ = nullptr;
/// The partition descriptor used when creating new partitions from this sink.
/// Currently we don't support multi-format sinks.
const HdfsPartitionDescriptor* prototype_partition_;
/// Expressions for computing the target partitions to which a row is written.
const std::vector<ScalarExpr*>& partition_key_exprs_;
std::vector<ScalarExprEvaluator*> partition_key_expr_evals_;
/// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare().
/// Used for generating the string key of hash_tbl_.
std::vector<ScalarExprEvaluator*> dynamic_partition_key_expr_evals_;
/// Stores the current partition during clustered inserts across subsequent row batches.
/// Only set if 'input_is_clustered_' is true.
PartitionPair* current_clustered_partition_ = nullptr;
/// Stores the current partition key during clustered inserts across subsequent row
/// batches. Only set if 'input_is_clustered_' is true.
std::string current_clustered_partition_key_;
/// The directory in which an external FE expects results to be written to.
std::string external_output_dir_;
RuntimeProfile::Counter* partitions_created_counter_;
RuntimeProfile::Counter* files_created_counter_;
RuntimeProfile::Counter* rows_inserted_counter_;
RuntimeProfile::Counter* bytes_written_counter_;
/// Time spent converting tuple to on disk format.
RuntimeProfile::Counter* encode_timer_;
/// Time spent writing to hdfs
RuntimeProfile::Counter* hdfs_write_timer_;
/// Time spent compressing data
RuntimeProfile::Counter* compress_timer_;