blob: c6a6cec45bf43f8710ce8b8a8ca3ae896befa3ed [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include "gutil/strings/substitute.h"
#include "runtime/io/disk-io-mgr-internal.h"
#include "runtime/io/hdfs-file-reader.h"
#include "runtime/io/request-context.h"
#include "runtime/io/request-ranges.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
#include "common/names.h"
// Command-line flag: when true, reads go through hdfsPread() (positional read)
// instead of hdfsRead(); this is required for HDFS hedged reads to kick in.
DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
    "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
    "(assuming the HDFS client is configured to do so).");
// Debug-build-only flag (defined elsewhere): injects an artificial delay before
// disk reads; used by ReadFromPos() to trigger the IMPALA-6587 race in tests.
#ifndef NDEBUG
DECLARE_int32(stress_disk_read_delay_ms);
#endif
namespace impala {
namespace io {
// Destructor sanity checks: Close() must already have released the exclusive
// file handle and the zero-copy cached buffer; destruction is not allowed to
// be the path that cleans these up.
HdfsFileReader::~HdfsFileReader() {
  DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
  DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released.";
}
// Opens an exclusive HDFS file handle for this reader's scan range and seeks
// it to the range's offset. A no-op when a handle is already open, or when
// file handle caching will be used instead (handles are then borrowed per
// read). Returns an error if the scan range was cancelled, the open failed,
// or the initial seek failed.
Status HdfsFileReader::Open(bool use_file_handle_cache) {
  unique_lock<SpinLock> hdfs_lock(lock_);
  RETURN_IF_ERROR(scan_range_->cancel_status_);
  // Nothing to open if an exclusive handle already exists. Likewise, with file
  // handle caching the reader keeps no handle of its own. Caching is only used
  // for local files, so S3 and other remote filesystems still obtain an
  // exclusive handle per scan range.
  if (exclusive_hdfs_fh_ != nullptr || (use_file_handle_cache && expected_local_)) {
    return Status::OK();
  }
  auto* mgr = scan_range_->io_mgr_;
  // Acquire a fresh exclusive file handle for this scan range.
  exclusive_hdfs_fh_ = mgr->GetExclusiveHdfsFileHandle(hdfs_fs_,
      scan_range_->file_string(), scan_range_->mtime(), scan_range_->reader_);
  if (exclusive_hdfs_fh_ == nullptr) {
    return Status(TErrorCode::DISK_IO_ERROR,
        GetHdfsErrorMsg("Failed to open HDFS file ", *scan_range_->file_string()));
  }
  // Position the handle at the start of the range. On failure the handle is
  // released before reporting the error so it cannot leak.
  if (hdfsSeek(hdfs_fs_, exclusive_hdfs_fh_->file(), scan_range_->offset_) != 0) {
    mgr->ReleaseExclusiveHdfsFileHandle(exclusive_hdfs_fh_);
    exclusive_hdfs_fh_ = nullptr;
    return Status(TErrorCode::DISK_IO_ERROR,
        Substitute("Error seeking to $0 in file: $1 $2", scan_range_->offset(),
            *scan_range_->file_string(), GetHdfsErrorMsg("")));
  }
  ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
  return Status::OK();
}
// Reads up to 'bytes_to_read' bytes starting at 'file_offset' into 'buffer',
// looping over chunks bounded by the scan range's max read chunk size.
// On return, '*bytes_read' holds the number of bytes actually read and '*eof'
// is set if the read ran past the end of the file. Uses the exclusive file
// handle if one was opened, otherwise borrows a handle from the file handle
// cache for the duration of the call. A failed read on a borrowed handle is
// retried once with a reopened handle, since cached handles can go stale.
Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
  DCHECK(scan_range_->read_in_flight());
  DCHECK_GE(bytes_to_read, 0);
  // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
#ifndef NDEBUG
  if (FLAGS_stress_disk_read_delay_ms > 0) {
    SleepForMs(FLAGS_stress_disk_read_delay_ms);
  }
#endif
  unique_lock<SpinLock> hdfs_lock(lock_);
  RETURN_IF_ERROR(scan_range_->cancel_status_);
  auto io_mgr = scan_range_->io_mgr_;
  auto request_context = scan_range_->reader_;
  *eof = false;
  *bytes_read = 0;
  CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr;
  hdfsFile hdfs_file;
  // If the reader has an exclusive file handle, use it. Otherwise, borrow
  // a file handle from the cache. The borrowed handle (if any) is returned
  // to the cache before this function exits.
  if (exclusive_hdfs_fh_ != nullptr) {
    hdfs_file = exclusive_hdfs_fh_->file();
  } else {
    borrowed_hdfs_fh = io_mgr->GetCachedHdfsFileHandle(hdfs_fs_,
        scan_range_->file_string(),
        scan_range_->mtime(), request_context);
    if (borrowed_hdfs_fh == nullptr) {
      return Status(TErrorCode::DISK_IO_ERROR,
          GetHdfsErrorMsg("Failed to open HDFS file ", *scan_range_->file_string()));
    }
    hdfs_file = borrowed_hdfs_fh->file();
  }
  int64_t max_chunk_size = scan_range_->MaxReadChunkSize();
  Status status = Status::OK();
  {
    // Time spent in the read loop is charged to the request context's read
    // timer; retries that reopen a handle are excluded below.
    ScopedTimer<MonotonicStopWatch> req_context_read_timer(
        scan_range_->reader_->read_timer_);
    while (*bytes_read < bytes_to_read) {
      int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
      DCHECK_GT(chunk_size, 0);
      // The hdfsRead() length argument is an int.
      DCHECK_LE(chunk_size, numeric_limits<int>::max());
      int current_bytes_read = -1;
      // bytes_read_ is only updated after the while loop
      int64_t position_in_file = file_offset + *bytes_read;
      // ReadFromPosInternal() might fail due to a bad file handle.
      // If that was the case, allow for a retry to fix it.
      status = ReadFromPosInternal(hdfs_file, position_in_file,
          borrowed_hdfs_fh != nullptr, buffer + *bytes_read, chunk_size,
          &current_bytes_read);
      // Retry if:
      // - first read was not successful
      // and
      // - used a borrowed file handle
      if (!status.ok() && borrowed_hdfs_fh != nullptr) {
        // The error may be due to a bad file handle. Reopen the file handle and retry.
        // Exclude this time from the read timers.
        req_context_read_timer.Stop();
        RETURN_IF_ERROR(io_mgr->ReopenCachedHdfsFileHandle(hdfs_fs_,
            scan_range_->file_string(), scan_range_->mtime(),
            request_context, &borrowed_hdfs_fh));
        hdfs_file = borrowed_hdfs_fh->file();
        req_context_read_timer.Start();
        status = ReadFromPosInternal(hdfs_file, position_in_file,
            borrowed_hdfs_fh != nullptr, buffer + *bytes_read, chunk_size,
            &current_bytes_read);
      }
      if (!status.ok()) {
        break;
      }
      DCHECK_GT(current_bytes_read, -1);
      if (current_bytes_read == 0) {
        // No more bytes in the file. The scan range went past the end.
        *eof = true;
        break;
      }
      *bytes_read += current_bytes_read;
      // Collect and accumulate statistics
      GetHdfsStatistics(hdfs_file);
    }
  }
  if (borrowed_hdfs_fh != nullptr) {
    io_mgr->ReleaseCachedHdfsFileHandle(scan_range_->file_string(), borrowed_hdfs_fh);
  }
  return status;
}
// Performs one read of up to 'chunk_size' bytes at 'position_in_file' into
// 'buffer', writing the byte count to '*bytes_read'. Uses hdfsPread() when
// --use_hdfs_pread is set; otherwise seeks (only needed for borrowed handles,
// whose position is unknown) and then calls hdfsRead(). Returns an error
// Status on a failed seek or read.
Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
    bool is_borrowed_fh, uint8_t* buffer, int64_t chunk_size, int* bytes_read) {
  // For file handles from the cache, any of the libhdfs calls below may fail
  // due to a stale file handle; the caller retries in that case.
  if (FLAGS_use_hdfs_pread) {
    // Positional read: no seek needed, and allows HDFS hedged reads.
    *bytes_read = hdfsPread(hdfs_fs_, hdfs_file, position_in_file, buffer, chunk_size);
  } else {
    // A borrowed handle may be positioned anywhere in the file, so move it to
    // the requested offset first. An exclusive handle is already positioned.
    if (is_borrowed_fh
        && hdfsSeek(hdfs_fs_, hdfs_file, position_in_file) != 0) {
      return Status(TErrorCode::DISK_IO_ERROR,
          Substitute("Error seeking to $0 in file: $1: $2",
              position_in_file, *scan_range_->file_string(), GetHdfsErrorMsg("")));
    }
    *bytes_read = hdfsRead(hdfs_fs_, hdfs_file, buffer, chunk_size);
  }
  // Both read paths signal failure the same way: a byte count of -1.
  if (*bytes_read == -1) {
    return Status(TErrorCode::DISK_IO_ERROR,
        GetHdfsErrorMsg("Error reading from HDFS file: ",
            *scan_range_->file_string()));
  }
  return Status::OK();
}
// Attempts a zero-copy read of the whole scan range from the HDFS cache via
// hadoopReadZero(). On success, '*data'/'*length' describe the zero-copy
// buffer (released later in Close()); on failure both are zeroed out and the
// caller must fall back to a regular read path.
void HdfsFileReader::CachedFile(uint8_t** data, int64_t* length) {
  {
    unique_lock<SpinLock> hdfs_lock(lock_);
    DCHECK(cached_buffer_ == nullptr);
    DCHECK(exclusive_hdfs_fh_ != nullptr);
    // Ask the HDFS client for a zero-copy view of the cached range.
    cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(),
        scan_range_->io_mgr_->cached_read_options(), scan_range_->len());
  }
  if (cached_buffer_ != nullptr) {
    // Hand the buffer's memory and size back to the caller.
    *data = reinterpret_cast<uint8_t*>(
        const_cast<void*>(hadoopRzBufferGet(cached_buffer_)));
    *length = hadoopRzBufferLength(cached_buffer_);
  } else {
    // The range could not be served from the zero-copy cache.
    *data = nullptr;
    *length = 0;
  }
}
// Releases the exclusive file handle (and any zero-copy cached buffer tied to
// it), flushes final read statistics into the request context, publishes
// hedged-read metrics when applicable, and logs unexpected remote reads.
void HdfsFileReader::Close() {
  unique_lock<SpinLock> hdfs_lock(lock_);
  if (exclusive_hdfs_fh_ != nullptr) {
    // Pick up any statistics accumulated since the last read.
    GetHdfsStatistics(exclusive_hdfs_fh_->file());
    // The zero-copy buffer must be freed through the same file handle it was
    // obtained from, so do it before releasing the handle.
    if (cached_buffer_ != nullptr) {
      hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
      cached_buffer_ = nullptr;
    }
    // Destroy the file handle.
    scan_range_->io_mgr_->ReleaseExclusiveHdfsFileHandle(exclusive_hdfs_fh_);
    exclusive_hdfs_fh_ = nullptr;
    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
  }
  if (FLAGS_use_hdfs_pread && IsHdfsPath(scan_range_->file())) {
    // Update Hedged Read Metrics.
    // We call it only if the --use_hdfs_pread flag is set, to avoid having the
    // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
    // otherwise. 'hedged_metrics' is only set upon success.
    // We also avoid calling hdfsGetHedgedReadMetrics() when the file is not on HDFS
    // (see HDFS-13417).
    struct hdfsHedgedReadMetrics* hedged_metrics;
    int success = hdfsGetHedgedReadMetrics(hdfs_fs_, &hedged_metrics);
    if (success == 0) {
      ImpaladMetrics::HEDGED_READ_OPS->SetValue(hedged_metrics->hedgedReadOps);
      ImpaladMetrics::HEDGED_READ_OPS_WIN->SetValue(
          hedged_metrics->hedgedReadOpsWin);
      hdfsFreeHedgedReadMetrics(hedged_metrics);
    }
  }
  // Report reads that hit remote datanodes; a remote read when the range was
  // expected to be local is worth surfacing in the logs.
  if (num_remote_bytes_ > 0) {
    scan_range_->reader_->num_remote_ranges_.Add(1L);
    if (expected_local_) {
      scan_range_->reader_->unexpected_remote_bytes_.Add(num_remote_bytes_);
      VLOG_FILE << "Unexpected remote HDFS read of "
                << PrettyPrinter::Print(num_remote_bytes_, TUnit::BYTES)
                << " for file '" << *scan_range_->file_string() << "'";
    }
  }
}
// Pulls per-handle read statistics from libhdfs, folds them into the request
// context's counters (local / short-circuit / datanode-cache bytes) and into
// this reader's remote-byte count, then clears the handle's statistics so the
// next call only sees new activity. Only applicable to HDFS paths.
void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file) {
  // Read statistics are only meaningful for files on HDFS proper.
  if (!IsHdfsPath(scan_range_->file())) return;
  struct hdfsReadStatistics* stats;
  if (hdfsFileGetReadStatistics(hdfs_file, &stats) == 0) {
    scan_range_->reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);
    scan_range_->reader_->bytes_read_short_circuit_.Add(
        stats->totalShortCircuitBytesRead);
    scan_range_->reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
    // Any bytes beyond the local count came from remote datanodes.
    if (stats->totalLocalBytesRead != stats->totalBytesRead) {
      num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
    }
    hdfsFileFreeReadStatistics(stats);
  }
  hdfsFileClearReadStatistics(hdfs_file);
}
// Resets reader state for reuse: clears base-class state, then zeroes the
// count of bytes read from remote datanodes.
void HdfsFileReader::ResetState() {
  FileReader::ResetState();
  num_remote_bytes_ = 0;
}
// Returns the base reader's debug string with HDFS-specific state appended:
// the exclusive handle pointer and the remote byte count.
string HdfsFileReader::DebugString() const {
  const string hdfs_state = Substitute(
      " exclusive_hdfs_fh=$0 num_remote_bytes=$1",
      exclusive_hdfs_fh_, num_remote_bytes_);
  return FileReader::DebugString() + hdfs_state;
}
}
}