// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/io/hdfs-monitored-ops.h"

#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "util/debug-util.h"
#include "util/hdfs-util.h"
#include "util/time.h"

#include "common/names.h"

namespace impala {
namespace io {

// Timeout for HDFS operations. This defaults to 5 minutes (300 seconds).
DEFINE_uint64(hdfs_operation_timeout_sec, 300, "Maximum time, in seconds, that an "
    "HDFS operation should wait before timing out and failing.");
Status HdfsMonitor::Init(int32_t num_threads) {
  // The thread pool sets its queue size to be equal to the number of threads.
  // TODO: is there a better queue size?
  hdfs_worker_pool_.reset(new SynchronousThreadPool("hdfs monitor",
      "hdfs_monitor_", num_threads, num_threads));
  return hdfs_worker_pool_->Init();
}
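// Rough initialization sketch (illustrative only, not the actual call site): a
// long-lived monitor is created once and its pool sized at startup, e.g.:
//
//   HdfsMonitor monitor;
//   RETURN_IF_ERROR(monitor.Init(/* num_threads, hypothetical value */ 16));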
class OpenHdfsFileOp : public SynchronousWorkItem {
 public:
  // The 'fname' argument is copied to guarantee that it remains valid for the lifetime
  // of this operation. Holding 'fs' by reference is safe, because the connection is
  // kept open until the process terminates (see hdfs-fs-cache.h).
  OpenHdfsFileOp(const hdfsFS& fs, const std::string* fname, int flags,
      uint64_t blocksize)
    : fs_(fs), fname_(*fname), flags_(flags), blocksize_(blocksize) {}

  // Runs hdfsOpenFile() and returns an error status if it fails. If hdfsOpenFile()
  // succeeds, it is valid to call GetFileHandle().
  virtual Status Execute() override;
  virtual std::string GetDescription() override;

  // If the file was opened successfully, GetFileHandle() returns the handle.
  hdfsFile GetFileHandle() { return hdfs_file_; }

 private:
  const hdfsFS& fs_;
  const std::string fname_;
  int flags_;
  uint64_t blocksize_;

  // Set by Execute() if the open succeeds; nullptr until then.
  hdfsFile hdfs_file_ = nullptr;
};

Status OpenHdfsFileOp::Execute() {
  // A buffer size and replication factor of 0 tell libhdfs to use its configured
  // default values.
  hdfs_file_ = hdfsOpenFile(fs_, fname_.data(), flags_, 0, 0, blocksize_);
  if (hdfs_file_ == nullptr) {
    // GetHdfsErrorMsg() reads thread-local state to retrieve the error information, so
    // it must be called from the same thread that called hdfsOpenFile().
    return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
        GetHdfsErrorMsg("Failed to open HDFS file ", fname_));
  }
  return Status::OK();
}

std::string OpenHdfsFileOp::GetDescription() {
  return Substitute("hdfsOpenFile() for $0 at backend $1", fname_, GetBackendString());
}

Status HdfsMonitor::OpenHdfsFileWithTimeout(const hdfsFS& fs, const std::string* fname,
    int flags, uint64_t blocksize, hdfsFile* fid_out) {
  std::shared_ptr<OpenHdfsFileOp> op(new OpenHdfsFileOp(fs, fname, flags, blocksize));

  // Submit the operation to the synchronous thread pool. If it times out, this returns
  // an error status.
  Status status = hdfs_worker_pool_->SynchronousOffer(op,
      FLAGS_hdfs_operation_timeout_sec * MILLIS_PER_SEC);
  RETURN_IF_ERROR(status);

  // The operation did not time out and there was no error, so get the handle.
  *fid_out = op->GetFileHandle();
  return Status::OK();
}
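// Usage sketch (illustrative, not an actual call site in this module): a caller that
// already holds an hdfsFS connection could open a file with a timeout roughly as:
//
//   hdfsFile handle;
//   RETURN_IF_ERROR(monitor->OpenHdfsFileWithTimeout(fs, &fname, O_RDONLY, 0, &handle));
//   // ... use 'handle', then close it with hdfsCloseFile(fs, handle).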
} // namespace io
} // namespace impala