blob: 4f3ba3d7c0206a0a3ae5efdb42edf2545e743b24 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <stdio.h>
#include "runtime/io/disk-io-mgr-internal.h"
#include "runtime/io/local-file-reader.h"
#include "runtime/io/request-ranges.h"
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "common/names.h"
#ifndef NDEBUG
DECLARE_int32(stress_disk_read_delay_ms);
#endif
namespace impala {
namespace io {
// Opens the scan range's file for reading via fopen() while holding 'lock_'.
// If the scan range's cancel status is an error, that status is returned and
// nothing is opened. If a file is already open this is a no-op that returns
// OK. On a successful open the open-files metric is incremented by one; on
// failure a DISK_IO_ERROR status carrying the file name and error text is
// returned. 'use_file_handle_cache' is not consulted by this implementation.
Status LocalFileReader::Open(bool use_file_handle_cache) {
  unique_lock<SpinLock> guard(lock_);
  RETURN_IF_ERROR(scan_range_->cancel_status_);

  const bool already_open = (file_ != nullptr);
  if (!already_open) {
    file_ = fopen(scan_range_->file(), "r");
    if (file_ == nullptr) {
      return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
          Substitute("Could not open file: $0: $1", *scan_range_->file_string(),
              GetStrErrMsg()));
    }
    // Only a freshly opened handle counts towards the open-files metric.
    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
  }
  return Status::OK();
}
// Reads up to 'bytes_to_read' bytes starting at byte 'file_offset' of the
// currently open file into 'buffer', while holding 'lock_'.
//
// 'queue' supplies the read-latency histogram that times the fread() call and
// the read-size metric that is updated with the number of bytes read.
// '*bytes_read' receives the number of bytes fread() returned and '*eof' is
// set to true when fewer bytes than requested were read without a stream
// error.
//
// Returns the scan range's cancel status if it is an error, a DISK_IO_ERROR
// status if the seek or the read fails, and OK otherwise. The file must
// already be open (file_ is DCHECKed to be non-null). If the seek fails the
// file is closed and 'file_' is reset to nullptr.
Status LocalFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset,
    uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
  DCHECK(scan_range_->read_in_flight());
  DCHECK_GE(bytes_to_read, 0);
  // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
#ifndef NDEBUG
  if (FLAGS_stress_disk_read_delay_ms > 0) {
    SleepForMs(FLAGS_stress_disk_read_delay_ms);
  }
#endif
  unique_lock<SpinLock> fs_lock(lock_);
  RETURN_IF_ERROR(scan_range_->cancel_status_);

  *eof = false;
  *bytes_read = 0;

  DCHECK(file_ != nullptr);
  // fseek() is only specified to return nonzero on failure, so test against 0.
  if (fseek(file_, file_offset, SEEK_SET) != 0) {
    // Grab the error text first: fclose() may itself fail and change errno
    // (assuming GetStrErrMsg() is errno-based).
    const auto seek_error = GetStrErrMsg();
    fclose(file_);
    file_ = nullptr;
    // Close() returns early once file_ is null, so the increment done in
    // Open() has to be balanced here to keep the open-files metric accurate.
    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
    // Report the offset that was actually passed to fseek().
    return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
        Substitute("Could not seek to $0 for file: $1: $2",
            file_offset, *scan_range_->file_string(), seek_error));
  }

  {
    // Scope the timer so that only the fread() call is measured.
    ScopedHistogramTimer read_timer(queue->read_latency());
    *bytes_read = fread(buffer, 1, bytes_to_read, file_);
  }
  DCHECK_GE(*bytes_read, 0);
  DCHECK_LE(*bytes_read, bytes_to_read);
  queue->read_size()->Update(*bytes_read);

  if (*bytes_read < bytes_to_read) {
    if (ferror(file_) != 0) {
      // Name the file by its path rather than by the FILE* pointer value.
      return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
          Substitute("Error reading from $0 at byte offset: $1: $2",
              *scan_range_->file_string(), file_offset, GetStrErrMsg()));
    }
    // On Linux, we should only get partial reads from block devices on error or eof.
    DCHECK(feof(file_) != 0);
    *eof = true;
  }
  return Status::OK();
}
// Reports no cached data: always sets '*data' to nullptr and '*length' to 0.
void LocalFileReader::CachedFile(uint8_t** data, int64_t* length) {
  *length = 0;
  *data = nullptr;
}
void LocalFileReader::Close() {
unique_lock<SpinLock> fs_lock(lock_);
if (file_ == nullptr) return;
fclose(file_);
file_ = nullptr;
ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
return;
}
}
}