#include "runtime/dml-exec-state.h"
#include <mutex>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/filesystem.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "exec/data-sink.h"
#include "util/pretty-printer.h"
#include "util/container-util.h"
#include "util/hdfs-bulk-ops.h"
#include "util/hdfs-util.h"
#include "util/runtime-profile-counters.h"
#include "runtime/descriptors.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/exec-env.h"
#include "gen-cpp/control_service.pb.h"
#include "gen-cpp/ImpalaService_types.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Frontend_types.h"
#include "common/names.h"
DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
"INSERTs will inherit the permissions of their parent directories");
using namespace impala;
using boost::algorithm::is_any_of;
using boost::algorithm::split;
typedef google::protobuf::Map<string, int64> PerColumnSizePBMap;
typedef google::protobuf::Map<string, DmlPartitionStatusPB> PerPartitionStatusPBMap;
string DmlExecState::OutputPartitionStats(const string& prefix) {
lock_guard<mutex> l(lock_);
const char* indent = " ";
stringstream ss;
ss << prefix;
bool first = true;
for (const PartitionStatusMap::value_type& val: per_partition_status_) {
if (!first) ss << endl;
first = false;
ss << "Partition: ";
const string& partition_key = val.first;
if (partition_key == DataSink::ROOT_PARTITION_KEY) {
ss << "Default" << endl;
} else {
ss << partition_key << endl;
if (val.second.has_num_modified_rows()) {
ss << "NumModifiedRows: " << val.second.num_modified_rows() << endl;
if (!val.second.has_stats()) continue;
const DmlStatsPB& stats = val.second.stats();
if (stats.has_kudu_stats()) {
ss << "NumRowErrors: " << stats.kudu_stats().num_row_errors() << endl;
ss << indent << "BytesWritten: "
<< PrettyPrinter::Print(stats.bytes_written(), TUnit::BYTES);
if (stats.has_parquet_stats()) {
const ParquetDmlStatsPB& parquet_stats = stats.parquet_stats();
ss << endl << indent << "Per Column Sizes:";
for (const PerColumnSizePBMap::value_type& i : parquet_stats.per_column_size()) {
ss << endl << indent << indent << i.first << ": "
<< PrettyPrinter::Print(i.second, TUnit::BYTES);
return ss.str();
void DmlExecState::Update(const DmlExecStatusPB& dml_exec_status) {
lock_guard<mutex> l(lock_);
const PerPartitionStatusPBMap& new_partition_status_map =
for (const PerPartitionStatusPBMap::value_type& part : new_partition_status_map) {
DmlPartitionStatusPB* status = &(per_partition_status_[part.first]);
status->num_modified_rows() + part.second.num_modified_rows());
part.second.kudu_latest_observed_ts(), status->kudu_latest_observed_ts()));
if (part.second.has_stats()) {
MergeDmlStats(part.second.stats(), status->mutable_stats());
dml_exec_status.files_to_move().begin(), dml_exec_status.files_to_move().end());
void DmlExecState::AddFileToMove(const string& file_name, const string& location) {
lock_guard<mutex> l(lock_);
files_to_move_[file_name] = location;
uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
lock_guard<mutex> l(lock_);
uint64_t max_ts = 0;
for (const PartitionStatusMap::value_type& p : per_partition_status_) {
max_ts = max<uint64_t>(max_ts, p.second.kudu_latest_observed_ts());
return max_ts;
int64_t DmlExecState::GetNumModifiedRows() {
lock_guard<mutex> l(lock_);
int64_t result = 0;
for (const PartitionStatusMap::value_type& p : per_partition_status_) {
result += p.second.num_modified_rows();
return result;
bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
lock_guard<mutex> l(lock_);
for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
return catalog_update->created_partitions.size() != 0;
Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
RuntimeProfile* profile) {
lock_guard<mutex> l(lock_);
PermissionCache permissions_cache;
HdfsFsCache::HdfsFsMap filesystem_connection_cache;
HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
// INSERT finalization happens in the five following steps
// 1. If OVERWRITE, remove all the files in the target directory
// 2. Create all the necessary partition directories.
// Loop over all partitions that were updated by this insert, and create the set of
// filesystem operations required to create the correct partition structure on disk.
for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
// INSERT allows writes to tables that have partitions on multiple filesystems.
// So we need to open connections to different filesystems as necessary. We use a
// local connection cache and populate it with one connection per filesystem that the
// partitions are on.
hdfsFS partition_fs_connection;
partition.second.partition_base_dir(), &partition_fs_connection,
// Look up the partition in the descriptor table.
stringstream part_path_ss;
if ( == -1) {
// If this is a non-existant partition, use the default partition location of
// <base_dir>/part_key_1=val/part_key_2=val/...
part_path_ss << params.hdfs_base_dir << "/" << partition.first;
} else {
HdfsPartitionDescriptor* part = hdfs_table->GetPartition(;
DCHECK(part != nullptr)
<< "table_id=" << hdfs_table->id() << " partition_id=" <<;
part_path_ss << part->location();
const string& part_path = part_path_ss.str();
bool is_s3_path = IsS3APath(part_path.c_str());
// If this is an overwrite insert, we will need to delete any updated partitions
if (params.is_overwrite) {
if (partition.first.empty()) {
// If the root directory is written to, then the table must not be partitioned
DCHECK(per_partition_status_.size() == 1);
// We need to be a little more careful, and only delete data files in the root
// because the tmp directories the sink(s) wrote are there also.
// So only delete files in the table directory - all files are treated as data
// files by Hive and Impala, but directories are ignored (and may legitimately
// be used to store permanent non-table data by other applications).
int num_files = 0;
// hfdsListDirectory() only sets errno if there is an error, but it doesn't set
// it to 0 if the call succeed. When there is no error, errno could be any
// value. So need to clear errno before calling it.
// Once HDFS-8407 is fixed, the errno reset won't be needed.
errno = 0;
hdfsFileInfo* existing_files =
hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
if (existing_files == nullptr && errno == EAGAIN) {
errno = 0;
existing_files =
hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
// hdfsListDirectory() returns nullptr not only when there is an error but also
// when the directory is empty(HDFS-8407). Need to check errno to make sure
// the call fails.
if (existing_files == nullptr && errno != 0) {
return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
for (int i = 0; i < num_files; ++i) {
const string filename =
if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
partition_create_ops.Add(DELETE, existing_files[i].mName);
hdfsFreeFileInfo(existing_files, num_files);
} else {
// This is a partition directory, not the root directory; we can delete
// recursively with abandon, after checking that it ever existed.
// TODO: There's a potential race here between checking for the directory
// and a third-party deleting it.
if (FLAGS_insert_inherit_permissions && !is_s3_path) {
// There is no directory structure in S3, so "inheriting" permissions is not
// possible.
// TODO: Try to mimic inheriting permissions for S3.
partition_fs_connection, part_path, &permissions_cache);
// S3 doesn't have a directory structure, so we technically wouldn't need to
// CREATE_DIR on S3. However, libhdfs always checks if a path exists before
// carrying out an operation on that path. So we still need to call CREATE_DIR
// before we access that path due to this limitation.
if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
} else {
// Otherwise just create the directory.
partition_create_ops.Add(CREATE_DIR, part_path);
} else if (!is_s3_path || !s3_skip_insert_staging) {
// If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
// would have already been created by the table sinks.
if (FLAGS_insert_inherit_permissions && !is_s3_path) {
partition_fs_connection, part_path, &permissions_cache);
if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
partition_create_ops.Add(CREATE_DIR, part_path);
SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
if (!partition_create_ops.Execute(
ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
for (const HdfsOperationSet::Error& err : partition_create_ops.errors()) {
// It's ok to ignore errors creating the directories, since they may already
// exist. If there are permission errors, we'll run into them later.
if (err.first->op() != CREATE_DIR) {
return Status(Substitute(
"Error(s) deleting partition directories. First error (of $0) was: $1",
partition_create_ops.errors().size(), err.second));
// 3. Move all tmp files
HdfsOperationSet move_ops(&filesystem_connection_cache);
HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
for (const FileMoveMap::value_type& move : files_to_move_) {
// Empty destination means delete, so this is a directory. These get deleted in a
// separate pass to ensure that we have moved all the contents of the directory first.
if (move.second.empty()) {
VLOG_ROW << "Deleting file: " << move.first;
dir_deletion_ops.Add(DELETE, move.first);
} else {
VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
move_ops.Add(RENAME, move.first, move.second);
} else {
move_ops.Add(MOVE, move.first, move.second);
SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileMoveTimer", "FinalizationTimer"));
if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
stringstream ss;
ss << "Error(s) moving partition files. First error (of "
<< move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
return Status(ss.str());
// 4. Delete temp directories
SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileDeletionTimer", "FinalizationTimer"));
if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
stringstream ss;
ss << "Error(s) deleting staging directories. First error (of "
<< dir_deletion_ops.errors().size() << ") was: "
<< dir_deletion_ops.errors()[0].second;
return Status(ss.str());
// 5. Optionally update the permissions of the created partition directories
// Do this last so that we don't make a dir unwritable before we write to it.
if (FLAGS_insert_inherit_permissions) {
HdfsOperationSet chmod_ops(&filesystem_connection_cache);
for (const PermissionCache::value_type& perm : permissions_cache) {
bool new_dir = perm.second.first;
if (new_dir) {
short permissions = perm.second.second;
VLOG_QUERY << "INSERT created new directory: " << perm.first
<< ", inherited permissions are: " << oct << permissions;
chmod_ops.Add(CHMOD, perm.first, permissions);
if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
stringstream ss;
ss << "Error(s) setting permissions on newly created partition directories. First"
<< " error (of " << chmod_ops.errors().size() << ") was: "
<< chmod_ops.errors()[0].second;
return Status(ss.str());
return Status::OK();
void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
PermissionCache* permissions_cache) {
// Find out if the path begins with a hdfs:// -style prefix, and remove it and the
// location (e.g. host:port) if so.
int scheme_end = path_str.find("://");
string stripped_str;
if (scheme_end != string::npos) {
// Skip past the subsequent location:port/ prefix.
stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
} else {
stripped_str = path_str;
// Get the list of path components, used to build all path prefixes.
vector<string> components;
split(components, stripped_str, is_any_of("/"));
// Build a set of all prefixes (including the complete string) of stripped_path. So
// /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
vector<string> prefixes;
// Stores the current prefix
stringstream accumulator;
for (const string& component : components) {
if (component.empty()) continue;
accumulator << "/" << component;
// Now for each prefix, stat() it to see if a) it exists and b) if so what its
// permissions are. When we meet a directory that doesn't exist, we record the fact that
// we need to create it, and the permissions of its parent dir to inherit.
// Every prefix is recorded in the PermissionCache so we don't do more than one stat()
// for each path. If we need to create the directory, we record it as the pair (true,
// perms) so that the caller can identify which directories need their permissions
// explicitly set.
// Set to the permission of the immediate parent (i.e. the permissions to inherit if the
// current dir doesn't exist).
short permissions = 0;
for (const string& path : prefixes) {
PermissionCache::const_iterator it = permissions_cache->find(path);
if (it == permissions_cache->end()) {
hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
if (info != nullptr) {
// File exists, so fill the cache with its current permissions.
make_pair(path, make_pair(false, info->mPermissions)));
permissions = info->mPermissions;
hdfsFreeFileInfo(info, 1);
} else {
// File doesn't exist, so we need to set its permissions to its immediate parent
// once it's been created.
permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
} else {
permissions = it->second.second;
void DmlExecState::ToProto(DmlExecStatusPB* dml_status) {
lock_guard<mutex> l(lock_);
for (const FileMoveMap::value_type& file : files_to_move_) {
(*dml_status->mutable_files_to_move())[file.first] = file.second;
for (const PartitionStatusMap::value_type& part : per_partition_status_) {
(*dml_status->mutable_per_partition_status())[part.first] = part.second;
void DmlExecState::ToTDmlResult(TDmlResult* dml_result) {
lock_guard<mutex> l(lock_);
int64_t num_row_errors = 0;
bool has_kudu_stats = false;
for (const PartitionStatusMap::value_type& v: per_partition_status_) {
dml_result->rows_modified[v.first] = v.second.num_modified_rows();
if (v.second.has_stats() && v.second.stats().has_kudu_stats()) {
has_kudu_stats = true;
num_row_errors += v.second.stats().kudu_stats().num_row_errors();
if (has_kudu_stats) dml_result->__set_num_row_errors(num_row_errors);
void DmlExecState::AddPartition(
const string& name, int64_t id, const string* base_dir) {
lock_guard<mutex> l(lock_);
DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
DmlPartitionStatusPB status;
status.set_partition_base_dir(base_dir != nullptr ? *base_dir : "");
per_partition_status_.insert(make_pair(name, status));
void DmlExecState::UpdatePartition(const string& partition_name,
int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats) {
lock_guard<mutex> l(lock_);
PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
DCHECK(entry != per_partition_status_.end());
entry->second.num_modified_rows() + num_modified_rows_delta);
if (insert_stats == nullptr) return;
MergeDmlStats(*insert_stats, entry->second.mutable_stats());
void DmlExecState::MergeDmlStats(const DmlStatsPB& src, DmlStatsPB* dst) {
dst->set_bytes_written(dst->bytes_written() + src.bytes_written());
if (src.has_kudu_stats()) {
KuduDmlStatsPB* kudu_stats = dst->mutable_kudu_stats();
kudu_stats->num_row_errors() + src.kudu_stats().num_row_errors());
if (src.has_parquet_stats()) {
if (dst->has_parquet_stats()) {
} else {
*dst->mutable_parquet_stats() = src.parquet_stats();
void DmlExecState::InitForKuduDml() {
// For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
const string& partition_name = DataSink::ROOT_PARTITION_KEY;
lock_guard<mutex> l(lock_);
DCHECK(per_partition_status_.find(partition_name) == per_partition_status_.end());
DmlPartitionStatusPB status;
per_partition_status_.insert(make_pair(partition_name, status));
void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
int64_t latest_ts) {
// For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
const string& partition_name = DataSink::ROOT_PARTITION_KEY;
lock_guard<mutex> l(lock_);
PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
DCHECK(entry != per_partition_status_.end());