blob: ff7ef71d8bd5781cd15e7cf1d454dda6863dacf3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/hdfs-fs-cache.h"
#include <boost/thread/locks.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/hdfs-util.h"
#include "util/test-info.h"
#include "util/os-util.h"
#include "common/names.h"
using namespace strings;
namespace impala {
// Flag holding a Unix command whose output is used as the S3A access key
// ("fs.s3a.access.key"). Defaults to the empty string.
DEFINE_string(s3a_access_key_cmd, "", "A Unix command whose output returns the "
"access key to S3, i.e. \"fs.s3a.access.key\".");
// Flag holding a Unix command whose output is used as the S3A secret key
// ("fs.s3a.secret.key"). Defaults to the empty string.
DEFINE_string(s3a_secret_key_cmd, "", "A Unix command whose output returns the "
"secret key to S3, i.e. \"fs.s3a.secret.key\".");
// Static HdfsFsCache instance; created by HdfsFsCache::Init().
scoped_ptr<HdfsFsCache> HdfsFsCache::instance_;
// S3A credentials. Filled in by HdfsFsCache::Init() from the output of the commands
// named by the flags above, and handed to the HDFS builder in GetConnection() when
// the access key is non-empty.
string HdfsFsCache::s3a_access_key_;
string HdfsFsCache::s3a_secret_key_;
// Creates the static HdfsFsCache instance and, if both --s3a_access_key_cmd and
// --s3a_secret_key_cmd are set, runs each command and stores its output as the S3A
// access key and secret key respectively.
//
// Must not be called when an instance already exists (DCHECKed).
// Returns an error status naming the failed command if either command cannot be run;
// returns OK otherwise (including when the flags are not both set, in which case no
// credentials are loaded).
Status HdfsFsCache::Init() {
  DCHECK(HdfsFsCache::instance_.get() == NULL);
  HdfsFsCache::instance_.reset(new HdfsFsCache());

  // Credentials are only fetched when both commands are configured.
  if (!FLAGS_s3a_access_key_cmd.empty() && !FLAGS_s3a_secret_key_cmd.empty()) {
    // NOTE(review): the meaning of the third RunShellProcess() argument is not visible
    // here (presumably it trims the command output) -- confirm against os-util.h.
    if (!RunShellProcess(FLAGS_s3a_access_key_cmd, &s3a_access_key_, true)) {
      return Status(Substitute("Could not run command '$0' to retrieve S3 Access Key. "
          "Impala will not be able to access S3.", FLAGS_s3a_access_key_cmd));
    }
    // Only the command is logged, never the key it produced.
    LOG(INFO) << "S3 Access Key retrieval command '" << FLAGS_s3a_access_key_cmd
              << "' executed successfully.";

    if (!RunShellProcess(FLAGS_s3a_secret_key_cmd, &s3a_secret_key_, true)) {
      // This message previously said "S3 Access Key", which pointed operators at the
      // wrong credential when the secret-key command was the one that failed.
      return Status(Substitute("Could not run command '$0' to retrieve S3 Secret Key. "
          "Impala will not be able to access S3.", FLAGS_s3a_secret_key_cmd));
    }
    LOG(INFO) << "S3 Secret Key retrieval command '" << FLAGS_s3a_secret_key_cmd
              << "' executed successfully.";
  }
  return Status::OK();
}
// Returns, via 'fs', a filesystem connection for the filesystem that 'path' lives on.
//
// The connection is keyed by the "namenode" string derived from 'path' by
// GetNameNodeFromPath(). Lookup order:
//   1. 'local_cache' (if non-NULL), consulted without taking any lock;
//   2. the global map 'fs_map_', consulted under 'lock_';
//   3. otherwise a new connection is created, stored in 'fs_map_', and returned.
// On success the result is also inserted into 'local_cache' (if non-NULL).
//
// path:        the path whose filesystem should be connected to.
// fs:          out parameter; set to the connection handle on success.
// local_cache: optional caller-supplied cache; may be NULL. It is accessed here
//              without synchronization.
//
// Returns an error status if the namenode cannot be derived from 'path' or if the
// connection attempt fails; returns OK otherwise.
Status HdfsFsCache::GetConnection(const string& path, hdfsFS* fs,
    HdfsFsMap* local_cache) {
  string err;
  const string& namenode = GetNameNodeFromPath(path, &err);
  if (!err.empty()) return Status(err);

  // First, check the local cache to avoid taking the global lock.
  if (local_cache != NULL) {
    HdfsFsMap::iterator local_iter = local_cache->find(namenode);
    if (local_iter != local_cache->end()) {
      *fs = local_iter->second;
      return Status::OK();
    }
  }

  // Otherwise, check the global cache. The lock is scoped to this block so that it
  // only covers accesses to 'fs_map_'.
  {
    lock_guard<mutex> l(lock_);
    HdfsFsMap::iterator i = fs_map_.find(namenode);
    if (i == fs_map_.end()) {
      hdfsBuilder* hdfs_builder = hdfsNewBuilder();
      hdfsBuilderSetNameNode(hdfs_builder, namenode.c_str());
      if (!s3a_access_key_.empty()) {
        // Use a new instance of the filesystem object to be sure that it picks up the
        // configuration changes we're going to make. Without this call, a cached
        // filesystem object is used which is unaffected by calls to
        // hdfsBuilderConfSetStr(). This is unexpected behavior in the HDFS API, but is
        // unlikely to change.
        hdfsBuilderSetForceNewInstance(hdfs_builder);
        hdfsBuilderConfSetStr(hdfs_builder, "fs.s3a.access.key",
            s3a_access_key_.c_str());
        hdfsBuilderConfSetStr(hdfs_builder, "fs.s3a.secret.key",
            s3a_secret_key_.c_str());
      }
      *fs = hdfsBuilderConnect(hdfs_builder);
      if (*fs == NULL) {
        // Failed connections are not cached, so a later call will retry.
        return Status(GetHdfsErrorMsg("Failed to connect to FS: ", namenode));
      }
      fs_map_.insert(make_pair(namenode, *fs));
    } else {
      *fs = i->second;
    }
  }
  DCHECK(*fs != NULL);

  // Populate the local cache for the next lookup.
  if (local_cache != NULL) {
    local_cache->insert(make_pair(namenode, *fs));
  }
  return Status::OK();
}
// Convenience wrapper: obtains a connection to the local filesystem by delegating to
// GetConnection() with the URI "file:///". The handle is returned via 'fs'; the
// status is whatever GetConnection() reports.
Status HdfsFsCache::GetLocalConnection(hdfsFS* fs) {
  const Status local_status = GetConnection("file:///", fs);
  return local_status;
}
// Derives the "namenode" string (the key used to look up filesystem connections)
// from 'path'.
//
//   - No "://" in 'path' and it starts with "file:/"  -> "file:///"
//   - No "://" in 'path' otherwise                    -> "default"
//   - "scheme://authority/rest"                       -> "scheme://authority/"
//
// path: the path to inspect.
// err:  out parameter; set to a description of the problem when 'path' starts with
//       "://" (no scheme) or has no '/' after the authority. It is left untouched on
//       success, so callers should pass in an empty string.
//
// Returns the namenode string, or an empty string when '*err' has been set.
string HdfsFsCache::GetNameNodeFromPath(const string& path, string* err) {
  string namenode;
  const string local_fs("file:/");
  size_t n = path.find("://");

  if (n == string::npos) {
    // compare() with an explicit length is a prefix test; it is safe even when 'path'
    // is shorter than 'local_fs' because the compared range is clamped to the string.
    if (path.compare(0, local_fs.length(), local_fs) == 0) {
      // Hadoop Path routines strip out consecutive /'s, so recognize 'file:/blah'.
      namenode = "file:///";
    } else {
      // Path is not qualified, so use the default FS.
      namenode = "default";
    }
  } else if (n == 0) {
    *err = Substitute("Path missing scheme: $0", path);
  } else {
    // Path is qualified, i.e. "scheme://authority/path/to/file". Extract
    // "scheme://authority/".
    n = path.find('/', n + 3);
    if (n == string::npos) {
      *err = Substitute("Path missing '/' after authority: $0", path);
    } else {
      // Include the trailing '/' for local filesystem case, i.e. "file:///".
      namenode = path.substr(0, n + 1);
    }
  }
  return namenode;
}