// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/hdfs-table-sink.h"
#include "exec/exec-node.h"
#include "exec/hdfs-table-writer.h"
#include "exec/hdfs-text-table-writer.h"
#include "exec/parquet/hdfs-parquet-table-writer.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gutil/stringprintf.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
#include "util/coding-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"

#include <limits>
#include <vector>
#include <sstream>
#include <gutil/strings/substitute.h>
#include <hdfs.h>
#include <boost/scoped_ptr.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <stdlib.h>

| #include "gen-cpp/ImpalaInternalService_constants.h" |
| |
| #include "common/names.h" |
| |
| using boost::posix_time::microsec_clock; |
| using boost::posix_time::ptime; |
| using namespace strings; |
| |
| namespace impala { |
| |
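// Initializes the sink configuration from its Thrift representation and creates the
// scalar exprs that compute the target partition key values for each input row.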
Status HdfsTableSinkConfig::Init(
    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
  DCHECK(tsink_->__isset.table_sink);
  DCHECK(tsink_->table_sink.__isset.hdfs_table_sink);
  RETURN_IF_ERROR(
      ScalarExpr::Create(tsink_->table_sink.hdfs_table_sink.partition_key_exprs,
          *input_row_desc_, state, &partition_key_exprs_));
  return Status::OK();
}

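// Creates the sink in the runtime state's object pool so that its lifetime is tied
// to the fragment instance.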
DataSink* HdfsTableSinkConfig::CreateSink(RuntimeState* state) const {
  TDataSinkId sink_id = state->fragment().idx;
  return state->obj_pool()->Add(
      new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
}

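// The constructor only captures the sink's Thrift parameters; optional fields (ACID
// write id, external output directory, Parquet Bloom filter columns) are copied only
// when set.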
HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
    const THdfsTableSink& hdfs_sink, RuntimeState* state)
  : TableSinkBase(sink_id, sink_config, "HdfsTableSink", state),
    skip_header_line_count_(
        hdfs_sink.__isset.skip_header_line_count ? hdfs_sink.skip_header_line_count : 0),
    overwrite_(hdfs_sink.overwrite),
    input_is_clustered_(hdfs_sink.input_is_clustered),
    sort_columns_(hdfs_sink.sort_columns),
    sorting_order_((TSortingOrder::type)hdfs_sink.sorting_order),
    is_result_sink_(hdfs_sink.is_result_sink) {
  if (hdfs_sink.__isset.write_id) {
    write_id_ = hdfs_sink.write_id;
    DCHECK_GT(write_id_, 0);
  }

  // Optional output path provided by an external FE.
  if (hdfs_sink.__isset.external_output_dir) {
    external_output_dir_ = hdfs_sink.external_output_dir;
  }

  if (hdfs_sink.__isset.external_output_partition_depth) {
    external_output_partition_depth_ = hdfs_sink.external_output_partition_depth;
  }

  if (hdfs_sink.__isset.parquet_bloom_filter_col_info) {
    parquet_bloom_filter_columns_ = hdfs_sink.parquet_bloom_filter_col_info;
  }
}

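// Resolves the target table descriptor and derives the query-scoped staging
// directory used for uncommitted output files.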
Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
  SCOPED_TIMER(profile()->total_time_counter());
  RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
  unique_id_str_ = PrintId(state->fragment_instance_id(), "-");

  // Resolve the table descriptor from the table id.
  table_desc_ = static_cast<const HdfsTableDescriptor*>(
      state->desc_tbl().GetTableDescriptor(table_id_));

  if (table_desc_ == nullptr) {
    stringstream error_msg;
    error_msg << "Failed to get table descriptor for table id: " << table_id_;
    return Status(error_msg.str());
  }

  staging_dir_ = Substitute("$0/_impala_insert_staging/$1", table_desc_->hdfs_base_dir(),
      PrintId(state->query_id(), "_"));

  // Sanity check.
  if (!IsIceberg()) {
    DCHECK_LE(partition_key_expr_evals_.size(), table_desc_->num_cols())
        << DebugString();
    DCHECK_EQ(partition_key_expr_evals_.size(), table_desc_->num_clustering_cols())
        << DebugString();
  }
  DCHECK_GE(output_expr_evals_.size(),
      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();

  return Status::OK();
}

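// Builds partition_descriptor_map_, which maps the hash key computed from a
// partition's dynamic partition key values to that partition's descriptor. Only
// partitions that this sink might actually write to are added; see below.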
void HdfsTableSink::BuildPartitionDescMap() {
  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
      table_desc_->partition_descriptors()) {
    // Build a map whose key is computed from the value of dynamic partition keys for a
    // particular partition, and whose value is the descriptor for that partition.

    // True if this partition might be written to, false otherwise.
    // A partition may be written to iff, for all partition key exprs e, either:
    //   1. e is not constant, or
    //   2. the value supplied by the query for this partition key is equal to e's
    //      constant value.
    // Only relevant partitions are remembered in partition_descriptor_map_.
    bool relevant_partition = true;
    HdfsPartitionDescriptor* partition = id_to_desc.second;
    DCHECK_EQ(partition->partition_key_value_evals().size(),
        partition_key_expr_evals_.size());
    vector<ScalarExprEvaluator*> dynamic_partition_key_value_evals;
    for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) {
      // Remember the evaluators of non-constant partition key exprs; they supply the
      // dynamic values from which the map key is built.
      DCHECK(&partition_key_expr_evals_[i]->root() == partition_key_exprs_[i]);
      if (!partition_key_exprs_[i]->is_constant()) {
        dynamic_partition_key_value_evals.push_back(
            partition->partition_key_value_evals()[i]);
      } else {
        // Deal with the following: one partition has (year=2009, month=3); another has
        // (year=2010, month=3).
        // A query like: INSERT INTO TABLE... PARTITION(year=2009) SELECT month FROM...
        // would lead to both partitions having the same key modulo ignored constant
        // partition keys. So only keep a reference to the partition which matches
        // partition_key_values for constant values, since only that is written to.
        void* table_partition_key_value =
            partition->partition_key_value_evals()[i]->GetValue(nullptr);
        void* target_partition_key_value =
            partition_key_expr_evals_[i]->GetValue(nullptr);
        if (table_partition_key_value == nullptr
            && target_partition_key_value == nullptr) {
          continue;
        }
        if (table_partition_key_value == nullptr
            || target_partition_key_value == nullptr
            || !RawValue::Eq(table_partition_key_value, target_partition_key_value,
                partition_key_expr_evals_[i]->root().type())) {
          relevant_partition = false;
          break;
        }
      }
    }
    if (relevant_partition) {
      string key;
      // Pass nullptr as row, since all of these expressions are constant, and can
      // therefore be evaluated without a valid row context.
      GetHashTblKey(nullptr, dynamic_partition_key_value_evals, &key);
      DCHECK(partition_descriptor_map_.find(key) == partition_descriptor_map_.end())
          << "Partitions with duplicate 'static' keys found during INSERT";
      partition_descriptor_map_[key] = partition;
    }
  }
}

Status HdfsTableSink::Open(RuntimeState* state) {
  SCOPED_TIMER(profile()->total_time_counter());
  RETURN_IF_ERROR(TableSinkBase::Open(state));
  if (!IsIceberg()) BuildPartitionDescMap();
  return Status::OK();
}

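// Writes a batch of clustered input, i.e. input that arrives grouped by the dynamic
// partition key, so at most one partition has to be open at a time. When the key
// changes between rows, the previous partition's file is finalized and closed before
// the next partition is opened. As a fast path, if the last row of the batch still
// matches the current key, the whole batch is written in a single call.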
Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) {
  DCHECK_GT(batch->num_rows(), 0);
  DCHECK(!dynamic_partition_key_expr_evals_.empty());
  DCHECK(input_is_clustered_);

  // Initialize the clustered partition and key.
  if (current_clustered_partition_ == nullptr) {
    const TupleRow* current_row = batch->GetRow(0);
    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_,
        &current_clustered_partition_key_);
    RETURN_IF_ERROR(GetOutputPartition(state, current_row,
        current_clustered_partition_key_, &current_clustered_partition_, false));
  }

  // Compare the key of the last row in the batch to the current partition key. If
  // they match, then all rows in the batch have the same key (the input is
  // clustered) and the batch can be written as a whole.
  string last_row_key;
  GetHashTblKey(batch->GetRow(batch->num_rows() - 1),
      dynamic_partition_key_expr_evals_, &last_row_key);
  if (last_row_key == current_clustered_partition_key_) {
    DCHECK(current_clustered_partition_->second.empty());
    RETURN_IF_ERROR(WriteRowsToPartition(state, batch,
        current_clustered_partition_->first.get()));
    return Status::OK();
  }

  // Not all rows in this batch match the previously written partition key, so we
  // process them individually.
  for (int i = 0; i < batch->num_rows(); ++i) {
    const TupleRow* current_row = batch->GetRow(i);

    string key;
    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key);

    if (current_clustered_partition_key_ != key) {
      DCHECK(current_clustered_partition_ != nullptr);
      // Done with the previous partition - write its buffered rows and close it.
      if (!current_clustered_partition_->second.empty()) {
        RETURN_IF_ERROR(WriteRowsToPartition(state, batch,
            current_clustered_partition_->first.get(),
            current_clustered_partition_->second));
        current_clustered_partition_->second.clear();
      }
      RETURN_IF_ERROR(FinalizePartitionFile(state,
          current_clustered_partition_->first.get()));
      if (current_clustered_partition_->first->writer.get() != nullptr) {
        current_clustered_partition_->first->writer->Close();
      }
      partition_keys_to_output_partitions_.erase(current_clustered_partition_key_);
      current_clustered_partition_key_ = std::move(key);
      RETURN_IF_ERROR(GetOutputPartition(state, current_row,
          current_clustered_partition_key_, &current_clustered_partition_, false));
    }
#ifdef DEBUG
    string debug_row_key;
    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &debug_row_key);
    DCHECK_EQ(current_clustered_partition_key_, debug_row_key);
#endif
    DCHECK(current_clustered_partition_ != nullptr);
    current_clustered_partition_->second.push_back(i);
  }
  // Write the final set of rows to the partition but keep its file open.
  RETURN_IF_ERROR(WriteRowsToPartition(state, batch,
      current_clustered_partition_->first.get(), current_clustered_partition_->second));
  current_clustered_partition_->second.clear();
  return Status::OK();
}

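// Builds the raw and URL-encoded partition names for 'row'. For example, partition
// key values (year=2009, month=3) produce the name "year=2009/month=3"; NULL key
// values map to the table's configured NULL partition key value. When an external
// output directory is used, only the key components below the externally created
// partition depth contribute to the external partition name.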
Status HdfsTableSink::ConstructPartitionInfo(
    const TupleRow* row, OutputPartition* output_partition) {
  DCHECK(output_partition != nullptr);
  DCHECK(output_partition->raw_partition_names.empty());

  stringstream url_encoded_partition_name_ss;
  stringstream external_partition_name_ss;
  for (int i = 0; i < partition_key_expr_evals_.size(); ++i) {
    stringstream raw_partition_key_value_ss;
    stringstream encoded_partition_key_value_ss;

    raw_partition_key_value_ss << GetPartitionName(i) << "=";
    encoded_partition_key_value_ss << GetPartitionName(i) << "=";

    void* value = partition_key_expr_evals_[i]->GetValue(row);
    if (value == nullptr) {
      raw_partition_key_value_ss << table_desc_->null_partition_key_value();
      encoded_partition_key_value_ss << table_desc_->null_partition_key_value();
    } else {
      string value_str;
      partition_key_expr_evals_[i]->PrintValue(value, &value_str);

      raw_partition_key_value_ss << value_str;

      string part_key_value = UrlEncodePartitionValue(value_str);
      encoded_partition_key_value_ss << part_key_value;
    }
    if (i < partition_key_expr_evals_.size() - 1) encoded_partition_key_value_ss << "/";

    url_encoded_partition_name_ss << encoded_partition_key_value_ss.str();
    if (HasExternalOutputDir() && i >= external_output_partition_depth_) {
      external_partition_name_ss << encoded_partition_key_value_ss.str();
    }

    output_partition->raw_partition_names.push_back(raw_partition_key_value_ss.str());
  }

  output_partition->partition_name = url_encoded_partition_name_ss.str();
  output_partition->external_partition_name = external_partition_name_ss.str();
  if (IsIceberg()) {
    // Use the default partition spec id.
    output_partition->iceberg_spec_id = table_desc_->IcebergSpecId();
  }
  return Status::OK();
}

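// Returns the partition descriptor for 'key', falling back to the prototype
// partition (used to create new partitions at runtime) when the key is unknown.
// Iceberg tables have a single partition descriptor that is always used.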
inline const HdfsPartitionDescriptor* HdfsTableSink::GetPartitionDescriptor(
    const string& key) {
  if (IsIceberg()) return table_desc_->partition_descriptors().begin()->second;
  const HdfsPartitionDescriptor* ret = prototype_partition_;
  PartitionDescriptorMap::const_iterator it = partition_descriptor_map_.find(key);
  if (it != partition_descriptor_map_.end()) ret = it->second;
  return ret;
}

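// Returns the in-flight output partition for 'key' in 'partition_pair', creating and
// registering a new one on first use. New partitions are reported to the coordinator
// through DmlExecState so that it can create the partition directory structure and
// clean up the staging directory if needed.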
inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const TupleRow* row,
    const string& key, PartitionPair** partition_pair, bool no_more_rows) {
  DCHECK(row != nullptr || key == ROOT_PARTITION_KEY);
  PartitionMap::iterator existing_partition;
  existing_partition = partition_keys_to_output_partitions_.find(key);
  if (existing_partition == partition_keys_to_output_partitions_.end()) {
    // Create a new OutputPartition and add it to partition_keys_to_output_partitions_.
    const HdfsPartitionDescriptor* partition_descriptor = GetPartitionDescriptor(key);
    std::unique_ptr<OutputPartition> partition(new OutputPartition());
    // Build the unique name for this partition from the partition keys, e.g.
    // "j=1/f=foo/".
    RETURN_IF_ERROR(ConstructPartitionInfo(row, partition.get()));
    Status status =
        InitOutputPartition(state, *partition_descriptor, partition.get(),
            no_more_rows);
    if (!status.ok()) {
      // We failed to create the output partition successfully. Clean it up now
      // as it is not added to partition_keys_to_output_partitions_ so won't be
      // cleaned up in Close().
      if (partition->writer.get() != nullptr) partition->writer->Close();
      return status;
    }

    // Indicate that the temporary directory is to be deleted after execution.
    bool clean_up_staging_dir =
        !no_more_rows && !ShouldSkipStaging(state, partition.get());

    // Save the partition name so that the coordinator can create the partition
    // directory structure if needed.
    state->dml_exec_state()->AddPartition(
        partition->partition_name, partition_descriptor->id(),
        &table_desc_->hdfs_base_dir(),
        clean_up_staging_dir ? &partition->tmp_hdfs_dir_name : nullptr);

    partition_keys_to_output_partitions_[key].first = std::move(partition);
    *partition_pair = &partition_keys_to_output_partitions_[key];
  } else {
    // Use the existing output partition.
    *partition_pair = &existing_partition->second;
  }
  return Status::OK();
}

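// Routes each input batch to its output partition(s): with no dynamic partition keys
// the whole batch goes to the single root partition; clustered input is delegated to
// WriteClusteredRowBatch(); otherwise rows are grouped per partition by their hash
// key and each group is appended to that partition's writer.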
Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
  SCOPED_TIMER(profile()->total_time_counter());
  expr_results_pool_->Clear();
  RETURN_IF_ERROR(state->CheckQueryState());
  // We don't do any work for an empty batch.
  if (batch->num_rows() == 0) return Status::OK();

  // If there are no dynamic partition keys, just pass the whole batch to the single
  // output partition, using the empty root partition key.
  if (dynamic_partition_key_expr_evals_.empty()) {
    PartitionPair* partition_pair;
    RETURN_IF_ERROR(
        GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &partition_pair, false));
    DCHECK(partition_pair->second.empty());
    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair->first.get()));
  } else if (input_is_clustered_) {
    RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch));
  } else {
    for (int i = 0; i < batch->num_rows(); ++i) {
      const TupleRow* current_row = batch->GetRow(i);

      string key;
      GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key);
      PartitionPair* partition_pair = nullptr;
      RETURN_IF_ERROR(
          GetOutputPartition(state, current_row, key, &partition_pair, false));
      partition_pair->second.push_back(i);
    }
    for (PartitionMap::value_type& partition : partition_keys_to_output_partitions_) {
      PartitionPair& partition_pair = partition.second;
      if (!partition_pair.second.empty()) {
        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair.first.get(),
            partition_pair.second));
        partition_pair.second.clear();
      }
    }
  }

  return Status::OK();
}

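// Finalizes every open partition file. An output partition is created even for empty
// input so that an INSERT OVERWRITE still deletes the pre-existing data.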
Status HdfsTableSink::FlushFinal(RuntimeState* state) {
  DCHECK(!closed_);
  SCOPED_TIMER(profile()->total_time_counter());

  if (dynamic_partition_key_expr_evals_.empty()) {
    // Make sure we create an output partition even if the input is empty because we
    // need it to delete the existing data for 'insert overwrite'.
    PartitionPair* dummy;
    RETURN_IF_ERROR(GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &dummy, true));
  }

  // Close the HDFS files and update the stats in the runtime state.
  for (PartitionMap::iterator cur_partition =
          partition_keys_to_output_partitions_.begin();
      cur_partition != partition_keys_to_output_partitions_.end();
      ++cur_partition) {
    RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first.get()));
  }
  // Returns OK if there is no debug action.
  return DebugAction(state->query_options(), "FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL");
}

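// Best-effort cleanup: closes all writers and partition files, logging rather than
// returning any errors. Returns immediately if the sink is already closed.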
void HdfsTableSink::Close(RuntimeState* state) {
  if (closed_) return;
  SCOPED_TIMER(profile()->total_time_counter());
  for (PartitionMap::iterator cur_partition =
          partition_keys_to_output_partitions_.begin();
      cur_partition != partition_keys_to_output_partitions_.end();
      ++cur_partition) {
    if (cur_partition->second.first->writer.get() != nullptr) {
      cur_partition->second.first->writer->Close();
    }
    Status close_status = ClosePartitionFile(state, cur_partition->second.first.get());
    if (!close_status.ok()) state->LogError(close_status.msg());
  }
  partition_keys_to_output_partitions_.clear();
  TableSinkBase::Close(state);
  closed_ = true;
}

string HdfsTableSink::DebugString() const {
  stringstream out;
  out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")
      << " table_desc=" << table_desc_->DebugString()
      << " partition_key_exprs=" << ScalarExpr::DebugString(partition_key_exprs_)
      << " output_exprs=" << ScalarExpr::DebugString(output_exprs_)
      << " write_id=" << write_id_ << ")";
  return out.str();
}

} // namespace impala