blob: fc4f5fe0ab6b739b04a69945fa991eb3f0cbed5b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/io/disk-file.h"
#include <iostream>
#include <mutex>
#include <thread>
#include "common/names.h"
#include "runtime/io/local-file-writer.h"
#include "util/filesystem-util.h"
#include "util/spinlock.h"
using namespace impala;
using namespace impala::io;
static const Status& DISK_FILE_DELETE_FAILED_INCORRECT_STATUS = Status(ErrorMsg::Init(
TErrorCode::GENERAL, "DiskFile::Delete() failed with incorrect status"));
Status DiskFile::Delete(const unique_lock<shared_mutex>& lock) {
DCHECK(lock.mutex() == &physical_file_lock_ && lock.owns_lock());
Status status = Status::OK();
unique_lock<SpinLock> status_l(status_lock_);
// No support for remote file deletion yet.
if (disk_type_ == DiskFileType::LOCAL_BUFFER || disk_type_ == DiskFileType::LOCAL) {
if (is_deleted(status_l)) return DISK_FILE_DELETE_FAILED_INCORRECT_STATUS;
RETURN_IF_ERROR(FileSystemUtil::RemovePaths({path_}));
SetStatusLocked(io::DiskFileStatus::DELETED, status_l);
}
return status;
}
DiskFile::DiskFile(const string& path, DiskIoMgr* io_mgr)
: path_(path),
disk_type_(DiskFileType::LOCAL),
file_status_(DiskFileStatus::INWRITING),
file_writer_(new LocalFileWriter(io_mgr, path.c_str())),
space_reserved_(true) {}
DiskFile::DiskFile(const string& path, DiskIoMgr* io_mgr, int64_t file_size,
DiskFileType disk_type, const hdfsFS* hdfs_conn)
: path_(path),
file_size_(file_size),
disk_type_(disk_type),
file_status_(DiskFileStatus::INWRITING) {
DCHECK(disk_type != DiskFileType::LOCAL);
if (disk_type == DiskFileType::LOCAL_BUFFER) {
file_writer_.reset(new LocalFileWriter(io_mgr, path.c_str(), file_size));
hdfs_conn_ = nullptr;
space_reserved_.Store(false);
} else {
DCHECK(hdfs_conn != nullptr);
hdfs_conn_ = *hdfs_conn;
space_reserved_.Store(true);
}
}