/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "exception.h"
#include "hdfs.h"
#include "hdfs_http_client.h"
#include "hdfs_http_query.h"
#include "hdfs_json_parser.h"
#include "jni_helper.h"
#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration"
#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
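/**
 * Connection options collected before hdfsBuilderConnect is called.
 *
 * The strings stored here are borrowed from the caller; the builder never
 * copies or frees them.
 */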
struct hdfsBuilder {
int forceNewInstance;
const char *nn;
tPort port;
const char *kerbTicketCachePath;
const char *userName;
};
/**
* The information required for accessing webhdfs,
* including the network address of the namenode and the user name
*
 * Unlike the strings in hdfsBuilder, the strings in this structure are
* dynamically allocated. This structure will not be freed until we disconnect
* from HDFS.
*/
struct hdfs_internal {
char *nn;
tPort port;
char *userName;
/**
* Working directory -- stored with a trailing slash.
*/
char *workingDir;
};
/**
* The 'file-handle' to a file in hdfs.
*/
struct hdfsFile_internal {
struct webhdfsFileHandle* file;
enum hdfsStreamType type; /* INPUT or OUTPUT */
int flags; /* Flag indicate read/create/append etc. */
tOffset offset; /* Current offset position in the file */
};
/**
 * Create and initialize a webhdfsBuffer, returned through the out-parameter
 * @return 0 on success, or an errno value on failure
 */
static int initWebHdfsBuffer(struct webhdfsBuffer **webhdfsBuffer)
{
int ret = 0;
struct webhdfsBuffer *buffer = calloc(1, sizeof(struct webhdfsBuffer));
if (!buffer) {
fprintf(stderr,
"ERROR: fail to allocate memory for webhdfsBuffer.\n");
return ENOMEM;
}
ret = pthread_mutex_init(&buffer->writeMutex, NULL);
if (ret) {
fprintf(stderr, "ERROR: fail in pthread_mutex_init for writeMutex "
"in initWebHdfsBuffer, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
ret = pthread_cond_init(&buffer->newwrite_or_close, NULL);
if (ret) {
fprintf(stderr,
"ERROR: fail in pthread_cond_init for newwrite_or_close "
"in initWebHdfsBuffer, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
ret = pthread_cond_init(&buffer->transfer_finish, NULL);
if (ret) {
fprintf(stderr,
"ERROR: fail in pthread_cond_init for transfer_finish "
"in initWebHdfsBuffer, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
done:
if (ret) {
free(buffer);
return ret;
}
*webhdfsBuffer = buffer;
return 0;
}
/**
 * Reset the webhdfsBuffer. This is called, in a blocking fashion,
 * when hdfsWrite is given a new buffer to write.
 * The libcurl writing thread is woken up to continue writing,
 * and the caller of this function blocks until the write finishes.
 *
 * @param wb The handle of the webhdfsBuffer
 * @param buffer The buffer provided by the user to write
 * @param length The number of bytes to write
 * @return The updated webhdfsBuffer.
 */
static struct webhdfsBuffer *resetWebhdfsBuffer(struct webhdfsBuffer *wb,
const char *buffer, size_t length)
{
if (buffer && length > 0) {
pthread_mutex_lock(&wb->writeMutex);
wb->wbuffer = buffer;
wb->offset = 0;
wb->remaining = length;
pthread_cond_signal(&wb->newwrite_or_close);
while (wb->remaining != 0) {
pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex);
}
pthread_mutex_unlock(&wb->writeMutex);
}
return wb;
}
/**
* Free the webhdfsBuffer and destroy its pthread conditions/mutex
* @param buffer The webhdfsBuffer to free
*/
static void freeWebhdfsBuffer(struct webhdfsBuffer *buffer)
{
int ret = 0;
if (buffer) {
ret = pthread_cond_destroy(&buffer->newwrite_or_close);
if (ret) {
fprintf(stderr,
"WARN: fail in pthread_cond_destroy for newwrite_or_close "
"in freeWebhdfsBuffer, <%d>: %s.\n",
ret, hdfs_strerror(ret));
errno = ret;
}
ret = pthread_cond_destroy(&buffer->transfer_finish);
if (ret) {
fprintf(stderr,
"WARN: fail in pthread_cond_destroy for transfer_finish "
"in freeWebhdfsBuffer, <%d>: %s.\n",
ret, hdfs_strerror(ret));
errno = ret;
}
ret = pthread_mutex_destroy(&buffer->writeMutex);
if (ret) {
fprintf(stderr,
"WARN: fail in pthread_mutex_destroy for writeMutex "
"in freeWebhdfsBuffer, <%d>: %s.\n",
ret, hdfs_strerror(ret));
errno = ret;
}
free(buffer);
buffer = NULL;
}
}
/**
 * Free a webhdfsFileHandle, including its webhdfsBuffer and strings
* @param handle The webhdfsFileHandle to free
*/
static void freeWebFileHandle(struct webhdfsFileHandle * handle)
{
if (!handle)
return;
freeWebhdfsBuffer(handle->uploadBuffer);
free(handle->datanode);
free(handle->absPath);
free(handle);
}
static const char *maybeNull(const char *str)
{
return str ? str : "(NULL)";
}
/** Print an hdfsBuilder as a string into the given buffer */
static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
char *buf, size_t bufLen)
{
int strlength = snprintf(buf, bufLen, "nn=%s, port=%d, "
"kerbTicketCachePath=%s, userName=%s",
maybeNull(bld->nn), bld->port,
maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
if (strlength < 0 || strlength >= bufLen) {
fprintf(stderr, "failed to print a hdfsBuilder as string.\n");
return NULL;
}
return buf;
}
/**
 * Free an hdfs_internal handle, including its dynamically allocated strings
 * @param fs The hdfs_internal handle to free
 */
static void freeWebHdfsInternal(struct hdfs_internal *fs)
{
    if (fs) {
        free(fs->nn);
        free(fs->userName);
        free(fs->workingDir);
        free(fs);
    }
}
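/**
 * Allocate a new, zeroed hdfsBuilder.
 * @return The new builder, or NULL with errno set to ENOMEM on failure
 */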
struct hdfsBuilder *hdfsNewBuilder(void)
{
struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
if (!bld) {
errno = ENOMEM;
return NULL;
}
return bld;
}
void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
free(bld);
}
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
// We don't cache instances in libwebhdfs, so this is not applicable.
}
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
if (bld) {
bld->nn = nn;
}
}
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
if (bld) {
bld->port = port;
}
}
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
if (bld) {
bld->userName = userName;
}
}
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
const char *kerbTicketCachePath)
{
if (bld) {
bld->kerbTicketCachePath = kerbTicketCachePath;
}
}
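/**
 * Connect to webhdfs as the given user by building a temporary
 * hdfsBuilder and handing it to hdfsBuilderConnect.
 *
 * Illustrative usage (host, port and user are placeholder values):
 *   hdfsFS fs = hdfsConnectAsUser("namenode.example.com", 50070, "alice");
 *   if (fs) {
 *       // ... perform file system operations ...
 *       hdfsDisconnect(fs);
 *   }
 */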
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user)
{
struct hdfsBuilder* bld = hdfsNewBuilder();
if (!bld) {
return NULL;
}
hdfsBuilderSetNameNode(bld, nn);
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderSetUserName(bld, user);
return hdfsBuilderConnect(bld);
}
hdfsFS hdfsConnect(const char* nn, tPort port)
{
return hdfsConnectAsUser(nn, port, NULL);
}
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port)
{
return hdfsConnect(nn, port);
}
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
const char *user)
{
struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld)
return NULL;
hdfsBuilderSetNameNode(bld, host);
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderSetUserName(bld, user);
hdfsBuilderSetForceNewInstance(bld);
return hdfsBuilderConnect(bld);
}
/**
 * Retrieve the default NameNode hostname and port from the configuration.
 * TODO: this function currently uses JNI; we need to do this
 * without using JNI (HDFS-3917)
 *
 * @param bld The hdfsBuilder handle
 * @param port Used to return the default NameNode port
 * @param nn Used to return the default NameNode hostname
 * @return 0 for success and non-zero value for failure
 */
static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port,
char **nn)
{
JNIEnv *env = 0;
jobject jHDFSConf = NULL, jAddress = NULL;
jstring jHostName = NULL;
jvalue jVal;
jthrowable jthr = NULL;
int ret = 0;
char buf[512];
env = getJNIEnv();
if (!env) {
return EINTERNAL;
}
jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL,
HADOOP_NAMENODE, "getHttpAddress",
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
jHDFSConf);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jAddress = jVal.l;
jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
JAVA_INETSOCKETADDRESS, "getPort", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
*port = jVal.i;
jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
JAVA_INETSOCKETADDRESS,
"getHostName", "()Ljava/lang/String;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jHostName = jVal.l;
jthr = newCStr(env, jHostName, nn);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
done:
destroyLocalReference(env, jHDFSConf);
destroyLocalReference(env, jAddress);
destroyLocalReference(env, jHostName);
return ret;
}
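/**
 * Create a webhdfs filesystem handle from the options in the builder.
 * No HTTP request is made here; requests are issued by the individual
 * file system operations. The builder is always freed before this
 * function returns, whether the connection setup succeeds or fails.
 *
 * @param bld The hdfsBuilder to connect with
 * @return A filesystem handle, or NULL with errno set on failure
 */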
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
{
struct hdfs_internal *fs = NULL;
int ret = 0;
if (!bld) {
ret = EINVAL;
goto done;
}
if (bld->nn == NULL) {
// In the JNI version of libhdfs this returns a LocalFileSystem.
ret = ENOTSUP;
goto done;
}
fs = calloc(1, sizeof(*fs));
if (!fs) {
ret = ENOMEM;
goto done;
}
// If the namenode is "default" and/or the port of namenode is 0,
// get the default namenode/port
if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
ret = retrieveDefaults(bld, &fs->port, &fs->nn);
if (ret)
goto done;
} else {
fs->port = bld->port;
fs->nn = strdup(bld->nn);
if (!fs->nn) {
ret = ENOMEM;
goto done;
}
}
    // userName may be NULL
    if (bld->userName) {
        fs->userName = strdup(bld->userName);
if (!fs->userName) {
ret = ENOMEM;
goto done;
}
}
// The working directory starts out as root.
fs->workingDir = strdup("/");
if (!fs->workingDir) {
ret = ENOMEM;
goto done;
}
// For debug
fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
done:
free(bld);
if (ret) {
freeWebHdfsInternal(fs);
errno = ret;
return NULL;
}
return fs;
}
int hdfsDisconnect(hdfsFS fs)
{
if (fs == NULL) {
errno = EINVAL;
return -1;
}
freeWebHdfsInternal(fs);
return 0;
}
/**
* Based on the working directory stored in hdfsFS,
* generate the absolute path for the given path
*
* @param fs The hdfsFS handle which stores the current working directory
* @param path The given path which may not be an absolute path
* @param absPath To hold generated absolute path for the given path
* @return 0 on success, non-zero value indicating error
*/
static int getAbsolutePath(hdfsFS fs, const char *path, char **absPath)
{
char *tempPath = NULL;
size_t absPathLen;
int strlength;
if (path[0] == '/') {
// Path is already absolute.
tempPath = strdup(path);
if (!tempPath) {
return ENOMEM;
}
*absPath = tempPath;
return 0;
}
// Prepend the workingDir to the path.
absPathLen = strlen(fs->workingDir) + strlen(path) + 1;
tempPath = malloc(absPathLen);
if (!tempPath) {
return ENOMEM;
}
strlength = snprintf(tempPath, absPathLen, "%s%s", fs->workingDir, path);
if (strlength < 0 || strlength >= absPathLen) {
free(tempPath);
return EIO;
}
*absPath = tempPath;
return 0;
}
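/**
 * The WebHDFS operations below (mkdir, chmod, chown, rename, stat, ls,
 * setrep, delete, utime) all follow the same pattern: resolve the path
 * against the working directory, build the WebHDFS URL, issue the HTTP
 * request, and parse the JSON response.
 */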
int hdfsCreateDirectory(hdfsFS fs, const char* path)
{
char *url = NULL, *absPath = NULL;
struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
ret = createUrlForMKDIR(fs->nn, fs->port, absPath, fs->userName, &url);
if (ret) {
goto done;
}
ret = launchMKDIR(url, &resp);
if (ret) {
goto done;
}
ret = parseMKDIR(resp->body->content);
done:
freeResponse(resp);
free(url);
free(absPath);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsChmod(hdfsFS fs, const char* path, short mode)
{
char *absPath = NULL, *url = NULL;
struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
ret = createUrlForCHMOD(fs->nn, fs->port, absPath, (int) mode,
fs->userName, &url);
if (ret) {
goto done;
}
ret = launchCHMOD(url, &resp);
if (ret) {
goto done;
}
ret = parseCHMOD(resp->header->content, resp->body->content);
done:
freeResponse(resp);
free(absPath);
free(url);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
{
int ret = 0;
char *absPath = NULL, *url = NULL;
struct Response *resp = NULL;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
ret = createUrlForCHOWN(fs->nn, fs->port, absPath,
owner, group, fs->userName, &url);
if (ret) {
goto done;
}
ret = launchCHOWN(url, &resp);
if (ret) {
goto done;
}
ret = parseCHOWN(resp->header->content, resp->body->content);
done:
freeResponse(resp);
free(absPath);
free(url);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
{
char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL;
int ret = 0;
struct Response *resp = NULL;
if (fs == NULL || oldPath == NULL || newPath == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, oldPath, &oldAbsPath);
if (ret) {
goto done;
}
ret = getAbsolutePath(fs, newPath, &newAbsPath);
if (ret) {
goto done;
}
ret = createUrlForRENAME(fs->nn, fs->port, oldAbsPath,
newAbsPath, fs->userName, &url);
if (ret) {
goto done;
}
ret = launchRENAME(url, &resp);
if (ret) {
goto done;
}
ret = parseRENAME(resp->body->content);
done:
freeResponse(resp);
free(oldAbsPath);
free(newAbsPath);
free(url);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
/**
* Get the file status for a given path.
*
* @param fs hdfsFS handle containing
* NameNode hostName/port information
* @param path Path for file
* @param printError Whether or not to print out error information
* (mainly remote FileNotFoundException)
* @return File information for the given path
*/
static hdfsFileInfo *hdfsGetPathInfoImpl(hdfsFS fs, const char* path,
int printError)
{
char *absPath = NULL;
char *url=NULL;
struct Response *resp = NULL;
int ret = 0;
hdfsFileInfo *fileInfo = NULL;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
if (!fileInfo) {
ret = ENOMEM;
goto done;
}
fileInfo->mKind = kObjectKindFile;
ret = createUrlForGetFileStatus(fs->nn, fs->port, absPath,
fs->userName, &url);
if (ret) {
goto done;
}
ret = launchGFS(url, &resp);
if (ret) {
goto done;
}
ret = parseGFS(resp->body->content, fileInfo, printError);
done:
freeResponse(resp);
free(absPath);
free(url);
if (ret == 0) {
return fileInfo;
} else {
free(fileInfo);
errno = ret;
return NULL;
}
}
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
{
return hdfsGetPathInfoImpl(fs, path, 1);
}
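/**
 * List a directory. The response is parsed into an array of hdfsFileInfo
 * entries, and *numEntries is set to the number of entries returned.
 */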
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
{
char *url = NULL, *absPath = NULL;
struct Response *resp = NULL;
int ret = 0;
hdfsFileInfo *fileInfo = NULL;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
fileInfo = calloc(1, sizeof(*fileInfo));
if (!fileInfo) {
ret = ENOMEM;
goto done;
}
ret = createUrlForLS(fs->nn, fs->port, absPath, fs->userName, &url);
if (ret) {
goto done;
}
ret = launchLS(url, &resp);
if (ret) {
goto done;
}
ret = parseLS(resp->body->content, &fileInfo, numEntries);
done:
freeResponse(resp);
free(absPath);
free(url);
    if (ret == 0) {
        return fileInfo;
    } else {
        free(fileInfo);
        errno = ret;
        return NULL;
    }
}
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
{
char *url = NULL, *absPath = NULL;
struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
ret = createUrlForSETREPLICATION(fs->nn, fs->port, absPath,
replication, fs->userName, &url);
if (ret) {
goto done;
}
ret = launchSETREPLICATION(url, &resp);
if (ret) {
goto done;
}
ret = parseSETREPLICATION(resp->body->content);
done:
freeResponse(resp);
free(absPath);
free(url);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
int i;
for (i = 0; i < numEntries; ++i) {
free(hdfsFileInfo[i].mName);
free(hdfsFileInfo[i].mOwner);
free(hdfsFileInfo[i].mGroup);
}
free(hdfsFileInfo);
}
int hdfsDelete(hdfsFS fs, const char* path, int recursive)
{
char *url = NULL, *absPath = NULL;
struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
ret = createUrlForDELETE(fs->nn, fs->port, absPath,
recursive, fs->userName, &url);
if (ret) {
goto done;
}
ret = launchDELETE(url, &resp);
if (ret) {
goto done;
}
ret = parseDELETE(resp->body->content);
done:
freeResponse(resp);
free(absPath);
free(url);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
{
char *url = NULL, *absPath = NULL;
struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
ret = getAbsolutePath(fs, path, &absPath);
if (ret) {
goto done;
}
ret = createUrlForUTIMES(fs->nn, fs->port, absPath, mtime, atime,
fs->userName, &url);
if (ret) {
goto done;
}
ret = launchUTIMES(url, &resp);
if (ret) {
goto done;
}
ret = parseUTIMES(resp->header->content, resp->body->content);
done:
freeResponse(resp);
free(absPath);
free(url);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
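/**
 * Check whether a path exists by issuing a GETFILESTATUS query with
 * error printing suppressed, so a missing path does not spam stderr.
 */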
int hdfsExists(hdfsFS fs, const char *path)
{
hdfsFileInfo *fileInfo = hdfsGetPathInfoImpl(fs, path, 0);
if (!fileInfo) {
// (errno will have been set by hdfsGetPathInfo)
return -1;
}
hdfsFreeFileInfo(fileInfo, 1);
return 0;
}
/**
 * The information held by the thread that writes data to HDFS over HTTP
 */
typedef struct {
char *url; /* the url of the target datanode for writing*/
struct webhdfsBuffer *uploadBuffer; /* buffer storing data to write */
int flags; /* flag indicating writing mode: create or append */
struct Response *resp; /* response from the target datanode */
} threadData;
/**
* Free the threadData struct instance,
* including the response and url contained in it
* @param data The threadData instance to free
*/
static void freeThreadData(threadData *data)
{
if (data) {
if (data->url) {
free(data->url);
}
if (data->resp) {
freeResponse(data->resp);
}
        // The uploadBuffer will be freed by freeWebFileHandle()
free(data);
data = NULL;
}
}
/**
 * The entry point of the thread that writes data to
 * the target datanode for hdfsWrite.
 * The write can be either a create or an append, as specified by the flags
 */
static void *writeThreadOperation(void *v)
{
int ret = 0;
threadData *data = v;
if (data->flags & O_APPEND) {
ret = launchDnAPPEND(data->url, data->uploadBuffer, &(data->resp));
} else {
ret = launchDnWRITE(data->url, data->uploadBuffer, &(data->resp));
}
if (ret) {
fprintf(stderr, "Failed to write to datanode %s, <%d>: %s.\n",
data->url, ret, hdfs_strerror(ret));
}
return data;
}
/**
* Free the memory associated with a webHDFS file handle.
*
* No other resources will be freed.
*
* @param file The webhdfs file handle
*/
static void freeFileInternal(hdfsFile file)
{
if (!file)
return;
freeWebFileHandle(file->file);
free(file);
}
/**
 * Helper function for opening a file for OUTPUT.
 *
 * As part of the open process for OUTPUT files, we have to connect to the
 * NameNode and get the URL of the corresponding DataNode.
 * We also create a background thread here for doing I/O.
 *
 * @param fs The filesystem handle
 * @param file The file handle being opened
 * @return 0 on success; error code otherwise
 */
static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file)
{
struct webhdfsFileHandle *webhandle = file->file;
struct Response *resp = NULL;
int append, ret = 0;
char *nnUrl = NULL, *dnUrl = NULL;
threadData *data = NULL;
ret = initWebHdfsBuffer(&webhandle->uploadBuffer);
if (ret) {
goto done;
}
append = file->flags & O_APPEND;
if (!append) {
// If we're not appending, send a create request to the NN
ret = createUrlForNnWRITE(fs->nn, fs->port, webhandle->absPath,
fs->userName, webhandle->replication,
webhandle->blockSize, &nnUrl);
} else {
ret = createUrlForNnAPPEND(fs->nn, fs->port, webhandle->absPath,
fs->userName, &nnUrl);
}
if (ret) {
fprintf(stderr, "Failed to create the url connecting to namenode "
"for file creation/appending, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
if (!append) {
ret = launchNnWRITE(nnUrl, &resp);
} else {
ret = launchNnAPPEND(nnUrl, &resp);
}
if (ret) {
fprintf(stderr, "fail to get the response from namenode for "
"file creation/appending, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
if (!append) {
ret = parseNnWRITE(resp->header->content, resp->body->content);
} else {
ret = parseNnAPPEND(resp->header->content, resp->body->content);
}
if (ret) {
fprintf(stderr, "fail to parse the response from namenode for "
"file creation/appending, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
ret = parseDnLoc(resp->header->content, &dnUrl);
if (ret) {
fprintf(stderr, "fail to get the datanode url from namenode "
"for file creation/appending, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
//store the datanode url in the file handle
webhandle->datanode = strdup(dnUrl);
if (!webhandle->datanode) {
ret = ENOMEM;
goto done;
}
//create a new thread for performing the http transferring
data = calloc(1, sizeof(*data));
if (!data) {
ret = ENOMEM;
goto done;
}
data->url = strdup(dnUrl);
if (!data->url) {
ret = ENOMEM;
goto done;
}
data->flags = file->flags;
data->uploadBuffer = webhandle->uploadBuffer;
ret = pthread_create(&webhandle->connThread, NULL,
writeThreadOperation, data);
if (ret) {
fprintf(stderr, "ERROR: failed to create the writing thread "
"in hdfsOpenOutputFileImpl, <%d>: %s.\n",
ret, hdfs_strerror(ret));
goto done;
}
webhandle->uploadBuffer->openFlag = 1;
done:
freeResponse(resp);
free(nnUrl);
free(dnUrl);
if (ret) {
errno = ret;
if (data) {
free(data->url);
free(data);
}
}
return ret;
}
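/**
 * Open a file for reading or writing.
 *
 * O_RDWR is not supported. Files opened with O_RDONLY become INPUT
 * streams; anything else becomes an OUTPUT stream, in which case
 * hdfsOpenOutputFileImpl is called to contact the NameNode, locate the
 * target DataNode, and start the background writer thread.
 */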
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blockSize)
{
int ret = 0;
int accmode = flags & O_ACCMODE;
struct webhdfsFileHandle *webhandle = NULL;
hdfsFile file = NULL;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
if (accmode == O_RDWR) {
// TODO: the original libhdfs has very hackish support for this; should
// we do the same? It would actually be a lot easier in libwebhdfs
// since the protocol isn't connection-oriented.
fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
ret = ENOTSUP;
goto done;
}
if ((flags & O_CREAT) && (flags & O_EXCL)) {
        fprintf(stderr,
                "WARN: hdfs does not truly support O_CREAT && O_EXCL\n");
}
file = calloc(1, sizeof(struct hdfsFile_internal));
if (!file) {
ret = ENOMEM;
goto done;
}
file->flags = flags;
file->type = accmode == O_RDONLY ? INPUT : OUTPUT;
file->offset = 0;
webhandle = calloc(1, sizeof(struct webhdfsFileHandle));
if (!webhandle) {
ret = ENOMEM;
goto done;
}
webhandle->bufferSize = bufferSize;
webhandle->replication = replication;
webhandle->blockSize = blockSize;
ret = getAbsolutePath(fs, path, &webhandle->absPath);
if (ret) {
goto done;
}
file->file = webhandle;
// If open for write/append,
// open and keep the connection with the target datanode for writing
if (file->type == OUTPUT) {
ret = hdfsOpenOutputFileImpl(fs, file);
if (ret) {
goto done;
}
}
done:
if (ret) {
if (file) {
freeFileInternal(file); // Also frees webhandle
} else {
freeWebFileHandle(webhandle);
}
errno = ret;
return NULL;
}
return file;
}
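/**
 * Write data to an OUTPUT file. The user buffer is handed to the
 * background writer thread through resetWebhdfsBuffer, which blocks
 * until the thread has consumed all of the bytes.
 */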
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length)
{
if (length == 0) {
return 0;
}
if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) {
errno = EBADF;
return -1;
}
struct webhdfsFileHandle *wfile = file->file;
if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) {
resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
return length;
} else {
fprintf(stderr,
"Error: have not opened the file %s for writing yet.\n",
wfile->absPath);
errno = EBADF;
return -1;
}
}
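/**
 * Close a file. For OUTPUT files this signals the writer thread to
 * finish, joins it, and parses the final response from the DataNode.
 */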
int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
void *respv = NULL;
threadData *tdata = NULL;
int ret = 0;
struct webhdfsFileHandle *wfile = NULL;
if (file->type == OUTPUT) {
wfile = file->file;
pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
wfile->uploadBuffer->closeFlag = 1;
pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex));
// Waiting for the writing thread to terminate
ret = pthread_join(wfile->connThread, &respv);
if (ret) {
fprintf(stderr, "Error when pthread_join in hdfsClose, <%d>: %s.\n",
ret, hdfs_strerror(ret));
}
        // Parse the response; if the writing thread did not return one,
        // report EIO instead of dereferencing a NULL pointer
        tdata = respv;
        if (!tdata || !(tdata->resp)) {
            fprintf(stderr,
                    "ERROR: response from the writing thread is NULL.\n");
            ret = EIO;
        } else if (file->flags & O_APPEND) {
            ret = parseDnAPPEND(tdata->resp->header->content,
                                tdata->resp->body->content);
        } else {
            ret = parseDnWRITE(tdata->resp->header->content,
                               tdata->resp->body->content);
        }
        // Free the threaddata
        freeThreadData(tdata);
    }
freeFileInternal(file);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsFileIsOpenForRead(hdfsFile file)
{
return (file->type == INPUT);
}
int hdfsFileIsOpenForWrite(hdfsFile file)
{
return (file->type == OUTPUT);
}
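/**
 * Common implementation for hdfsRead and hdfsPread: issue an OPEN request
 * for the byte range [off, off + length) and have the HTTP response body
 * written directly into the caller's buffer.
 */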
static int hdfsReadImpl(hdfsFS fs, hdfsFile file, void* buffer, tSize off,
tSize length, tSize *numRead)
{
int ret = 0;
char *url = NULL;
struct Response *resp = NULL;
if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL ||
length < 0) {
ret = EINVAL;
goto done;
}
if (length == 0) {
// Special case: the user supplied a buffer of zero length, so there is
// nothing to do.
*numRead = 0;
goto done;
}
resp = calloc(1, sizeof(*resp)); // resp is actually a pointer type
if (!resp) {
ret = ENOMEM;
goto done;
}
ret = initResponseBuffer(&(resp->header));
if (ret) {
goto done;
}
ret = initResponseBuffer(&(resp->body));
if (ret) {
goto done;
}
memset(buffer, 0, length);
resp->body->content = buffer;
resp->body->remaining = length;
ret = createUrlForOPEN(fs->nn, fs->port, file->file->absPath,
fs->userName, off, length, &url);
if (ret) {
goto done;
}
ret = launchOPEN(url, resp);
if (ret) {
goto done;
}
ret = parseOPEN(resp->header->content, resp->body->content);
if (ret == -1) {
        // Special case: if parseOPEN returns -1, we asked for a byte range
        // beyond what the file contains. In this case, hdfsRead and
        // hdfsPread return 0, meaning end-of-file.
*numRead = 0;
} else if (ret == 0) {
*numRead = (tSize) resp->body->offset;
}
done:
if (resp) {
freeResponseBuffer(resp->header);
free(resp->body);
}
free(resp);
free(url);
return ret;
}
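/**
 * Read from the file's current offset and advance the offset by the
 * number of bytes actually read.
 */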
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
{
int ret = 0;
tSize numRead = 0;
ret = hdfsReadImpl(fs, file, buffer, (tSize) file->offset,
length, &numRead);
if (ret > 0) { // ret == -1 means end of file
errno = ret;
return -1;
}
file->offset += numRead;
return numRead;
}
int hdfsAvailable(hdfsFS fs, hdfsFile file)
{
/* We actually always block when reading from webhdfs, currently. So the
* number of bytes that can be read without blocking is currently 0.
*/
return 0;
}
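/**
 * Seek to a position in an INPUT file. Since webhdfs reads are positional,
 * this only validates the desired position against the file size and
 * records it in the file handle.
 */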
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
{
struct webhdfsFileHandle *wf;
hdfsFileInfo *fileInfo = NULL;
int ret = 0;
if (!fs || !file || (file->type == OUTPUT) || (desiredPos < 0)) {
ret = EINVAL;
goto done;
}
wf = file->file;
if (!wf) {
ret = EINVAL;
goto done;
}
fileInfo = hdfsGetPathInfo(fs, wf->absPath);
if (!fileInfo) {
ret = errno;
goto done;
}
if (desiredPos > fileInfo->mSize) {
fprintf(stderr,
"hdfsSeek for %s failed since the desired position %" PRId64
" is beyond the size of the file %" PRId64 "\n",
wf->absPath, desiredPos, fileInfo->mSize);
ret = ENOTSUP;
goto done;
}
file->offset = desiredPos;
done:
if (fileInfo) {
hdfsFreeFileInfo(fileInfo, 1);
}
if (ret) {
errno = ret;
return -1;
}
return 0;
}
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
void* buffer, tSize length)
{
int ret;
tSize numRead = 0;
if (position < 0) {
errno = EINVAL;
return -1;
}
ret = hdfsReadImpl(fs, file, buffer, (tSize) position, length, &numRead);
if (ret > 0) {
errno = ret;
return -1;
}
return numRead;
}
tOffset hdfsTell(hdfsFS fs, hdfsFile file)
{
if (!file) {
errno = EINVAL;
return -1;
}
return file->offset;
}
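/**
 * Copy the current working directory into the caller's buffer.
 * Fails with ENAMETOOLONG if the buffer is too small.
 */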
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
{
int strlength;
if (fs == NULL || buffer == NULL || bufferSize <= 0) {
errno = EINVAL;
return NULL;
}
strlength = snprintf(buffer, bufferSize, "%s", fs->workingDir);
if (strlength >= bufferSize) {
errno = ENAMETOOLONG;
return NULL;
} else if (strlength < 0) {
errno = EIO;
return NULL;
}
return buffer;
}
/** Collapse consecutive '/' characters in path into a single '/' */
static void normalizePath(char *path)
{
int i = 0, j = 0, sawslash = 0;
for (i = j = sawslash = 0; path[i] != '\0'; i++) {
if (path[i] != '/') {
sawslash = 0;
path[j++] = path[i];
} else if (path[i] == '/' && !sawslash) {
sawslash = 1;
path[j++] = '/';
}
}
path[j] = '\0';
}
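/**
 * Set the working directory. Relative paths are resolved against the
 * current working directory, a trailing slash is appended if missing,
 * and any repeated '/' characters are collapsed.
 */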
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
{
char *newWorkingDir = NULL;
size_t strlenPath = 0, newWorkingDirLen = 0;
int strlength;
if (fs == NULL || path == NULL) {
errno = EINVAL;
return -1;
}
strlenPath = strlen(path);
if (strlenPath < 1) {
errno = EINVAL;
return -1;
}
    // the maximum buffer size needed for the new working dir is
    // (length of old working dir) + (length of given path) + strlen("/") + 1
newWorkingDirLen = strlen(fs->workingDir) + strlenPath + 2;
newWorkingDir = malloc(newWorkingDirLen);
if (!newWorkingDir) {
errno = ENOMEM;
return -1;
}
strlength = snprintf(newWorkingDir, newWorkingDirLen, "%s%s%s",
(path[0] == '/') ? "" : fs->workingDir,
path, (path[strlenPath - 1] == '/') ? "" : "/");
if (strlength < 0 || strlength >= newWorkingDirLen) {
free(newWorkingDir);
errno = EIO;
return -1;
}
if (strstr(path, "//")) {
// normalize the path by replacing "//" with "/"
normalizePath(newWorkingDir);
}
free(fs->workingDir);
fs->workingDir = newWorkingDir;
return 0;
}
void hdfsFreeHosts(char ***blockHosts)
{
int i, j;
for (i=0; blockHosts[i]; i++) {
for (j=0; blockHosts[i][j]; j++) {
free(blockHosts[i][j]);
}
free(blockHosts[i]);
}
free(blockHosts);
}
tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
{
errno = ENOTSUP;
return -1;
}
int hdfsFileUsesDirectRead(hdfsFile file)
{
return 0; // webhdfs never performs direct reads.
}
void hdfsFileDisableDirectRead(hdfsFile file)
{
// webhdfs never performs direct reads
}
int hdfsHFlush(hdfsFS fs, hdfsFile file)
{
if (file->type != OUTPUT) {
errno = EINVAL;
return -1;
}
// TODO: block until our write buffer is flushed (HDFS-3952)
return 0;
}
int hdfsFlush(hdfsFS fs, hdfsFile file)
{
if (file->type != OUTPUT) {
errno = EINVAL;
return -1;
}
// TODO: block until our write buffer is flushed (HDFS-3952)
return 0;
}
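/*
 * The remaining operations are not supported by libwebhdfs; they fail
 * with errno set to ENOTSUP.
 */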
char*** hdfsGetHosts(hdfsFS fs, const char* path,
tOffset start, tOffset length)
{
errno = ENOTSUP;
return NULL;
}
tOffset hdfsGetCapacity(hdfsFS fs)
{
errno = ENOTSUP;
return -1;
}
tOffset hdfsGetUsed(hdfsFS fs)
{
errno = ENOTSUP;
return -1;
}
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
errno = ENOTSUP;
return -1;
}
int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
errno = ENOTSUP;
return -1;
}