blob: 741d6c783b62ed687aaecb55239d9bd2aa2fb274 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "filesystem.h"
#include "filehandle.h"
#include "common/namenode_info.h"
#include <functional>
#include <limits>
#include <future>
#include <tuple>
#include <pwd.h>
#include <boost/asio/ip/tcp.hpp>
#include "x-platform/syscall.h"
#define FMT_THIS_ADDR "this=" << (void*)this
namespace hdfs {
static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
static const int kNamenodeProtocolVersion = 1;
using boost::asio::ip::tcp;
static constexpr uint16_t kDefaultPort = 8020;
// forward declarations
const std::string get_effective_user_name(const std::string &);
/**
 * Default recursion limit for Find.
 *
 * @return the largest value representable by uint32_t.
 */
uint32_t FileSystem::GetDefaultFindMaxDepth() {
  using DepthLimits = std::numeric_limits<uint32_t>;
  return DepthLimits::max();
}
/**
 * Default permission mask.
 *
 * @return octal 0755.
 */
uint16_t FileSystem::GetDefaultPermissionMask() {
  constexpr uint16_t default_mask = 0755;
  return default_mask;
}
/**
 * Validates a permission mask.
 *
 * @param permissions mask to check; anything above octal 01777 is rejected.
 * @return Status::OK() for an acceptable mask, otherwise InvalidArgument with
 *         a message that shows the offending value in octal.
 */
Status FileSystem::CheckValidPermissionMask(uint16_t permissions) {
  constexpr uint16_t largest_valid_mask = 01777;
  if (permissions <= largest_valid_mask) {
    return Status::OK();
  }

  std::stringstream description;
  description << "CheckValidPermissionMask: argument 'permissions' is ";
  description << std::oct << std::showbase << permissions;
  description << " (should be between 0 and 01777)";
  const std::string text = description.str();
  return Status::InvalidArgument(text.c_str());
}
/**
 * Validates a replication factor.
 *
 * @param replication value to check; must lie in [1, 512].
 * @return Status::OK() when in range, otherwise InvalidArgument with a message
 *         that includes the rejected value.
 */
Status FileSystem::CheckValidReplication(uint16_t replication) {
  const bool in_range = (replication >= 1) && (replication <= 512);
  if (in_range) {
    return Status::OK();
  }

  std::stringstream description;
  description << "CheckValidReplication: argument 'replication' is ";
  description << replication;
  description << " (should be between 1 and 512)";
  const std::string text = description.str();
  return Status::InvalidArgument(text.c_str());
}
// Out-of-line destructor of the FileSystem interface; it has no work of its own to do.
FileSystem::~FileSystem() = default;
/*****************************************************************************
* FILESYSTEM BASE CLASS
****************************************************************************/
/**
 * Creates a FileSystemImpl from a raw IoService pointer.
 *
 * The pointer is passed by reference to the FileSystemImpl constructor, which
 * resets the caller's pointer to nullptr after adopting the service.
 *
 * @return a heap-allocated FileSystem.
 */
FileSystem *FileSystem::New(
    IoService *&io_service, const std::string &user_name, const Options &options) {
  FileSystem *created = new FileSystemImpl(io_service, user_name, options);
  return created;
}
/**
 * Creates a FileSystemImpl that uses the supplied IoService.
 *
 * @return a heap-allocated FileSystem.
 */
FileSystem *FileSystem::New(
    std::shared_ptr<IoService> io_service, const std::string &user_name, const Options &options) {
  FileSystem *created = new FileSystemImpl(io_service, user_name, options);
  return created;
}
/**
 * Creates a FileSystem with a freshly made IoService, default worker threads,
 * default Options and the effective user name of the current process.
 *
 * @return the new FileSystem, or nullptr when the IoService could not be
 *         created or no worker thread could be started.
 */
FileSystem *FileSystem::New() {
  // The IoService is held by shared_ptr; the FileSystemImpl created below keeps a reference to it.
  std::shared_ptr<IoService> service = IoService::MakeShared();
  if (!service) {
    return nullptr;
  }

  const int started_workers = service->InitDefaultWorkers();
  if (started_workers < 1) {
    return nullptr;
  }

  const std::string effective_user = get_effective_user_name("");
  const Options default_options;
  return new FileSystemImpl(service, effective_user, default_options);
}
/*****************************************************************************
* FILESYSTEM IMPLEMENTATION
****************************************************************************/
/**
 * State shared by every outstanding request of a single Find operation.
 */
struct FileSystemImpl::FindSharedState {
  //Name pattern (can have wild-cards) to find
  const std::string name;
  //Maximum depth to recurse after the end of path is reached.
  //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
  const uint32_t maxdepth;
  //Vector of all sub-directories from the path argument (each can have wild-cards).
  //Always holds at least one element.
  std::vector<std::string> dirs;
  //Callback from Find
  const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
  //outstanding_requests is incremented once for every GetListing call.
  std::atomic<uint64_t> outstanding_requests;
  //Boolean needed to abort all recursion on error or on user command
  std::atomic<bool> aborted;
  //Shared variables will need protection with a lock
  std::mutex lock;

  /**
   * @param path_                 path pattern, split on '/' into dirs
   * @param name_                 name pattern to match
   * @param maxdepth_             maximum recursion depth below the path
   * @param handler_              user callback receiving the results
   * @param outstanding_requests_ initial value of the request counter
   * @param aborted_              initial value of the abort flag
   */
  FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
                  const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
                  uint64_t outstanding_requests_, bool aborted_)
      : name(name_),
        maxdepth(maxdepth_),
        handler(handler_),
        outstanding_requests(outstanding_requests_),
        aborted(aborted_),
        lock() {
    //Constructing the list of sub-directories.
    //std::getline yields the same tokens with or without a trailing '/', so
    //the path is split as-is. Nothing is written to the stream: a stringstream
    //built from a string has its put position at offset 0, so inserting "/"
    //would overwrite the first character of the path instead of appending.
    std::stringstream ss(path_);
    for (std::string token; std::getline(ss, token, '/'); ) {
      dirs.push_back(token);
    }
    //An empty path produces no tokens; keep the single empty component that
    //"/" produces so that code computing dirs.size() - 1 never underflows.
    if (dirs.empty()) {
      dirs.push_back("");
    }
  }
};
/**
 * Per-request state of a Find operation: the directory being listed, how deep
 * that directory is, and whether the path pattern (true) or the name pattern
 * (false) is being matched.
 */
struct FileSystemImpl::FindOperationalState {
  const std::string path;
  const uint32_t depth;
  const bool search_path;

  FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
      : path(path_), depth(depth_), search_path(search_path_) {}
};
/**
 * Picks the user name to use.
 *
 * Order of preference: the supplied name when non-empty, the HADOOP_USER_NAME
 * environment variable, the USER environment variable, the passwd entry of
 * the effective uid (POSIX builds only), and finally "unknown_user".
 *
 * @param user_name explicitly requested name; may be empty.
 * @return the selected user name.
 */
const std::string get_effective_user_name(const std::string &user_name) {
  if (!user_name.empty()) {
    return user_name;
  }

  // No explicit name: consult the environment, most specific variable first.
  static const char *const kUserEnvVars[] = {"HADOOP_USER_NAME", "USER"};
  for (const char *variable : kUserEnvVars) {
    if (const char *value = getenv(variable)) {
      return value;
    }
  }

  // On POSIX fall back to the passwd entry of the effective user id.
#if defined(_POSIX_VERSION)
  const struct passwd *entry = getpwuid(geteuid());
  if (entry != nullptr && entry->pw_name != nullptr) {
    return entry->pw_name;
  }
#endif

  return "unknown_user";
}
/**
 * Builds a FileSystemImpl that adopts a raw IoService pointer.
 *
 * The caller's pointer is reset to nullptr once the service has been stored in
 * io_service_. Worker threads are then started on the adopted service: the
 * default number when options.io_threads_ < 1, otherwise exactly
 * options.io_threads_.
 *
 * @param io_service service to adopt; set to nullptr on return
 * @param user_name  requested user; resolved through get_effective_user_name
 * @param options    client options, copied into options_
 */
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) :
    io_service_(io_service), options_(options),
    client_name_(GetRandomClientName()),
    nn_(
      io_service_, options, client_name_,
      get_effective_user_name(user_name), kNamenodeProtocol,
      kNamenodeProtocolVersion
    ),
    bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
    event_handlers_(std::make_shared<LibhdfsEvents>())
{
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
                         << FMT_THIS_ADDR << ") called");

  // Poor man's move
  io_service = nullptr;

  // From here on only io_service_ may be used: the parameter is now null.
  if (!io_service_) {
    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was given a null IoService, no worker threads were started");
    return;
  }

  unsigned int running_workers = 0;
  if (options_.io_threads_ < 1) {
    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads");
    running_workers = io_service_->InitDefaultWorkers();
  } else {
    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystenImpl Initializing " << options_.io_threads_ << " worker threads.");
    running_workers = io_service_->InitWorkers(options_.io_threads_);
  }

  if (running_workers < 1) {
    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads");
  }
}
/**
 * Builds a FileSystemImpl on top of a shared IoService.
 *
 * No worker threads are started here; the constructor only reports whether the
 * supplied service already has any. A null service is treated as having none.
 *
 * @param io_service shared service to use
 * @param user_name  requested user; resolved through get_effective_user_name
 * @param options    client options, copied into options_
 */
FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) :
    io_service_(io_service), options_(options),
    client_name_(GetRandomClientName()),
    nn_(
      io_service_, options, client_name_,
      get_effective_user_name(user_name), kNamenodeProtocol,
      kNamenodeProtocolVersion
    ),
    bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
    event_handlers_(std::make_shared<LibhdfsEvents>())
{
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
                         << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called");

  // Guard the dereference: other members of this class treat a null IoService as possible.
  const int worker_thread_count = io_service_ ? io_service_->GetWorkerThreadCount() : 0;
  if (worker_thread_count < 1) {
    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. "
                          << "It needs at least 1 worker to connect to an HDFS cluster.");
  } else {
    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads.");
  }
}
/**
 * Stops the IoService, if there is one, when the file system is destroyed.
 */
FileSystemImpl::~FileSystemImpl() {
  LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl("
                         << FMT_THIS_ADDR << ") called");

  /**
   * Note: IoService must be stopped before getting rid of worker threads.
   * Once worker threads are joined and deleted the service can be deleted.
   **/
  // io_service_ can be null (Connect checks for this), so guard the call.
  if (io_service_) {
    io_service_->Stop();
  }
}
void FileSystemImpl::Connect(const std::string &server,
const std::string &service,
const std::function<void(const Status &, FileSystem * fs)> &handler) {
LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
<< ", server=" << server << ", service="
<< service << ") called");
connect_callback_.SetCallback(handler);
/* IoService::New can return nullptr */
if (!io_service_) {
handler (Status::Error("Null IoService"), this);
}
// DNS lookup here for namenode(s)
std::vector<ResolvedNamenodeInfo> resolved_namenodes;
auto name_service = options_.services.find(server);
if(name_service != options_.services.end()) {
cluster_name_ = name_service->first;
resolved_namenodes = BulkResolve(io_service_, name_service->second);
} else {
cluster_name_ = server + ":" + service;
// tmp namenode info just to get this in the right format for BulkResolve
NamenodeInfo tmp_info;
try {
tmp_info.uri = URI::parse_from_string("hdfs://" + cluster_name_);
} catch (const uri_parse_error& e) {
LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_);
handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this);
}
resolved_namenodes = BulkResolve(io_service_, {tmp_info});
}
for(unsigned int i=0;i<resolved_namenodes.size();i++) {
LOG_DEBUG(kFileSystem, << "Resolved Namenode");
LOG_DEBUG(kFileSystem, << resolved_namenodes[i].str());
}
nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this](const Status & s) {
connect_callback_.GetCallback()(s, this);
});
}
/**
 * Connects to the file system named by options_.defaultFS.
 *
 * Only the "hdfs" scheme (case-insensitive) is accepted and a host must be
 * present. The port falls back to kDefaultPort when the URI has none.
 *
 * @param handler receives InvalidArgument and a null FileSystem when defaultFS
 *                is unusable; otherwise it is forwarded to Connect.
 */
void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
  std::string scheme = options_.defaultFS.get_scheme();
  if (strcasecmp(scheme.c_str(), "hdfs") != 0) {
    std::string error_message;
    error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported";
    handler(Status::InvalidArgument(error_message.c_str()), nullptr);
    return;
  }

  std::string host = options_.defaultFS.get_host();
  if (host.empty()) {
    handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr);
    return;
  }

  // Unsigned 16 bits: a signed int16_t would turn ports above 32767 into
  // negative numbers and produce a service string such as "-15466".
  const uint16_t port = options_.defaultFS.get_port_or_default(kDefaultPort);
  std::string port_as_string = std::to_string(port);

  Connect(host, port_as_string, handler);
}
/**
 * Asks the IoService to add one worker thread.
 *
 * @return 1 after forwarding the request, -1 when there is no IoService.
 */
int FileSystemImpl::AddWorkerThread() {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
                         << FMT_THIS_ADDR << ") called."
                         << " Existing thread count = " << WorkerThreadCount());

  if (io_service_) {
    io_service_->AddWorkerThread();
    return 1;
  }
  return -1;
}
/**
 * @return the IoService's worker thread count, or -1 when there is no IoService.
 */
int FileSystemImpl::WorkerThreadCount() {
  if (io_service_) {
    return io_service_->GetWorkerThreadCount();
  }
  return -1;
}
bool FileSystemImpl::CancelPendingConnect() {
if(connect_callback_.IsCallbackAccessed()) {
// Temp fix for failover hangs, allow CancelPendingConnect to be called so it can push a flag through the RPC engine
LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed");
return nn_.CancelPendingConnect();
}
if(!connect_callback_.IsCallbackSet()) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started");
return false;
}
// First invoke callback, then do proper teardown in RpcEngine and RpcConnection
ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) {
LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString());
};
bool callback_swapped = false;
ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped);
if(callback_swapped) {
// Take original callback and invoke it as if it was canceled.
LOG_DEBUG(kFileSystem, << "Swapped in dummy callback. Invoking connect callback with canceled status.");
std::function<void(void)> wrapped_callback = [original_callback, this](){
// handling code expected to check status before dereferenceing 'this'
original_callback(Status::Canceled(), this);
};
io_service_->PostTask(wrapped_callback);
} else {
LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect. It hasn't been invoked yet or may have already completed.")
return false;
}
// Now push cancel down to clean up where possible and make sure the RpcEngine
// won't try to do retries in the background. The rest of the memory cleanup
// happens when this FileSystem is deleted by the user.
return nn_.CancelPendingConnect();
}
/**
 * Opens a file by requesting its block locations from the namenode.
 *
 * @param path    file to open
 * @param handler receives the status and, on success, a newly allocated
 *                FileHandleImpl; on failure the handle is nullptr.
 */
void FileSystemImpl::Open(
    const std::string &path,
    const std::function<void(const Status &, FileHandle *)> &handler) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  auto on_locations = [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
    if (stat.ok()) {
      handler(stat, new FileHandleImpl(cluster_name_, path, io_service_, client_name_, file_info, bad_node_tracker_, event_handlers_));
      return;
    }

    LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString());
    if (stat.get_server_exception_type() == Status::kStandbyException) {
      LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode");
    }
    handler(stat, nullptr);
  };

  // Request the locations of every block: offset 0, maximum length.
  nn_.GetBlockLocations(path, 0, std::numeric_limits<int64_t>::max(), on_locations);
}
/**
 * Converts a LocatedBlockProto into a BlockLocation.
 *
 * Corrupt flag and offset are always copied. Each datanode field is copied
 * only when present in the message, and the length is taken from the block's
 * numbytes only when the block is present.
 */
BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
{
  // Translate every datanode of the block first.
  std::vector<DNInfo> datanodes;
  datanodes.reserve(locatedBlock.locs_size());
  for (const hadoop::hdfs::DatanodeInfoProto &node_proto : locatedBlock.locs()) {
    const hadoop::hdfs::DatanodeIDProto &node_id = node_proto.id();

    DNInfo node;
    if (node_id.has_ipaddr()) {
      node.setIPAddr(node_id.ipaddr());
    }
    if (node_id.has_hostname()) {
      node.setHostname(node_id.hostname());
    }
    if (node_id.has_xferport()) {
      node.setXferPort(node_id.xferport());
    }
    if (node_id.has_infoport()) {
      node.setInfoPort(node_id.infoport());
    }
    if (node_id.has_ipcport()) {
      node.setIPCPort(node_id.ipcport());
    }
    if (node_id.has_infosecureport()) {
      node.setInfoSecurePort(node_id.infosecureport());
    }
    if (node_proto.has_location()) {
      node.setNetworkLocation(node_proto.location());
    }
    datanodes.push_back(node);
  }

  BlockLocation location;
  location.setCorrupt(locatedBlock.corrupt());
  location.setOffset(locatedBlock.offset());
  location.setDataNodes(datanodes);
  if (locatedBlock.has_b()) {
    location.setLength(locatedBlock.b().numbytes());
  }
  return location;
}
/**
 * Fetches the block locations of a byte range of a file.
 *
 * @param path    file to query
 * @param offset  start of the range; rejected when its high bit is set
 * @param length  length of the range; rejected when its high bit is set
 * @param handler receives the status and, on success, a FileBlockLocation
 *                built from the namenode's FileInfo; an empty pointer otherwise.
 */
void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
  const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
{
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  //Protobuf gives an error 'Negative value is not supported'
  //if the high bit is set in uint64 in GetBlockLocations
  if (IsHighBitSet(offset)) {
    handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
    return;
  }
  if (IsHighBitSet(length)) {
    handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
    return;
  }

  auto to_public_locations = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
    if (!status.ok()) {
      handler(status, std::shared_ptr<FileBlockLocation>());
      return;
    }

    auto summary = std::make_shared<FileBlockLocation>();
    summary->setFileLength(fileInfo->file_length_);
    summary->setLastBlockComplete(fileInfo->last_block_complete_);
    summary->setUnderConstruction(fileInfo->under_construction_);

    std::vector<BlockLocation> converted;
    for (const hadoop::hdfs::LocatedBlockProto & block_proto : fileInfo->blocks_) {
      converted.push_back(LocatedBlockToBlockLocation(block_proto));
    }
    summary->setBlockLocations(converted);

    handler(status, summary);
  };

  nn_.GetBlockLocations(path, offset, length, to_public_locations);
}
/**
 * Logs the request and forwards it to the namenode operations object.
 *
 * @param path    file to query
 * @param handler receives the status and the preferred block size
 */
void FileSystemImpl::GetPreferredBlockSize(
    const std::string &path,
    const std::function<void(const Status &, const uint64_t &)> &handler) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  nn_.GetPreferredBlockSize(path, handler);
}
void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
", replication=" << replication << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
return;
}
Status replStatus = FileSystem::CheckValidReplication(replication);
if (!replStatus.ok()) {
handler(replStatus);
return;
}
nn_.SetReplication(path, replication, handler);
}
/**
 * Sets the modification and access times of a path.
 *
 * @param path    target path; must not be empty
 * @param mtime   modification time, passed through unchanged
 * @param atime   access time, passed through unchanged
 * @param handler receives InvalidArgument for an empty path, otherwise the
 *                namenode's reply.
 */
void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
                              std::function<void(const Status &)> handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
            ", mtime=" << mtime << ", atime=" << atime << ") called");

  if (!path.empty()) {
    nn_.SetTimes(path, mtime, atime, handler);
    return;
  }
  handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
}
/**
 * Logs the request and forwards it to the namenode operations object.
 *
 * @param path    path to stat
 * @param handler receives the status and the StatInfo of the path
 */
void FileSystemImpl::GetFileInfo(
    const std::string &path,
    const std::function<void(const Status &, const StatInfo &)> &handler) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  nn_.GetFileInfo(path, handler);
}
/**
 * Logs the request and forwards it to the namenode operations object.
 *
 * @param path    path to summarize
 * @param handler receives the status and the ContentSummary of the path
 */
void FileSystemImpl::GetContentSummary(
    const std::string &path,
    const std::function<void(const Status &, const ContentSummary &)> &handler) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetContentSummary("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  nn_.GetContentSummary(path, handler);
}
/**
 * Logs the request and forwards it to the namenode operations object.
 *
 * @param handler receives the status and the FsInfo
 */
void FileSystemImpl::GetFsStats(
    const std::function<void(const Status &, const FsInfo &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called");

  nn_.GetFsStats(handler);
}
/**
 * Helper for GetListing calls that span several chunks.
 *
 * Some compilers don't like recursive lambdas, so the lambda calls this
 * method, which in turn creates a lambda that calls this method again.
 *
 * Each chunk is handed to the user's handler. The next chunk is requested only
 * when the handler returns true, the namenode reported more entries, and the
 * current chunk is non-empty; its last entry is where the listing resumes.
 */
void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
                                    std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
  // More can only follow a non-empty chunk.
  const bool more_available = has_more && !stat_infos.empty();
  const bool caller_wants_more = handler(stat, stat_infos, more_available);

  if (!caller_wants_more || !more_available) {
    return;
  }

  auto next_chunk = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
    GetListingShim(stat, stat_infos, has_more, path, handler);
  };
  const std::string resume_after = stat_infos.back().path;
  nn_.GetListing(path, next_chunk, resume_after);
}
void FileSystemImpl::GetListing(
const std::string &path,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetListing("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
std::string path_fixed = path;
if(path.back() != '/'){
path_fixed += "/";
}
// Caputure the state and push it into the shim
auto callback = [this, path_fixed, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
GetListingShim(stat, stat_infos, has_more, path_fixed, handler);
};
nn_.GetListing(path_fixed, callback);
}
void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
", permissions=" << permissions << ", createparent=" << createparent << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
return;
}
Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
}
nn_.Mkdirs(path, permissions, createparent, handler);
}
/**
 * Deletes a path.
 *
 * @param path      path to delete; must not be empty
 * @param recursive passed through to the namenode call
 * @param handler   receives InvalidArgument for an empty path, otherwise the
 *                  namenode's reply.
 */
void FileSystemImpl::Delete(const std::string &path, bool recursive,
                            const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");

  if (!path.empty()) {
    nn_.Delete(path, recursive, handler);
    return;
  }
  handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
}
/**
 * Renames a path.
 *
 * @param oldPath current path; must not be empty
 * @param newPath new path; must not be empty
 * @param handler receives InvalidArgument naming the first empty argument,
 *                otherwise the namenode's reply.
 */
void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath,
                            const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");

  // Arguments are checked in declaration order.
  if (oldPath.empty()) {
    handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
  } else if (newPath.empty()) {
    handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
  } else {
    nn_.Rename(oldPath, newPath, handler);
  }
}
void FileSystemImpl::SetPermission(const std::string & path,
uint16_t permissions, const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
return;
}
Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
}
nn_.SetPermission(path, permissions, handler);
}
/**
 * Changes the owner and group of a path.
 *
 * @param path      target path; must not be empty
 * @param username  passed through to the namenode call
 * @param groupname passed through to the namenode call
 * @param handler   receives InvalidArgument for an empty path, otherwise the
 *                  namenode's reply.
 */
void FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
                              const std::string & groupname, const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");

  if (!path.empty()) {
    nn_.SetOwner(path, username, groupname, handler);
    return;
  }
  handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
}
/**
* Helper function for recursive Find calls.
*
* Some compilers don't like recursive lambdas, so we make the lambda call a
* method, which in turn creates a lambda calling itself.
*
* ***High-level explanation***
*
* Since wild cards are allowed in both path and name, we start by expanding the path first.
* Boolean search_path is set to true when we search for the path and false when we search for the name.
* When we search for the path we break the given path pattern into sub-directories. Starting from the
* first sub-directory we list them one-by-one and recursively continue into directories that matched the
* path pattern at the current depth. Directories that are large will be requested to continue sending
* the results. We keep track of the current depth within the path pattern in the 'depth' variable.
* This continues recursively until the depth reaches the end of the path. After that we start matching
* the name pattern. We recurse into every directory we find, and all names that match the given name
* pattern are stored in outputs and later sent back to the user.
*
* ***Bookkeeping***
*
* shared_state->outstanding_requests is incremented before every GetListing call issued here and
* decremented once at the end of every FindShim invocation. The invocation that brings it to zero
* calls the handler with has_more == false.
*
* @param stat               status of the GetListing call that produced this chunk
* @param stat_infos         entries of this chunk
* @param directory_has_more true when the listed directory has further chunks
* @param operational_state  directory, depth and search mode of this request
* @param shared_state       state shared by all requests of this Find
*/
void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more,
std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) {
//We buffer the outputs then send them back at the end
std::vector<StatInfo> outputs;
//On error: report it and abort the whole operation.
//Execution still continues to the bottom of the function so that the request counter is decremented.
if(!stat.ok()){
std::lock_guard<std::mutex> find_lock(shared_state->lock);
//We send true because we do not want the user code to exit before all our requests finished
shared_state->handler(stat, outputs, true);
shared_state->aborted = true;
}
if(!shared_state->aborted){
//User did not abort the operation
if (directory_has_more) {
//Directory is large and has more results
//We launch another async call to get more results
shared_state->outstanding_requests++;
auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
FindShim(stat, stat_infos, has_more, operational_state, shared_state);
};
//NOTE(review): back() assumes stat_infos is non-empty whenever directory_has_more is true - confirm
std::string last = stat_infos.back().path;
nn_.GetListing(operational_state->path, callback, last);
}
if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){
//We are searching for the path and did not reach the end of the path yet
for (StatInfo const& si : stat_infos) {
//If we are at the last depth and it matches both path and name, we need to output it.
if (operational_state->depth == shared_state->dirs.size() - 2
&& XPlatform::Syscall::FnMatch(shared_state->dirs[operational_state->depth + 1], si.path)
&& XPlatform::Syscall::FnMatch(shared_state->name, si.path)) {
outputs.push_back(si);
}
//Skip if not directory
if(si.file_type != StatInfo::IS_DIR) {
continue;
}
//Checking for a match with the path at the current depth
if(XPlatform::Syscall::FnMatch(shared_state->dirs[operational_state->depth + 1], si.path)) {
//Launch a new request for every matched directory
shared_state->outstanding_requests++;
auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true); //true because searching for the path
FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
};
nn_.GetListing(si.full_path, callback);
}
}
}
//The subtraction below is unsigned: it relies on depth being at least dirs.size() - 1 on this branch
else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){
//We are searching for the name now and maxdepth has not been reached
for (StatInfo const& si : stat_infos) {
//Launch a new request for every directory
if(si.file_type == StatInfo::IS_DIR) {
shared_state->outstanding_requests++;
auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false); //false because searching for the name
FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
};
nn_.GetListing(si.full_path, callback);
}
//All names that match the specified name are saved to outputs
if(XPlatform::Syscall::FnMatch(shared_state->name, si.path)) {
outputs.push_back(si);
}
}
}
}
//This section needs a lock to make sure we return the final chunk only once
//and no results are sent after aborted is set
std::lock_guard<std::mutex> find_lock(shared_state->lock);
//Decrement the counter once since we are done with this chunk
shared_state->outstanding_requests--;
if(shared_state->outstanding_requests == 0){
//Send the outputs back to the user and notify that this is the final chunk
//NOTE(review): when stat is an error the handler has already been called once above,
//so the user may see the same error status twice (has_more true, then false)
shared_state->handler(stat, outputs, false);
} else {
//There will be more results and we are not aborting
if (outputs.size() > 0 && !shared_state->aborted){
//Send the outputs back to the user and notify that there is more
bool user_wants_more = shared_state->handler(stat, outputs, true);
if(!user_wants_more) {
//Abort if user doesn't want more
shared_state->aborted = true;
}
}
}
}
void FileSystemImpl::Find(
const std::string &path, const std::string &name, const uint32_t maxdepth,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::Find("
<< FMT_THIS_ADDR << ", path="
<< path << ", name="
<< name << ") called");
//Populating the operational state, which includes:
//current search path, depth within the path, and the indication that we are currently searching for a path (not name yet).
std::shared_ptr<FindOperationalState> operational_state = std::make_shared<FindOperationalState>(path, 0, true);
//Populating the shared state, which includes:
//vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag.
std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false);
auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) {
FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state);
};
nn_.GetListing("/", callback);
}
/**
 * Creates a snapshot of a directory.
 *
 * @param path    directory to snapshot; must not be empty
 * @param name    snapshot name, passed through without validation
 * @param handler receives InvalidArgument for an empty path, otherwise the
 *                namenode's reply.
 */
void FileSystemImpl::CreateSnapshot(const std::string &path,
                                    const std::string &name,
                                    const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");

  if (!path.empty()) {
    nn_.CreateSnapshot(path, name, handler);
    return;
  }
  handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
}
/**
 * Deletes a snapshot of a directory.
 *
 * @param path    snapshotted directory; must not be empty
 * @param name    snapshot name; must not be empty
 * @param handler receives InvalidArgument naming the first empty argument,
 *                otherwise the namenode's reply.
 */
void FileSystemImpl::DeleteSnapshot(const std::string &path,
                                    const std::string &name,
                                    const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");

  // Arguments are checked in declaration order.
  if (path.empty()) {
    handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
  } else if (name.empty()) {
    handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
  } else {
    nn_.DeleteSnapshot(path, name, handler);
  }
}
/**
 * Renames a snapshot of a directory.
 *
 * @param path     snapshotted directory; must not be empty
 * @param old_name current snapshot name; must not be empty
 * @param new_name new snapshot name; must not be empty
 * @param handler  receives InvalidArgument naming the first empty argument,
 *                 otherwise the namenode's reply.
 */
void FileSystemImpl::RenameSnapshot(const std::string &path,
                                    const std::string &old_name, const std::string &new_name,
                                    const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path <<
            ", old_name=" << old_name << ", new_name=" << new_name << ") called");

  // Arguments are checked in declaration order.
  if (path.empty()) {
    handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty"));
  } else if (old_name.empty()) {
    handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty"));
  } else if (new_name.empty()) {
    handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty"));
  } else {
    nn_.RenameSnapshot(path, old_name, new_name, handler);
  }
}
/**
 * Forwards an allow-snapshot request for a directory to the namenode.
 *
 * @param path    directory; must not be empty
 * @param handler receives InvalidArgument for an empty path, otherwise the
 *                namenode's reply.
 */
void FileSystemImpl::AllowSnapshot(const std::string &path,
                                   const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");

  if (!path.empty()) {
    nn_.AllowSnapshot(path, handler);
    return;
  }
  handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
}
/**
 * Forwards a disallow-snapshot request for a directory to the namenode.
 *
 * @param path    directory; must not be empty
 * @param handler receives InvalidArgument for an empty path, otherwise the
 *                namenode's reply.
 */
void FileSystemImpl::DisallowSnapshot(const std::string &path,
                                      const std::function<void(const Status &)> &handler) {
  LOG_DEBUG(kFileSystem,
            << "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");

  if (!path.empty()) {
    nn_.DisallowSnapshot(path, handler);
    return;
  }
  handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
}
/**
 * Installs a file-system event callback on both the event handler object and
 * the namenode operations object. Does nothing when event_handlers_ is null.
 */
void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
  if (!event_handlers_) {
    return;
  }
  event_handlers_->set_fs_callback(callback);
  nn_.SetFsEventCallback(callback);
}
std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
return event_handlers_;
}
/** @return a copy of the options this file system was created with. */
Options FileSystemImpl::get_options() {
  return this->options_;
}
std::string FileSystemImpl::get_cluster_name() {
return cluster_name_;
}
}