// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/hdfs-bulk-ops.h"
#include <vector>
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/hdfs-util.h"
#include "common/names.h"
using namespace impala;
HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, HdfsOperationSet* op_set)
  : op_(op), src_(src), op_set_(op_set) {
  DCHECK(op != RENAME);
  DCHECK(!src_.empty());
}

HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, const std::string& dst,
    HdfsOperationSet* op_set) : op_(op), src_(src), dst_(dst), op_set_(op_set) {
  DCHECK(op == RENAME || op == MOVE);
  DCHECK(!src_.empty());
  DCHECK(!dst_.empty());
}

HdfsOp::HdfsOp(HdfsOpType op, const string& src, short permissions,
    HdfsOperationSet* op_set)
  : op_(op), src_(src), permissions_(permissions), op_set_(op_set) {
  DCHECK(op == CHMOD);
  DCHECK(!src_.empty());
}

// Required for ThreadPool
HdfsOp::HdfsOp() { }
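// Formats a human-readable description of this op (its type, source path and, where
// relevant, destination path or permissions), appends 'error_msg', and records the
// result with the owning HdfsOperationSet.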
void HdfsOp::AddError(const string& error_msg) const {
  stringstream ss;
  ss << "Hdfs op (";
  switch (op_) {
    case DELETE:
      ss << "DELETE " << src_;
      break;
    case CREATE_DIR:
      ss << "CREATE_DIR " << src_;
      break;
    case RENAME:
      ss << "RENAME " << src_ << " TO " << dst_;
      break;
    case MOVE:
      ss << "MOVE " << src_ << " TO " << dst_;
      break;
    case DELETE_THEN_CREATE:
      ss << "DELETE_THEN_CREATE " << src_;
      break;
    case CHMOD:
      ss << "CHMOD " << src_ << " " << oct << permissions_;
      break;
  }
  ss << ") failed, error was: " << error_msg;
  op_set_->AddError(ss.str(), this);
}
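// Runs this op against HDFS: obtains the (possibly cached) connection(s) it needs,
// issues the corresponding libhdfs call, records any failure via AddError(), and
// finally signals completion to the owning op set. If the op set has already decided
// to abort, the actual work is skipped.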
void HdfsOp::Execute() const {
  // If the op set has already hit an error with abort_on_error set, skip the actual
  // work, but still mark this op as done so that HdfsOperationSet::Execute() does not
  // block forever on the completion barrier.
  if (op_set_->ShouldAbort()) {
    op_set_->MarkOneOpDone();
    return;
  }

  int err = 0;
  hdfsFS src_connection;
  Status connection_status = HdfsFsCache::instance()->GetConnection(src_, &src_connection,
      op_set_->connection_cache());
  if (!connection_status.ok()) {
    AddError(connection_status.GetDetail());
    op_set_->MarkOneOpDone();
    return;
  }

  switch (op_) {
    case DELETE:
      err = hdfsDelete(src_connection, src_.c_str(), 1);
      VLOG_FILE << "hdfsDelete() file=" << src_;
      break;
    case CREATE_DIR:
      err = hdfsCreateDirectory(src_connection, src_.c_str());
      VLOG_FILE << "hdfsCreateDirectory() file=" << src_;
      break;
    case RENAME:
      err = hdfsRename(src_connection, src_.c_str(), dst_.c_str());
      VLOG_FILE << "hdfsRename() src_file=" << src_ << " dst_file=" << dst_;
      break;
    case MOVE:
      hdfsFS dst_connection;
      connection_status = HdfsFsCache::instance()->GetConnection(dst_, &dst_connection,
          op_set_->connection_cache());
      if (!connection_status.ok()) break;
      err = hdfsMove(src_connection, src_.c_str(), dst_connection, dst_.c_str());
      VLOG_FILE << "hdfsMove() src_file=" << src_ << " dst_file=" << dst_;
      break;
    case DELETE_THEN_CREATE:
      err = hdfsDelete(src_connection, src_.c_str(), 1);
      VLOG_FILE << "hdfsDelete() file=" << src_;
      if (err != -1) {
        err = hdfsCreateDirectory(src_connection, src_.c_str());
        VLOG_FILE << "hdfsCreateDirectory() file=" << src_;
      }
      break;
    case CHMOD:
      err = hdfsChmod(src_connection, src_.c_str(), permissions_);
      VLOG_FILE << "hdfsChmod() file=" << src_;
      break;
  }

  if (err == -1 || !connection_status.ok()) {
    string error_msg =
        connection_status.ok() ? GetHdfsErrorMsg("", src_) : connection_status.GetDetail();
    AddError(error_msg);
  }

  op_set_->MarkOneOpDone();
}
// Utility method to convert from a thread-pool signature to HdfsOp::Execute
void HdfsThreadPoolHelper(int thread_id, const HdfsOp& op) {
  op.Execute();
}

// Factory method to create a new thread pool (required because only this file has access
// to the definition of HdfsThreadPoolHelper).
HdfsOpThreadPool* impala::CreateHdfsOpThreadPool(const string& name, uint32_t num_threads,
    uint32_t max_queue_length) {
  return new HdfsOpThreadPool(name, "hdfs-worker", num_threads,
      max_queue_length, &HdfsThreadPoolHelper);
}
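// Illustrative usage sketch (not part of this file; the pool sizing and paths below are
// hypothetical): a caller typically creates one shared pool, builds an HdfsOperationSet
// per batch of filesystem changes, and runs it synchronously.
//
//   HdfsOpThreadPool* pool = impala::CreateHdfsOpThreadPool("hdfs-ops", 8, 1024);
//   HdfsFsCache::HdfsFsMap connection_cache;
//   HdfsOperationSet op_set(&connection_cache);
//   op_set.Add(CREATE_DIR, "hdfs://nn/tmp/staging");    // hypothetical path
//   op_set.Add(DELETE, "hdfs://nn/tmp/old_staging");    // hypothetical path
//   bool ok = op_set.Execute(pool, /* abort_on_error */ true);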
HdfsOperationSet::HdfsOperationSet(HdfsFsCache::HdfsFsMap* connection_cache)
  : connection_cache_(connection_cache) { }
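// Submits every op added so far to 'pool' and blocks until each one has called
// MarkOneOpDone(). Returns true iff no op recorded an error. If 'abort_on_error' is
// true, ops that have not yet run are skipped once any error has been recorded.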
bool HdfsOperationSet::Execute(ThreadPool<HdfsOp>* pool, bool abort_on_error) {
  {
    lock_guard<mutex> l(errors_lock_);
    abort_on_error_ = abort_on_error;
  }
  int64_t num_ops = ops_.size();
  if (num_ops == 0) return true;
  ops_complete_barrier_.reset(new CountingBarrier(num_ops));
  for (const HdfsOp& op: ops_) pool->Offer(op);
  ops_complete_barrier_->Wait();
  return errors().size() == 0;
}
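// The Add() overloads only queue an op for later execution by Execute(); they cover ops
// that need a source path only, a source and destination, or a source and permissions.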
void HdfsOperationSet::Add(HdfsOpType op, const string& src) {
  ops_.push_back(HdfsOp(op, src, this));
}

void HdfsOperationSet::Add(HdfsOpType op, const string& src, const string& dst) {
  ops_.push_back(HdfsOp(op, src, dst, this));
}

void HdfsOperationSet::Add(HdfsOpType op, const string& src, short permissions) {
  ops_.push_back(HdfsOp(op, src, permissions, this));
}
void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) {
  lock_guard<mutex> l(errors_lock_);
  errors_.push_back(make_pair(op, err));
}
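// Called once by every submitted HdfsOp when it finishes (successfully or not) so that
// Execute() can unblock after all ops have reported back.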
void HdfsOperationSet::MarkOneOpDone() {
  discard_result(ops_complete_barrier_->Notify());
}
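// Returns true if execution should stop early, i.e. abort_on_error was requested and at
// least one error has already been recorded.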
bool HdfsOperationSet::ShouldAbort() {
  lock_guard<mutex> l(errors_lock_);
  return abort_on_error_ && !errors_.empty();
}