blob: 0395fd34df5bd8ca7e2752db76f9069b32c3c558 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <stdio.h>
#include <algorithm>
#include "runtime/io/disk-io-mgr-internal.h"
#include "runtime/io/local-file-writer.h"
#include "runtime/io/request-ranges.h"
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "common/names.h"
namespace impala {
namespace io {
Status LocalFileWriter::Open() {
lock_guard<mutex> lock(lock_);
if (file_ != nullptr) return Status::OK();
return io_mgr_->local_file_system_->OpenForWrite(
file_path_, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR, &file_);
}
Status LocalFileWriter::Write(WriteRange* range, int64_t* written_bytes) {
lock_guard<mutex> lock(lock_);
if (file_ == nullptr) {
return Status(Substitute("File handle of $0 has been closed.", file_path_));
}
// Use write() instead of fwrite(), because the file handle is shared by multiple write
// ranges before the file is full and closed, during the period, if using fwrite(), it
// is not safe to read the file because the buffer fwrite() is using may not flush to
// the disk. But write() is safe to read immediately after the write because it writes
// without a buffer.
RETURN_IF_ERROR(io_mgr_->local_file_system_->Write(fileno(file_), range));
ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(range->len());
range->SetOffset(written_bytes_);
written_bytes_ += range->len();
*written_bytes = written_bytes_;
return Status::OK();
}
Status LocalFileWriter::Close() {
lock_guard<mutex> lock(lock_);
if (file_ == nullptr) return Status::OK();
RETURN_IF_ERROR(io_mgr_->local_file_system_->Fclose(file_, file_path_));
file_ = nullptr;
return Status::OK();
}
Status LocalFileWriter::WriteOne(WriteRange* write_range) {
DCHECK(write_range != nullptr);
Status ret_status = Status::OK();
// Do not need to acquire the lock_ because we open a new file handle in WriteOne
// for writing, instead of sharing the same file handle.
FILE* file_handle = nullptr;
Status close_status = Status::OK();
DiskQueue* queue = io_mgr_->disk_queues_[write_range->disk_id()];
{
ScopedHistogramTimer write_timer(queue->write_latency());
ret_status = io_mgr_->local_file_system_->OpenForWrite(
write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR, &file_handle);
if (!ret_status.ok()) goto end;
ret_status = io_mgr_->WriteRangeHelper(file_handle, write_range);
close_status = io_mgr_->local_file_system_->Fclose(file_handle, write_range->file());
if (ret_status.ok() && !close_status.ok()) ret_status = close_status;
}
end:
if (ret_status.ok()) {
queue->write_size()->Update(write_range->len());
} else {
queue->write_io_err()->Increment(1);
}
return ret_status;
}
} // namespace io
} // namespace impala