blob: 103ed4e626e5c8c110d587f2fc701c8455f5dc30 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_users.h"
#include "hdfs/hdfs.h"
#include "util/tree.h"
#include <inttypes.h>
#include <limits.h>
#include <poll.h>
#include <search.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <utime.h>
// Configuration key whose integer value fuseConnectInit() loads into
// gExpiryPeriod.
#define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout"
// Configuration key whose integer value fuseConnectInit() loads into
// gTimerPeriod.
#define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period"
/** Length of the buffer needed by asctime_r */
#define TIME_STR_LEN 26
// Forward declarations: the tree macros and fuseConnectInit() refer to these
// names before their definitions appear further down in the file.
struct hdfsConn;
static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
static void hdfsConnExpiry(void);
static void* hdfsConnExpiryThread(void *v);
// Declares struct hdfsConnTree, the head of a red-black tree (util/tree.h)
// whose nodes are struct hdfsConn.
RB_HEAD(hdfsConnTree, hdfsConn);
// Authentication modes.
// NOTE(review): the enumerator list and closing brace of this enum are not
// present at this point; AUTH_CONF_UNKNOWN and AUTH_CONF_KERBEROS are the
// values the rest of this file refers to.
enum authConf {
// One cached libhdfs connection.  Instances are the nodes of gConnTree and
// are ordered by hdfsConnCompare().
struct hdfsConn {
// Red-black tree linkage used by the RB_* macros.
RB_ENTRY(hdfsConn) entry;
/** How many threads are currently using this hdfsConnection object */
int64_t refcnt;
/** The username used to make this connection. Dynamically allocated. */
char *usrname;
/** Kerberos ticket cache path, or NULL if this is not a kerberized
* connection. Dynamically allocated. */
char *kpath;
/** mtime of the kpath, if the kpath is non-NULL */
time_t kPathMtime;
/** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
long kPathMtimeNs;
/** The cached libhdfs fs instance */
hdfsFS fs;
/** Nonzero if this hdfs connection needs to be closed as soon as possible.
* If this is true, the connection has been removed from the tree. */
int condemned;
/** Number of times we should run the expiration timer on this connection
* before removing it. */
// fuseConnect() resets this every time it hands the connection out.
int expirationCount;
// Generates the red-black tree functions for hdfsConnTree, keyed through
// hdfsConnCompare().
RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
/** Current cached libhdfs connections */
static struct hdfsConnTree gConnTree;
/** The URI used to make our connections. Dynamically allocated. */
// Set once by fuseConnectInit() from a strdup() of its nnUri argument.
static char *gUri;
/** The port used to make our connections, or 0. */
// When 0, fuseNewConnect() does not set a port on the builder.
static int gPort;
/** Lock which protects gConnTree and gConnectTimer->active */
// NOTE(review): no gConnectTimer is defined in this file; that part of the
// comment above looks stale -- confirm and trim.
static pthread_mutex_t gConnMutex;
/** Type of authentication configured */
static enum authConf gHdfsAuthConf;
/** FUSE connection timer expiration period */
// In seconds: hdfsConnExpiryThread() adds it to a tv_sec timestamp.
static int32_t gTimerPeriod;
/** FUSE connection expiry period */
// Same unit as gTimerPeriod: fuseConnect() divides it by gTimerPeriod
// (rounding up) to obtain a number of timer ticks.
static int32_t gExpiryPeriod;
/** FUSE timer expiration thread */
static pthread_t gTimerThread;
/**
 * Find out what type of authentication the system administrator
 * has configured.
 *
 * @return     the type of authentication, or AUTH_CONF_UNKNOWN on error.
 */
static enum authConf discoverAuthConf(void)
int ret;
// Configured value being classified; starts out NULL.
char *val = NULL;
enum authConf authConf;
// NOTE(review): nothing visible here assigns ret, val or authConf before they
// are tested or returned -- confirm the configuration lookup and the
// per-branch assignments against the complete source.
// Presumably: the lookup itself failed.
if (ret)
// Presumably: the lookup succeeded but no value is configured.
else if (!val)
// The configured value is exactly "kerberos".
else if (!strcmp(val, "kerberos"))
return authConf;
// Initialize the connection cache: read the timer and expiry periods from the
// configuration, discover the authentication type, remember the NameNode URI
// and port, initialize gConnMutex and start the expiry thread.
//
// @param nnUri  NameNode URI; a strdup() copy is stored in gUri
// @param port   NameNode port, stored in gPort
// @return       0 on success; a negative errno-style value on failure
int fuseConnectInit(const char *nnUri, int port)
int ret;
ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
if (ret) {
// NOTE(review): unlike the other messages in this function, this format
// string has no trailing "\n".
fprintf(stderr, "Unable to determine the configured value for %s.",
return -EINVAL;
// The period is later used as a divisor in fuseConnect(), so values below 1
// are rejected.
if (gTimerPeriod < 1) {
fprintf(stderr, "Invalid value %d given for %s.\n",
return -EINVAL;
ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
if (ret) {
// NOTE(review): no trailing "\n" here either.
fprintf(stderr, "Unable to determine the configured value for %s.",
return -EINVAL;
if (gExpiryPeriod < 1) {
fprintf(stderr, "Invalid value %d given for %s.\n",
return -EINVAL;
gHdfsAuthConf = discoverAuthConf();
if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
// NOTE(review): no trailing "\n" here either.
fprintf(stderr, "Unable to determine the configured value for %s.",
return -EINVAL;
gPort = port;
gUri = strdup(nnUri);
if (!gUri) {
fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
return -ENOMEM;
// NOTE(review): from here on no release of gUri (or, further down, of the
// mutex) is visible on the failure paths -- confirm whether that is intended.
ret = pthread_mutex_init(&gConnMutex, NULL);
if (ret) {
fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
// pthread functions report a positive error number; negate it to match the
// other return values of this function.
return -ret;
ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
if (ret) {
fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
return -ret;
fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
"expiry period %d\n", gTimerPeriod, gExpiryPeriod);
return 0;
* Compare two libhdfs connections by username and Kerberos ticket cache path
* @param a The first libhdfs connection
* @param b The second libhdfs connection
* @return -1, 0, or 1 depending on a < b, a ==b, a > b
static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
int rc = strcmp(a->usrname, b->usrname);
if (rc) return rc;
return gHdfsAuthConf == AUTH_CONF_KERBEROS && strcmp(a->kpath, b->kpath);
* Find a libhdfs connection by username
* @param usrname The username to look up
* @param kpath The Kerberos ticket cache file path
* @return The connection, or NULL if none could be found
static struct hdfsConn* hdfsConnFind(const char *usrname, const char *kpath)
struct hdfsConn exemplar;
memset(&exemplar, 0, sizeof(exemplar));
exemplar.usrname = (char*)usrname;
exemplar.kpath = (char*)kpath;
return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
/**
 * Free the resources associated with a libhdfs connection.
 *
 * You must remove the connection from the tree before calling this function.
 *
 * @param conn        The libhdfs connection
 */
static void hdfsConnFree(struct hdfsConn *conn)
int ret;
// Close the libhdfs filesystem handle held by this connection.
ret = hdfsDisconnect(conn->fs);
if (ret) {
// A failed disconnect is only logged.  The conditional guards the %s
// argument against a NULL usrname.
fprintf(stderr, "hdfsConnFree(username=%s): "
"hdfsDisconnect failed with error %d\n",
(conn->usrname ? conn->usrname : "(null)"), ret);
/**
 * Convert a time_t to a human-readable local-time string.
 *
 * The result uses the fixed asctime layout (for example
 * "Thu Jan  1 00:00:00 1970") with the trailing newline removed.
 *
 * Callers in this file print the buffer without checking the return value,
 * so on failure the buffer is set to the empty string (when it has room for
 * one) instead of being left untouched.
 *
 * @param sec         time in seconds since the epoch
 * @param buf         (out param) output buffer
 * @param bufLen      length of output buffer; must be at least TIME_STR_LEN
 *
 * @return            0 on success; -ENAMETOOLONG if the provided buffer was
 *                    too short; -EOVERFLOW if the time could not be converted
 */
static int timeToStr(time_t sec, char *buf, size_t bufLen)
{
  struct tm tm;
  size_t l;

  if (bufLen < TIME_STR_LEN) {
    if (bufLen > 0)
      buf[0] = '\0';
    return -ENAMETOOLONG;
  }
  /* Both calls return NULL on failure (for instance when the year cannot be
   * represented), so neither result may be used unchecked. */
  if (!localtime_r(&sec, &tm) || !asctime_r(&tm, buf)) {
    buf[0] = '\0';
    return -EOVERFLOW;
  }
  // strip trailing newline
  l = strlen(buf);
  if (l != 0 && buf[l - 1] == '\n')
    buf[l - 1] = '\0';
  return 0;
}
* Check an HDFS connection's Kerberos path.
* If the mtime of the Kerberos ticket cache file has changed since we first
* opened the connection, mark the connection as condemned and remove it from
* the hdfs connection tree.
* @param conn The HDFS connection
static int hdfsConnCheckKpath(const struct hdfsConn *conn)
int ret;
struct stat st;
char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
if (stat(conn->kpath, &st) < 0) {
ret = errno;
if (ret == ENOENT) {
fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
"ticket cache file '%s' has disappeared. Condemning the "
"connection.\n", conn->usrname, conn->kpath);
} else {
fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
"failed with error code %d. Pessimistically condemning the "
"connection.\n", conn->usrname, conn->kpath, ret);
return -ret;
if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
(st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
"has changed from '%s' to '%s'. Condemning the connection "
"because our cached Kerberos credentials have probably "
"changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
return -EINTERNAL;
return 0;
/**
 * Cache expiration logic.
 *
 * This function is called periodically by the cache expiration thread.  For
 * each FUSE connection not currently in use (refcnt == 0) it will decrement the
 * expirationCount for that connection.  Once the expirationCount reaches 0 for
 * a connection, it can be garbage collected.
 *
 * We also check to see if the Kerberos credentials have changed.  If so, the
 * connection is immediately condemned, even if it is currently in use.
 */
static void hdfsConnExpiry(void)
struct hdfsConn *conn, *tmpConn;
// The _SAFE iterator keeps the successor in tmpConn, so the current node can
// be removed from the tree while the walk is in progress.
RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) {
// Only connections that recorded a ticket cache path are checked.
if (conn->kpath) {
// Any nonzero result (stat() failure or changed mtime) condemns the
// connection and unlinks it from the tree, whether or not it is in use.
if (hdfsConnCheckKpath(conn)) {
conn->condemned = 1;
RB_REMOVE(hdfsConnTree, &gConnTree, conn);
if (conn->refcnt == 0) {
/* If the connection is not in use by any threads, delete it
* immediately. If it is still in use by some threads, the last
* thread using it will clean it up later inside hdfsConnRelease. */
if (conn->refcnt == 0) {
/* If the connection is not currently in use by a thread, check to see if
* it ought to be removed because it's too old. */
if (conn->expirationCount <= 0) {
// Sanity check: a condemned connection should already have been unlinked
// above, so reaching this point with the flag set is reported loudly.
if (conn->condemned) {
fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
"as %s is still in the tree!\n", conn->usrname);
fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
"%s because it's now too old.\n", conn->usrname);
RB_REMOVE(hdfsConnTree, &gConnTree, conn);
// The Kerberos FILE: prefix. This indicates that the kerberos ticket cache
// specifier is a file. (Note that we also assume that the specifier is a file
// if no prefix is present.)
// Length of the Kerberos file prefix, which is equal to the string size in
// bytes minus 1 (because we don't count the null terminator in the length.)
/**
 * Find the Kerberos ticket cache path.
 *
 * This function finds the Kerberos ticket cache path from the thread ID and
 * user ID of the process making the request.
 *
 * Normally, the ticket cache path is in a well-known location in /tmp.
 * However, it's possible that the calling process could set the KRB5CCNAME
 * environment variable, indicating that its Kerberos ticket cache is at a
 * non-default location.  We try to handle this possibility by reading the
 * process' environment here.  This will be allowed if we have root
 * capabilities, or if our UID is the same as the remote process' UID.
 *
 * Note that we don't check to see if the cache file actually exists or not.
 * We're just trying to find out where it would be if it did exist.
 *
 * @param ctx         the FUSE context of the request (supplies pid and uid)
 * @param path        (out param) the path to the ticket cache file
 * @param pathLen     length of the path buffer
 */
static void findKerbTicketCachePath(struct fuse_context *ctx,
char *path, size_t pathLen)
FILE *fp = NULL;
// The key carries a leading NUL so that it only matches at the start of an
// environment entry (entries are NUL-separated).
static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
// c starts as '\0' so the very first entry in the file can match the key's
// leading NUL.  pathIdx counts value bytes copied so far; keyIdx counts key
// bytes matched so far.
int c = '\0', pathIdx = 0, keyIdx = 0;
// Length of the key including its leading NUL (strlen alone would stop at it).
size_t KRB5CCNAME_LEN = strlen(KRB5CCNAME + 1) + 1;
// /proc/<tid>/environ contains the remote process' environment. It is
// exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
// The output buffer doubles as scratch space for the /proc file name.
snprintf(path, pathLen, "/proc/%d/environ", ctx->pid);
fp = fopen(path, "r");
// If the environment cannot be opened, pathIdx stays 0 and the default
// location is used below.
if (!fp)
goto done;
while (1) {
if (c == EOF)
goto done;
// Whole key matched: copy the value byte by byte until its terminating NUL
// or until only the terminator slot is left in the buffer.
if (keyIdx == KRB5CCNAME_LEN) {
// NOTE(review): pathIdx is an int and pathLen a size_t, so this comparison
// is done in unsigned arithmetic.
if (pathIdx >= pathLen - 1)
goto done;
if (c == '\0')
goto done;
path[pathIdx++] = c;
// Still matching the key: any mismatch restarts the match from scratch.
// NOTE(review): the mismatching byte itself is not re-tested against the
// key's first byte, so an entry whose text is a bare prefix of the key
// directly followed by NUL would hide the entry after it -- confirm whether
// that case matters.
} else if (KRB5CCNAME[keyIdx++] != c) {
keyIdx = 0;
c = fgetc(fp);
if (fp)
// No KRB5CCNAME value was found: fall back to the per-uid default under /tmp.
if (pathIdx == 0) {
snprintf(path, pathLen, "/tmp/krb5cc_%d", ctx->uid);
} else {
path[pathIdx] = '\0';
// A leading KRB_FILE_PREFIX is stripped so that a plain file name is left;
// memmove is used because source and destination overlap, and the +1
// carries the terminator along.
if (strncmp(path, KRB_FILE_PREFIX, KRB_FILE_PREFIX_LEN) == 0) {
fprintf(stderr, "stripping " KRB_FILE_PREFIX " from the front of "
memmove(path, path + KRB_FILE_PREFIX_LEN,
strlen(path + KRB_FILE_PREFIX_LEN) + 1);
/**
 * Create a new libhdfs connection.
 *
 * @param usrname     Username to use for the new connection
 * @param ctx         FUSE context to use for the new connection
 * @param out         (out param) the new libhdfs connection
 *
 * @return            0 on success; error code otherwise
 */
static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
struct hdfsConn **out)
struct hdfsBuilder *bld = NULL;
// Zero-filled so the buffer always holds a valid (possibly empty) string.
char kpath[PATH_MAX] = { 0 };
struct hdfsConn *conn = NULL;
int ret;
struct stat st;
// calloc leaves refcnt, condemned and every pointer member zeroed.
conn = calloc(1, sizeof(struct hdfsConn));
if (!conn) {
fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
ret = -ENOMEM;
goto error;
bld = hdfsNewBuilder();
if (!bld) {
fprintf(stderr, "Unable to create hdfs builder\n");
ret = -ENOMEM;
goto error;
/* We always want to get a new FileSystem instance here-- that's why we call
* hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic
* in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
* existing cached connection which we wanted to get rid of.
hdfsBuilderSetNameNode(bld, gUri);
// A port of 0 means "not specified"; the builder is then left without one.
if (gPort) {
hdfsBuilderSetNameNodePort(bld, gPort);
// ctx is only dereferenced on the Kerberos path (here and inside
// findKerbTicketCachePath), which is why fuseConnectTest() can pass NULL
// when Kerberos is not configured.
if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
if (stat(kpath, &st) < 0) {
fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
"file '%s'. Did you remember to kinit for UID %d?\n",
kpath, ctx->uid);
ret = -EACCES;
goto error;
// Remember the ticket cache mtime so hdfsConnCheckKpath() can later detect
// that the file has changed.
conn->kPathMtime = st.st_mtim.tv_sec;
conn->kPathMtimeNs = st.st_mtim.tv_nsec;
hdfsBuilderSetKerbTicketCachePath(bld, kpath);
// The connection keeps its own copy; kpath is a stack buffer.
conn->kpath = strdup(kpath);
if (!conn->kpath) {
fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
ret = -ENOMEM;
goto error;
} else {
// earlier the username was set to the builder always, but due to
// HADOOP-9747 if we specify the username in case of kerberos authentication
// the username will be used as the principal name, and that will conflict
// with ticket cache based authentication as we have the OS user name here
// not the real kerberos principal name. So with SIMPLE auth we pass on the
// OS username still, and the UGI will use that as the username, but with
// kerberos authentication we do not pass in the OS username and let the
// authentication happen with the principal who's ticket is in the ticket
// cache. (HDFS-15034 is still a possible improvement for SIMPLE AUTH.)
hdfsBuilderSetUserName(bld, usrname);
// The connection keeps its own copy of the username as well.
conn->usrname = strdup(usrname);
if (!conn->usrname) {
fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
ret = -ENOMEM;
goto error;
conn->fs = hdfsBuilderConnect(bld);
// bld is cleared so the cleanup code below does not touch it again --
// presumably because hdfsBuilderConnect() consumes the builder; confirm
// against the libhdfs documentation.
bld = NULL;
if (!conn->fs) {
// NOTE(review): this path stores a positive errno in ret, while every other
// failure path in this function uses a negative code.
ret = errno;
fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
"error code %d\n", usrname, ret);
goto error;
// Success: publish the connection in the cache tree and hand it back.
RB_INSERT(hdfsConnTree, &gConnTree, conn);
*out = conn;
return 0;
// Failure cleanup: release whichever of the builder and the partially
// constructed connection still exists.
if (bld) {
if (conn) {
return ret;
/**
 * Get a libhdfs connection.
 *
 * If there is an existing connection, it will be reused.  If not, a new one
 * will be created.
 *
 * You must call hdfsConnRelease on the connection you get back!
 *
 * @param usrname    The username to use
 * @param ctx        The FUSE context to use (contains UID, PID of requestor)
 * @param out        (out param) The HDFS connection
 *
 * @return           0 on success; error code otherwise
 */
static int fuseConnect(const char *usrname, struct fuse_context *ctx,
struct hdfsConn **out)
int ret;
struct hdfsConn* conn;
// Stays an empty string unless Kerberos is configured; hdfsConnCompare()
// ignores the path in that case.
char kpath[PATH_MAX] = { 0 };
if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
// Reuse a cached connection for this user (and ticket cache) when one exists.
conn = hdfsConnFind(usrname, kpath);
if (!conn) {
// Cache miss: create a connection; fuseNewConnect() also inserts it into
// the tree.
ret = fuseNewConnect(usrname, ctx, &conn);
if (ret) {
fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
"error code %d\n", usrname, ret);
return ret;
// Every hand-out restarts the idle countdown: the expiry period rounded up
// to a whole number of timer ticks...
conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
// ...but never fewer than two ticks.
if (conn->expirationCount < 2)
conn->expirationCount = 2;
*out = conn;
return 0;
int fuseConnectAsThreadUid(struct hdfsConn **conn)
struct fuse_context *ctx;
char *usrname;
int ret;
ctx = fuse_get_context();
usrname = getUsername(ctx->uid);
if (!usrname) {
ERROR("fuseConnectAsThreadUid(): failed to get username for uid %"PRId64
"\n", (uint64_t)ctx->uid);
return EIO;
ret = fuseConnect(usrname, ctx, conn);
return ret;
// Check at startup that a connection to the configured filesystem can be
// created.
//
// @return  0 if the check passed or was skipped (Kerberos); otherwise the
//          error code from fuseNewConnect()
int fuseConnectTest(void)
int ret;
struct hdfsConn *conn;
if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
// TODO: call some method which can tell us whether the FS exists. In order
// to implement this, we have to add a method to FileSystem in order to do
// this without valid Kerberos authentication. See HDFS-3674 for details.
return 0;
// A NULL context is acceptable here: fuseNewConnect() only dereferences ctx
// on its Kerberos path, which was ruled out above.
ret = fuseNewConnect("root", NULL, &conn);
if (ret) {
fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
return ret;
return 0;
struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
return conn->fs;
// Give back a connection obtained from fuseConnect() /
// fuseConnectAsThreadUid().
//
// @param conn  The HDFS connection being released
void hdfsConnRelease(struct hdfsConn *conn)
// Only a connection that is both unreferenced and condemned is disposed of
// here; unreferenced connections that are not condemned are left for
// hdfsConnExpiry() to age out.
if ((conn->refcnt == 0) && (conn->condemned)) {
fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
/* Notice that we're not removing the connection from gConnTree here.
* If the connection is condemned, it must have already been removed from
* the tree, so that no other threads start using it.
* Get the monotonic time.
* Unlike the wall-clock time, monotonic time only ever goes forward. If the
* user adjusts the time, the monotonic time will not be affected.
* @return The monotonic time
static time_t getMonotonicTime(void)
int res;
struct timespec ts;
res = clock_gettime(CLOCK_MONOTONIC, &ts);
if (res)
return ts.tv_sec;
/**
 * FUSE connection expiration thread
 */
// @param v  unused (fuseConnectInit() passes NULL to pthread_create)
// @return   NULL; the loop below has no visible exit
static void* hdfsConnExpiryThread(void *v)
time_t nextTime, curTime;
int waitTime;
// Deadlines are kept on the monotonic clock, in whole seconds.
nextTime = getMonotonicTime() + gTimerPeriod;
while (1) {
curTime = getMonotonicTime();
// Deadline reached: schedule the next one a full period from now.
if (curTime >= nextTime) {
nextTime = curTime + gTimerPeriod;
// Seconds left until the deadline, converted to the milliseconds poll()
// expects.
waitTime = (nextTime - curTime) * 1000;
// poll() with no descriptors is used purely as a sleep.
poll(NULL, 0, waitTime);
return NULL;