/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_users.h"
#include "hdfs/hdfs.h"
#include "util/tree.h"

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <poll.h>
#include <search.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <utime.h>

/** Default interval, in seconds, between two runs of the expiry timer */
#define FUSE_CONN_DEFAULT_TIMER_PERIOD      5
/** Default time, in seconds, an unused connection stays cached */
#define FUSE_CONN_DEFAULT_EXPIRY_PERIOD     (5 * 60)
/** Configuration key read by discoverAuthConf to find the auth type */
#define HADOOP_SECURITY_AUTHENTICATION      "hadoop.security.authentication"
/** Configuration key which overrides the default expiry period */
#define HADOOP_FUSE_CONNECTION_TIMEOUT      "hadoop.fuse.connection.timeout"
/** Configuration key which overrides the default timer period */
#define HADOOP_FUSE_TIMER_PERIOD            "hadoop.fuse.timer.period"

/** Length of the buffer needed by asctime_r */
#define TIME_STR_LEN 26

struct hdfsConn;

static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
static void hdfsConnExpiry(void);
static void* hdfsConnExpiryThread(void *v);

RB_HEAD(hdfsConnTree, hdfsConn);

enum authConf {
    AUTH_CONF_UNKNOWN,
    AUTH_CONF_KERBEROS,
    AUTH_CONF_OTHER,
};

/**
 * A cached libhdfs connection.
 *
 * Connections live in gConnTree, keyed by username (and by Kerberos ticket
 * cache path when Kerberos is configured; see hdfsConnCompare).
 */
struct hdfsConn {
  /** Red-black tree linkage used by gConnTree */
  RB_ENTRY(hdfsConn) entry;
  /** How many threads are currently using this hdfsConnection object */
  int64_t refcnt;
  /** The username used to make this connection.  Dynamically allocated. */
  char *usrname;
  /** Kerberos ticket cache path, or NULL if this is not a kerberized
   * connection.  Dynamically allocated. */
  char *kpath;
  /** mtime of the kpath, if the kpath is non-NULL */
  time_t kPathMtime;
  /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
  long kPathMtimeNs;
  /** The cached libhdfs fs instance */
  hdfsFS fs;
  /** Nonzero if this hdfs connection needs to be closed as soon as possible.
   * If this is true, the connection has been removed from the tree. */
  int condemned;
  /** Number of times we should run the expiration timer on this connection
   * before removing it.  Reset each time fuseConnect hands out the
   * connection; only decremented while refcnt is 0. */
  int expirationCount;
};

/** Generates the tree functions for hdfsConnTree, ordered by hdfsConnCompare */
RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);

/** Current cached libhdfs connections */
static struct hdfsConnTree gConnTree;

/** The URI used to make our connections.  Dynamically allocated. */
static char *gUri;

/** The port used to make our connections, or 0. */
static int gPort;

/** Lock which protects gConnTree and the refcnt, condemned and
 * expirationCount fields of the connections handed out from it */
static pthread_mutex_t gConnMutex;

/** Type of authentication configured */
static enum authConf gHdfsAuthConf;

/** FUSE connection timer expiration period, in seconds */
static int32_t gTimerPeriod;

/** FUSE connection expiry period, in seconds */
static int32_t gExpiryPeriod;

/** FUSE timer expiration thread */
static pthread_t gTimerThread;

/** 
 * Find out what type of authentication the system administrator
 * has configured.
 *
 * @return     the type of authentication, or AUTH_CONF_UNKNOWN on error.
 */
static enum authConf discoverAuthConf(void)
{
  int ret;
  char *val = NULL;
  enum authConf authConf;

  ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val);
  if (ret)
    authConf = AUTH_CONF_UNKNOWN;
  else if (!val)
    authConf = AUTH_CONF_OTHER;
  else if (!strcmp(val, "kerberos"))
    authConf = AUTH_CONF_KERBEROS;
  else
    authConf = AUTH_CONF_OTHER;
  free(val);
  return authConf;
}

/**
 * Initialize the connection cache.
 *
 * Reads the timer period, the expiry period and the authentication type from
 * the configuration, remembers the NameNode URI and port, initializes the
 * lock and the connection tree, and starts the expiry thread.
 *
 * @param nnUri         The NameNode URI; copied, the caller keeps ownership
 * @param port          The NameNode port, or 0
 *
 * @return              0 on success; a negative error code otherwise
 */
int fuseConnectInit(const char *nnUri, int port)
{
  int ret;

  if (!nnUri) {
    fprintf(stderr, "fuseConnectInit: no NameNode URI was given.\n");
    return -EINVAL;
  }
  gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
  ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
  if (ret) {
    fprintf(stderr, "Unable to determine the configured value for %s.\n",
          HADOOP_FUSE_TIMER_PERIOD);
    return -EINVAL;
  }
  if (gTimerPeriod < 1) {
    fprintf(stderr, "Invalid value %d given for %s.\n",
          gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
    return -EINVAL;
  }
  gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
  ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
  if (ret) {
    fprintf(stderr, "Unable to determine the configured value for %s.\n",
          HADOOP_FUSE_CONNECTION_TIMEOUT);
    return -EINVAL;
  }
  if (gExpiryPeriod < 1) {
    fprintf(stderr, "Invalid value %d given for %s.\n",
          gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
    return -EINVAL;
  }
  gHdfsAuthConf = discoverAuthConf();
  if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
    fprintf(stderr, "Unable to determine the configured value for %s.\n",
          HADOOP_SECURITY_AUTHENTICATION);
    return -EINVAL;
  }
  gPort = port;
  gUri = strdup(nnUri);
  if (!gUri) {
    fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
    return -ENOMEM;
  }
  ret = pthread_mutex_init(&gConnMutex, NULL);
  if (ret) {
    free(gUri);
    gUri = NULL; /* don't leave a dangling global behind */
    fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
            ret);
    return -ret;
  }
  /* The tree and the lock must be ready before the expiry thread starts,
   * because the thread uses both. */
  RB_INIT(&gConnTree);
  ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
  if (ret) {
    free(gUri);
    gUri = NULL;
    pthread_mutex_destroy(&gConnMutex);
    fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
            ret);
    return -ret;
  }
  fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
          "expiry period %d\n", gTimerPeriod, gExpiryPeriod);
  return 0;
}

/**
 * Compare two libhdfs connections by username and, when Kerberos is
 * configured, by Kerberos ticket cache path.
 *
 * This is the ordering function of gConnTree, so it has to be a consistent
 * three-way comparison: swapping the arguments must flip the sign of the
 * result.
 *
 * @param a                The first libhdfs connection
 * @param b                The second libhdfs connection
 *
 * @return                 negative, 0, or positive depending on
 *                         a < b, a == b, a > b
 */
static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
{
  const char *aPath, *bPath;
  int rc = strcmp(a->usrname, b->usrname);

  if (rc)
    return rc;
  /* Without Kerberos the username alone identifies a connection. */
  if (gHdfsAuthConf != AUTH_CONF_KERBEROS)
    return 0;
  /* Return the strcmp result itself rather than its truth value, so that
   * "less than" is reported as well.  A NULL path sorts as the empty
   * string. */
  aPath = a->kpath ? a->kpath : "";
  bPath = b->kpath ? b->kpath : "";
  return strcmp(aPath, bPath);
}

/**
 * Find a libhdfs connection by username
 *
 * @param usrname         The username to look up
 * @param kpath           The Kerberos ticket cache file path
 *
 * @return                The connection, or NULL if none could be found
 */
static struct hdfsConn* hdfsConnFind(const char *usrname, const char *kpath)
{
  struct hdfsConn exemplar;

  memset(&exemplar, 0, sizeof(exemplar));
  exemplar.usrname = (char*)usrname;
  exemplar.kpath = (char*)kpath;
  return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
}

/**
 * Disconnect a libhdfs connection and release its memory.
 *
 * The connection must already have been removed from the tree.  A failure to
 * disconnect is logged, and the memory is released regardless.
 *
 * @param conn            The libhdfs connection
 */
static void hdfsConnFree(struct hdfsConn *conn)
{
  const char *who = conn->usrname ? conn->usrname : "(null)";
  int rc = hdfsDisconnect(conn->fs);

  if (rc != 0) {
    fprintf(stderr, "hdfsConnFree(username=%s): "
      "hdfsDisconnect failed with error %d\n",
      who, rc);
  }
  free(conn->kpath);
  free(conn->usrname);
  free(conn);
}

/**
 * Convert a time_t to a string.
 *
 * The result has the asctime format without the trailing newline.  On
 * failure the buffer still holds a valid string (empty, or a placeholder),
 * so that callers which only log the result may ignore the return value.
 *
 * @param sec           time in seconds since the epoch
 * @param buf           (out param) output buffer
 * @param bufLen        length of output buffer; at least TIME_STR_LEN
 *
 * @return              0 on success; -ENAMETOOLONG if the provided buffer was
 *                      too short; -EOVERFLOW if the time could not be
 *                      converted
 */
static int timeToStr(time_t sec, char *buf, size_t bufLen)
{
  struct tm tm;
  size_t l;

  if (bufLen < TIME_STR_LEN) {
    if (bufLen > 0)
      buf[0] = '\0';
    return -ENAMETOOLONG;
  }
  /* Both conversions may fail and return NULL, e.g. for a time whose year
   * does not fit. */
  if (!localtime_r(&sec, &tm) || !asctime_r(&tm, buf)) {
    snprintf(buf, bufLen, "(unknown)");
    return -EOVERFLOW;
  }
  // strip trailing newline
  l = strlen(buf);
  if ((l != 0) && (buf[l - 1] == '\n'))
    buf[l - 1] = '\0';
  return 0;
}

/**
 * Check an HDFS connection's Kerberos path.
 *
 * Compares the current mtime of the Kerberos ticket cache file against the
 * mtime recorded in the connection.  This function only reports and logs; it
 * is up to the caller to condemn the connection and to remove it from the
 * hdfs connection tree when a nonzero value comes back.
 *
 * @param conn      The HDFS connection; conn->kpath must not be NULL
 *
 * @return          0 if the ticket cache file is unchanged; the negated
 *                  errno if stat failed; -EINTERNAL if the mtime differs
 */
static int hdfsConnCheckKpath(const struct hdfsConn *conn)
{
  struct stat cur;
  char oldStamp[TIME_STR_LEN], newStamp[TIME_STR_LEN];

  if (stat(conn->kpath, &cur) < 0) {
    /* Grab errno before fprintf gets a chance to change it. */
    const int err = errno;

    if (err != ENOENT) {
      fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
              "failed with error code %d.  Pessimistically condemning the "
              "connection.\n", conn->usrname, conn->kpath, err);
    } else {
      fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
              "ticket cache file '%s' has disappeared.  Condemning the "
              "connection.\n", conn->usrname, conn->kpath);
    }
    return -err;
  }
  if ((cur.st_mtim.tv_sec == conn->kPathMtime) &&
      (cur.st_mtim.tv_nsec == conn->kPathMtimeNs)) {
    return 0;
  }
  timeToStr(conn->kPathMtime, oldStamp, sizeof(oldStamp));
  timeToStr(cur.st_mtim.tv_sec, newStamp, sizeof(newStamp));
  fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
          "has changed from '%s' to '%s'.  Condemning the connection "
          "because our cached Kerberos credentials have probably "
          "changed.\n", conn->usrname, conn->kpath, oldStamp, newStamp);
  return -EINTERNAL;
}

/**
 * Cache expiration logic.
 *
 * Called periodically by the cache expiration thread.  Walks all cached
 * connections while holding gConnMutex and does two things:
 *
 * 1. For a connection with a Kerberos ticket cache path, checks whether the
 *    ticket cache has changed.  If so, the connection is condemned and taken
 *    out of the tree immediately, even if it is currently in use.
 * 2. For a connection which is not in use (refcnt == 0), decrements the
 *    expirationCount.  Once it reaches 0 the connection is removed and freed.
 */
static void hdfsConnExpiry(void)
{
  struct hdfsConn *cur, *next;

  pthread_mutex_lock(&gConnMutex);
  RB_FOREACH_SAFE(cur, hdfsConnTree, &gConnTree, next) {
    if (cur->kpath && hdfsConnCheckKpath(cur)) {
      cur->condemned = 1;
      RB_REMOVE(hdfsConnTree, &gConnTree, cur);
      if (cur->refcnt == 0) {
        /* Nobody is using the connection, so it can go right away.
         * Otherwise the last user frees it in hdfsConnRelease. */
        hdfsConnFree(cur);
        continue;
      }
    }
    if (cur->refcnt != 0) {
      /* Connections which are in use do not age. */
      continue;
    }
    cur->expirationCount -= 1;
    if (cur->expirationCount > 0) {
      continue;
    }
    if (cur->condemned) {
      fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
              "as %s is still in the tree!\n", cur->usrname);
    }
    fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
            "%s because it's now too old.\n", cur->usrname);
    RB_REMOVE(hdfsConnTree, &gConnTree, cur);
    hdfsConnFree(cur);
  }
  pthread_mutex_unlock(&gConnMutex);
}

// The Kerberos FILE: prefix.  This indicates that the Kerberos ticket cache
// specifier is a file.  (Note that we also assume that the specifier is a file
// if no prefix is present.)
#define KRB_FILE_PREFIX "FILE:"

// Length of the Kerberos file prefix, which is equal to the string size in
// bytes minus 1 (because we don't count the null terminator in the length).
#define KRB_FILE_PREFIX_LEN (sizeof(KRB_FILE_PREFIX) - 1)

/**
 * Find the Kerberos ticket cache path.
 *
 * This function finds the Kerberos ticket cache path from the thread ID and
 * user ID of the process making the request.
 *
 * Normally, the ticket cache path is in a well-known location in /tmp.
 * However, it's possible that the calling process could set the KRB5CCNAME
 * environment variable, indicating that its Kerberos ticket cache is at a
 * non-default location.  We try to handle this possibility by reading the
 * process' environment here.  This will be allowed if we have root
 * capabilities, or if our UID is the same as the remote process' UID.
 *
 * If the environment cannot be read, or KRB5CCNAME is unset or empty, the
 * default /tmp/krb5cc_<uid> is used.  A leading "FILE:" is stripped.
 *
 * Note that we don't check to see if the cache file actually exists or not.
 * We're just trying to find out where it would be if it did exist.
 *
 * @param ctx           the FUSE context of the request; its pid and uid
 *                      fields are read.  Must not be NULL.
 * @param path          (out param) the path to the ticket cache file
 * @param pathLen       length of the path buffer
 */
static void findKerbTicketCachePath(struct fuse_context *ctx,
                                    char *path, size_t pathLen)
{
  // The leading NUL byte anchors the match at the start of an entry, so that
  // e.g. "FOO_KRB5CCNAME=" is not mistaken for "KRB5CCNAME=".
  static const char KRB5CCNAME[] = "\0KRB5CCNAME=";
  const size_t KRB5CCNAME_LEN = sizeof(KRB5CCNAME) - 1;
  FILE *fp = NULL;
  // Start as if a NUL had just been read: the first entry has no separator
  // in front of it.
  int c = '\0';
  size_t pathIdx = 0, keyIdx = 0;

  if (pathLen == 0)
    return;
  // /proc/<tid>/environ contains the remote process' environment.  It is
  // exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
  // The path buffer doubles as scratch space for the file name.
  snprintf(path, pathLen, "/proc/%d/environ", (int)ctx->pid);
  fp = fopen(path, "r");
  if (fp) {
    while (c != EOF) {
      if (keyIdx == KRB5CCNAME_LEN) {
        // The whole key matched; copy the value until its terminating NUL,
        // or until the buffer is full.
        if ((pathIdx >= pathLen - 1) || (c == '\0'))
          break;
        path[pathIdx++] = (char)c;
      } else if (KRB5CCNAME[keyIdx] == c) {
        keyIdx++;
      } else {
        // Mismatch.  If the offending byte is itself a NUL it starts the
        // next entry, so it has to count as a match of the leading NUL;
        // otherwise a variable following an empty entry would be missed.
        keyIdx = (c == '\0') ? 1 : 0;
      }
      c = fgetc(fp);
    }
    fclose(fp);
  }
  if (pathIdx == 0) {
    // uid_t is unsigned; print it as such so that large UIDs stay positive.
    snprintf(path, pathLen, "/tmp/krb5cc_%u", (unsigned int)ctx->uid);
  } else {
    path[pathIdx] = '\0';
  }
  if (strncmp(path, KRB_FILE_PREFIX, KRB_FILE_PREFIX_LEN) == 0) {
    fprintf(stderr, "stripping " KRB_FILE_PREFIX " from the front of "
            "KRB5CCNAME.\n");
    memmove(path, path + KRB_FILE_PREFIX_LEN,
            strlen(path + KRB_FILE_PREFIX_LEN) + 1);
  }
}

/**
 * Create a new libhdfs connection.
 *
 * On success the connection has been inserted into gConnTree, so gConnMutex
 * should be held by the caller.  The reference count of the new connection
 * is 0; it is up to the caller to take a reference.
 *
 * @param usrname       Username to use for the new connection
 * @param ctx           FUSE context to use for the new connection.  Only
 *                      needed when Kerberos is configured; may be NULL
 *                      otherwise.
 * @param out           (out param) the new libhdfs connection; only set on
 *                      success
 *
 * @return              0 on success; nonzero error code otherwise
 */
static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
        struct hdfsConn **out)
{
  struct hdfsBuilder *bld = NULL;
  char kpath[PATH_MAX] = { 0 };
  struct hdfsConn *conn = NULL;
  int ret;
  struct stat st;

  conn = calloc(1, sizeof(*conn));
  if (!conn) {
    fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
    ret = -ENOMEM;
    goto error;
  }
  bld = hdfsNewBuilder();
  if (!bld) {
    fprintf(stderr, "Unable to create hdfs builder\n");
    ret = -ENOMEM;
    goto error;
  }
  /* We always want to get a new FileSystem instance here-- that's why we call
   * hdfsBuilderSetForceNewInstance.  Otherwise the 'cache condemnation' logic
   * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
   * existing cached connection which we wanted to get rid of.
   */
  hdfsBuilderSetForceNewInstance(bld);
  hdfsBuilderSetNameNode(bld, gUri);
  if (gPort) {
    hdfsBuilderSetNameNodePort(bld, gPort);
  }
  hdfsBuilderSetUserName(bld, usrname);
  if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
    if (!ctx) {
      /* The ticket cache is located via the pid and uid of the requestor. */
      fprintf(stderr, "fuseNewConnect(usrname=%s): a FUSE context is "
              "required when Kerberos is configured\n", usrname);
      ret = -EINVAL;
      goto error;
    }
    findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
    if (stat(kpath, &st) < 0) {
      fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
        "file '%s'.  Did you remember to kinit for UID %d?\n",
        kpath, (int)ctx->uid);
      ret = -EACCES;
      goto error;
    }
    /* Remember the mtime so that hdfsConnCheckKpath can tell when the
     * credentials have been replaced. */
    conn->kPathMtime = st.st_mtim.tv_sec;
    conn->kPathMtimeNs = st.st_mtim.tv_nsec;
    hdfsBuilderSetKerbTicketCachePath(bld, kpath);
    conn->kpath = strdup(kpath);
    if (!conn->kpath) {
      fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
      ret = -ENOMEM;
      goto error;
    }
  }
  conn->usrname = strdup(usrname);
  if (!conn->usrname) {
    fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
    ret = -ENOMEM;
    goto error;
  }
  conn->fs = hdfsBuilderConnect(bld);
  /* libhdfs frees the builder inside hdfsBuilderConnect, whether or not the
   * connection succeeded; clear the pointer so the error path leaves it
   * alone. */
  bld = NULL;
  if (!conn->fs) {
    ret = errno;
    if (ret == 0) {
      /* Never report success for a failed connection: *out is not set on
       * this path, and the caller would go on to use it. */
      ret = EIO;
    }
    fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
            "error code %d\n", usrname, ret);
    goto error;
  }
  if (RB_INSERT(hdfsConnTree, &gConnTree, conn)) {
    /* An equal connection is already in the tree, so this one was not
     * inserted.  Mark it condemned ("not in the tree") so that the last
     * hdfsConnRelease frees it instead of leaking it. */
    conn->condemned = 1;
  }
  *out = conn;
  return 0;

error:
  if (bld) {
    hdfsFreeBuilder(bld);
  }
  if (conn) {
    free(conn->kpath);
    free(conn->usrname);
    free(conn);
  }
  return ret;
}

/**
 * Get a libhdfs connection.
 *
 * A cached connection for the user (and, with Kerberos, for the ticket cache
 * path) is reused when there is one; otherwise a new connection is created.
 * Either way the connection's reference count is incremented and its
 * expiration countdown is restarted.
 *
 * You must call hdfsConnRelease on the connection you get back!
 *
 * @param usrname    The username to use
 * @param ctx        The FUSE context to use (contains UID, PID of requestor)
 * @param out        (out param) The HDFS connection
 *
 * @return           0 on success; error code otherwise
 */
static int fuseConnect(const char *usrname, struct fuse_context *ctx,
                struct hdfsConn **out)
{
  char ticketPath[PATH_MAX] = { 0 };
  struct hdfsConn *found;
  int rc, ticks;

  if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
    findKerbTicketCachePath(ctx, ticketPath, sizeof(ticketPath));
  }

  pthread_mutex_lock(&gConnMutex);
  found = hdfsConnFind(usrname, ticketPath);
  if (found == NULL) {
    rc = fuseNewConnect(usrname, ctx, &found);
    if (rc != 0) {
      pthread_mutex_unlock(&gConnMutex);
      fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
              "error code %d\n", usrname, rc);
      return rc;
    }
  }
  found->refcnt++;
  /* Number of timer ticks in one expiry period, rounded up, and never
   * fewer than 2. */
  ticks = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
  found->expirationCount = (ticks < 2) ? 2 : ticks;
  pthread_mutex_unlock(&gConnMutex);
  *out = found;
  return 0;
}

int fuseConnectAsThreadUid(struct hdfsConn **conn)
{
  struct fuse_context *ctx;
  char *usrname;
  int ret;
  
  ctx = fuse_get_context();
  usrname = getUsername(ctx->uid);
  if (!usrname) {
    ERROR("fuseConnectAsThreadUid(): failed to get username for uid %"PRId64
          "\n", (uint64_t)ctx->uid);
    return EIO;
  }
  ret = fuseConnect(usrname, ctx, conn);
  free(usrname);
  return ret;
}

/**
 * Test whether we can connect to the file system.
 *
 * With Kerberos configured this is a no-op which reports success (see the
 * TODO below).  Otherwise a connection as "root" is obtained and released
 * again.
 *
 * @return           0 on success; error code otherwise
 */
int fuseConnectTest(void)
{
  int ret;
  struct hdfsConn *conn;

  if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
    // TODO: call some method which can tell us whether the FS exists.  In order
    // to implement this, we have to add a method to FileSystem in order to do
    // this without valid Kerberos authentication.  See HDFS-3674 for details.
    return 0;
  }
  // Go through fuseConnect rather than fuseNewConnect: it takes gConnMutex
  // around the tree update and takes the reference which hdfsConnRelease
  // drops below.  Without that reference the count of the cached connection
  // would end up at -1.  Passing a NULL context is fine here, because the
  // context is only looked at when Kerberos is configured.
  ret = fuseConnect("root", NULL, &conn);
  if (ret) {
    fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
    return ret;
  }
  hdfsConnRelease(conn);
  return 0;
}

struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
{
  return conn->fs;
}

/**
 * Drop a reference to a libhdfs connection.
 *
 * If this was the last reference to a condemned connection, the connection
 * is freed.  A connection which is not condemned stays cached.
 *
 * @param conn       The HDFS connection
 */
void hdfsConnRelease(struct hdfsConn *conn)
{
  int lastUserOfCondemned;

  pthread_mutex_lock(&gConnMutex);
  conn->refcnt -= 1;
  lastUserOfCondemned = (conn->refcnt == 0) && conn->condemned;
  if (lastUserOfCondemned) {
    fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
      conn->usrname);
    /* No RB_REMOVE here: a condemned connection has already been taken out
     * of gConnTree, so that no other thread can start using it. */
    hdfsConnFree(conn);
  }
  pthread_mutex_unlock(&gConnMutex);
}

/**
 * Read the monotonic clock.
 *
 * Monotonic time only ever moves forward and is not affected when the user
 * adjusts the wall-clock time.  The process is aborted if the clock cannot
 * be read.
 *
 * @return        The seconds component of the monotonic time
 */
static time_t getMonotonicTime(void)
{
  struct timespec now;

  if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
    abort();
  }
  return now.tv_sec;
}

/**
 * FUSE connection expiration thread
 *
 * Runs hdfsConnExpiry once every gTimerPeriod seconds, measured on the
 * monotonic clock.  This function never returns.
 *
 * @param v       unused
 *
 * @return        never returns; NULL only to satisfy the thread signature
 */
static void* hdfsConnExpiryThread(void *v)
{
  time_t nextTime, curTime, remaining;
  int waitTime;

  (void)v;
  nextTime = getMonotonicTime() + gTimerPeriod;
  while (1) {
    curTime = getMonotonicTime();
    if (curTime >= nextTime) {
      hdfsConnExpiry();
      nextTime = curTime + gTimerPeriod;
    }
    /* poll takes its timeout as an int number of milliseconds, and a
     * negative timeout means "wait forever".  Cap the wait so that a very
     * long timer period cannot overflow into a negative value.  Waking up
     * early is harmless, because the deadline is checked again above. */
    remaining = nextTime - curTime;
    if (remaining > INT_MAX / 1000)
      waitTime = INT_MAX;
    else
      waitTime = (int)(remaining * 1000);
    poll(NULL, 0, waitTime);
  }
  return NULL;
}
