blob: 2a43971f5d9c812a8ceb32fb6627e4afbcc805f4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <jni.h>
#include "webhdfs.h"
#include "hdfs_http_client.h"
#include "hdfs_http_query.h"
#include "hdfs_json_parser.h"
#include "jni_helper.h"
#include "exception.h"
#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration"
#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
/**
 * Reset every field of an hdfsFileInfo record to its empty default:
 * kind is "file", all string pointers are NULL and all numeric fields are 0.
 * A NULL fileInfo is ignored.
 */
static void initFileinfo(hdfsFileInfo *fileInfo) {
    if (fileInfo == NULL) {
        return;
    }
    /* String fields start out unset. */
    fileInfo->mName = NULL;
    fileInfo->mOwner = NULL;
    fileInfo->mGroup = NULL;
    /* Scalar attributes. */
    fileInfo->mKind = kObjectKindFile;
    fileInfo->mSize = 0;
    fileInfo->mBlockSize = 0;
    fileInfo->mReplication = 0;
    fileInfo->mPermissions = 0;
    fileInfo->mLastMod = 0;
    fileInfo->mLastAccess = 0;
}
/**
 * Allocate a webhdfsBuffer with no pending data, both flags cleared, and its
 * mutex and two condition variables initialized with default attributes.
 *
 * @return the new buffer, or NULL (after logging to stderr) if allocation fails.
 */
static webhdfsBuffer *initWebHdfsBuffer() {
    webhdfsBuffer *wb = (webhdfsBuffer *) calloc(1, sizeof(webhdfsBuffer));
    if (wb == NULL) {
        fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n");
        return NULL;
    }
    /* No data queued yet. */
    wb->wbuffer = NULL;
    wb->offset = 0;
    wb->remaining = 0;
    /* Neither opened for transfer nor closed. */
    wb->openFlag = 0;
    wb->closeFlag = 0;
    /* Synchronization primitives. */
    pthread_mutex_init(&wb->writeMutex, NULL);
    pthread_cond_init(&wb->newwrite_or_close, NULL);
    pthread_cond_init(&wb->transfer_finish, NULL);
    return wb;
}
/**
 * Publish a new chunk of caller data through the shared buffer and block
 * until it has been fully consumed.
 *
 * Under writeMutex the buffer is pointed at `buffer`, newwrite_or_close is
 * signalled, and the caller then waits on transfer_finish until `remaining`
 * drops to zero (presumably decremented by the upload thread; the consumer
 * is not in this file).
 *
 * @param wb     shared buffer; must be non-NULL whenever there is data
 * @param buffer data to hand over; nothing happens if NULL
 * @param length number of bytes in buffer; nothing happens if 0
 * @return wb, unchanged
 */
static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) {
    if (buffer == NULL || length == 0) {
        return wb;
    }

    pthread_mutex_lock(&wb->writeMutex);
    wb->wbuffer = buffer;
    wb->offset = 0;
    wb->remaining = length;
    pthread_cond_signal(&wb->newwrite_or_close);
    /* Re-check the predicate after every wake-up. */
    for (; wb->remaining != 0; ) {
        pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex);
    }
    pthread_mutex_unlock(&wb->writeMutex);

    return wb;
}
/**
 * Destroy the synchronization primitives of a webhdfsBuffer and free it.
 * Failures to destroy a primitive are reported on stderr but do not stop
 * the buffer from being released. A NULL buffer is ignored.
 *
 * The data pointed to by wbuffer is not freed here; it is only a borrowed
 * pointer set by resetWebhdfsBuffer().
 */
static void freeWebhdfsBuffer(webhdfsBuffer *buffer) {
    int des;

    if (!buffer) {
        return;
    }

    des = pthread_cond_destroy(&buffer->newwrite_or_close);
    if (des == EBUSY) {
        fprintf(stderr, "The condition newwrite_or_close is still referenced!\n");
    } else if (des == EINVAL) {
        fprintf(stderr, "The condition newwrite_or_close is invalid!\n");
    }

    des = pthread_cond_destroy(&buffer->transfer_finish);
    if (des == EBUSY) {
        fprintf(stderr, "The condition transfer_finish is still referenced!\n");
    } else if (des == EINVAL) {
        fprintf(stderr, "The condition transfer_finish is invalid!\n");
    }

    /*
     * There is no "close_clean" condition in this buffer; the former check
     * for it merely re-tested the result of the transfer_finish destroy and
     * printed a second, misleading message, so it has been removed.
     */

    des = pthread_mutex_destroy(&buffer->writeMutex);
    if (des == EBUSY) {
        fprintf(stderr, "The mutex is still locked or referenced!\n");
    }

    free(buffer);
}
/**
 * Release a webhdfsFileHandle together with everything it owns: the upload
 * buffer, the datanode URL string and the absolute path string.
 * A NULL handle is ignored.
 */
static void freeWebFileHandle(struct webhdfsFileHandle * handle) {
    if (handle == NULL) {
        return;
    }
    freeWebhdfsBuffer(handle->uploadBuffer);
    /* free(NULL) is a no-op, so unset strings need no special casing. */
    free(handle->datanode);
    free(handle->absPath);
    free(handle);
}
/**
 * Allocate a zero-initialized builder whose working directory is "/".
 *
 * @return the new builder, or NULL if either the builder or its working
 *         directory string cannot be allocated.
 */
struct hdfsBuilder *hdfsNewBuilder(void)
{
    struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
    if (!bld) {
        return NULL;
    }
    /* Do not hand out a half-initialized builder if the default cannot be set. */
    if (hdfsSetWorkingDirectory(bld, "/") != 0) {
        free(bld);
        return NULL;
    }
    return bld;
}
/**
 * Free a builder and the working-directory string it owns.
 * Passing NULL is harmless.
 */
void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
    if (bld != NULL) {
        /* free(NULL) is a no-op, so an unset workingDir is fine. */
        free(bld->workingDir);
    }
    free(bld);
}
/**
 * Mark the builder so that a new instance is requested.
 * A NULL builder is ignored.
 */
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
    if (bld == NULL) {
        return;
    }
    bld->forceNewInstance = 1;
}
/**
 * Record the namenode address in the builder (both the nn and nn_jni fields).
 * The string is stored by pointer, not copied, so it must outlive the builder.
 * A NULL builder is ignored.
 */
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
    if (bld == NULL) {
        return;
    }
    bld->nn_jni = nn;
    bld->nn = nn;
}
/**
 * Record the namenode port in the builder. A NULL builder is ignored.
 */
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
    if (bld == NULL) {
        return;
    }
    bld->port = port;
}
/**
 * Record the user name in the builder. The string is stored by pointer,
 * not copied. A NULL builder is ignored.
 */
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
    if (bld == NULL) {
        return;
    }
    bld->userName = userName;
}
/**
 * Record the Kerberos ticket cache path in the builder. The string is stored
 * by pointer, not copied. A NULL builder is ignored.
 */
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
                                       const char *kerbTicketCachePath)
{
    if (bld == NULL) {
        return;
    }
    bld->kerbTicketCachePath = kerbTicketCachePath;
}
/**
 * Convenience wrapper: build a fresh builder from (nn, port, user) and
 * connect with it.
 *
 * @return the result of hdfsBuilderConnect(), or NULL if the builder cannot
 *         be allocated.
 */
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user)
{
    struct hdfsBuilder *builder = hdfsNewBuilder();

    if (builder == NULL) {
        return NULL;
    }
    /* The three setters are independent of each other. */
    hdfsBuilderSetUserName(builder, user);
    hdfsBuilderSetNameNodePort(builder, port);
    hdfsBuilderSetNameNode(builder, nn);
    return hdfsBuilderConnect(builder);
}
/**
 * Connect to the given namenode without specifying a user name.
 */
hdfsFS hdfsConnect(const char* nn, tPort port)
{
    const char *noUser = NULL;

    return hdfsConnectAsUser(nn, port, noUser);
}
/**
 * Connect via hdfsConnect() and then flag the returned handle (which in this
 * library is the builder itself) as a forced new instance.
 *
 * @return the connected handle, or NULL if the connection failed.
 */
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port)
{
    struct hdfsBuilder *connected = (struct hdfsBuilder *) hdfsConnect(nn, port);

    if (connected != NULL) {
        hdfsBuilderSetForceNewInstance(connected);
        return connected;
    }
    return NULL;
}
/**
 * Convenience wrapper: build a fresh builder from (host, port, user), flag
 * it as a forced new instance, and connect with it.
 *
 * @return the result of hdfsBuilderConnect(), or NULL if the builder cannot
 *         be allocated.
 */
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
                                    const char *user)
{
    struct hdfsBuilder *builder = hdfsNewBuilder();

    if (builder == NULL) {
        return NULL;
    }
    /* The setters are independent of each other. */
    hdfsBuilderSetForceNewInstance(builder);
    hdfsBuilderSetUserName(builder, user);
    hdfsBuilderSetNameNodePort(builder, port);
    hdfsBuilderSetNameNode(builder, host);
    return hdfsBuilderConnect(builder);
}
/*
 * Forward declaration. hdfsBuilderConnect() below calls this to render the
 * builder into buf (of size bufLen) for its error messages. The definition
 * is not visible in this section of the file.
 */
const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
char *buf, size_t bufLen);
/**
 * Resolve the namenode host and port held in the builder and return the
 * builder itself as the file-system handle.
 *
 * Steps:
 *  - a NULL host becomes "localhost";
 *  - if the host ends in ":<digits>", that suffix is stripped (a warning is
 *    logged) and the stripped copy replaces bld->nn;
 *  - if the port is 0 and/or the host is "default", the missing values are
 *    read through JNI from NameNode.getHttpAddress(new HdfsConfiguration()).
 *
 * Ownership: the string previously stored in bld->nn belongs to whoever
 * passed it to hdfsBuilderSetNameNode() and is never freed here. The
 * replacement strings allocated in this function are owned by the builder.
 *
 * @param bld builder to connect; on failure it is freed (including its
 *            working directory) and must not be used again
 * @return bld on success, NULL on failure (errno is EINTERNAL when no JNI
 *         environment is available)
 */
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
{
    if (!bld) {
        return NULL;
    }
    // if the hostname is null for the namenode, set it to localhost
    if (bld->nn == NULL) {
        bld->nn = "localhost";
    } else {
        /* check whether the namenode hostname already contains a port */
        const char *lastColon = strrchr(bld->nn, ':');
        if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) {
            size_t hostLen = (size_t) (lastColon - bld->nn);
            char *newAddr = NULL;

            fprintf(stderr, "port %d was given, but URI '%s' already "
                    "contains a port!\n", bld->port, bld->nn);
            newAddr = (char *) malloc(hostLen + 1);
            if (!newAddr) {
                /* Same contract as every other failure path: drop the builder. */
                free(bld->workingDir);
                free(bld);
                return NULL;
            }
            memcpy(newAddr, bld->nn, hostLen);
            newAddr[hostLen] = '\0';
            /*
             * The old bld->nn is the caller's string (it may even be a
             * string literal), so it must not be passed to free().
             */
            bld->nn = newAddr;
        }
    }

    /* if the namenode is "default" and/or the port is 0, look them up via JNI */
    if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
        JNIEnv *env = NULL;
        jobject jHDFSConf = NULL, jAddress = NULL, jHostName = NULL;
        jvalue jVal;
        jthrowable jthr = NULL;
        const char *hostChars = NULL;
        char *hostCopy = NULL;
        size_t hostCopyLen = 0;
        int ret = 0;
        char buf[512];

        // Get the JNIEnv* corresponding to the current thread
        env = getJNIEnv();
        if (env == NULL) {
            free(bld->workingDir);
            free(bld);
            errno = EINTERNAL;
            return NULL;
        }

        // jHDFSConf = new HdfsConfiguration();
        jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                        "hdfsBuilderConnect(%s)",
                                        hdfsBuilderToStr(bld, buf, sizeof(buf)));
            goto done;
        }

        // jAddress = NameNode.getHttpAddress(jHDFSConf);
        jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress",
                            "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
                            jHDFSConf);
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                        "hdfsBuilderConnect(%s)",
                                        hdfsBuilderToStr(bld, buf, sizeof(buf)));
            goto done;
        }
        jAddress = jVal.l;

        if (bld->port == 0) {
            jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
                                JAVA_INETSOCKETADDRESS, "getPort", "()I");
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                            "hdfsBuilderConnect(%s)",
                                            hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            bld->port = jVal.i;
        }

        if (!strcasecmp("default", bld->nn)) {
            jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
                                JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;");
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                            "hdfsBuilderConnect(%s)",
                                            hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jHostName = jVal.l;
            if (jHostName == NULL) {
                ret = -1;
                goto done;
            }
            hostChars = (*env)->GetStringUTFChars(env, jHostName, NULL);
            if (hostChars == NULL) {
                /* NOTE(review): a Java exception may be pending here — confirm handling. */
                ret = -1;
                goto done;
            }
            /*
             * Keep a private copy and give the JVM its buffer back, instead
             * of holding on to the JNI string for the life of the handle.
             */
            hostCopyLen = strlen(hostChars);
            hostCopy = (char *) malloc(hostCopyLen + 1);
            if (hostCopy != NULL) {
                memcpy(hostCopy, hostChars, hostCopyLen + 1);
            }
            (*env)->ReleaseStringUTFChars(env, jHostName, hostChars);
            if (hostCopy == NULL) {
                ret = -1;
                goto done;
            }
            bld->nn = hostCopy;
        }

done:
        destroyLocalReference(env, jHDFSConf);
        destroyLocalReference(env, jAddress);
        destroyLocalReference(env, jHostName);
        if (ret) {
            // on error/exception, free the builder and report failure;
            // returning here avoids touching the freed builder below
            free(bld->workingDir);
            free(bld);
            return NULL;
        }
    }

    //for debug
    fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
    return bld;
}
/**
 * Release a file-system handle. In this library the handle is the
 * hdfsBuilder that was connected, so its working-directory string (allocated
 * by hdfsSetWorkingDirectory) is released along with it.
 *
 * The namenode and user-name strings are not freed: they are stored by
 * pointer and may belong to the caller.
 *
 * @return 0 on success; -1 with errno = EBADF if fs is NULL.
 */
int hdfsDisconnect(hdfsFS fs)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;

    if (bld == NULL) {
        errno = EBADF;
        return -1;
    }
    free(bld->workingDir);
    free(bld);
    return 0;
}
/**
 * Build a newly allocated absolute path for `path`.
 *
 * If path already starts with '/', or the handle has no working directory,
 * the result is a plain copy of path. Otherwise the working directory is
 * prepended, with a '/' inserted between the two when the working directory
 * is non-empty and does not already end in one (so "/user/foo" + "bar"
 * yields "/user/foo/bar", not "/user/foobar").
 *
 * @return malloc'ed string the caller must free(), or NULL if an argument
 *         is NULL or allocation fails.
 */
char *getAbsolutePath(hdfsFS fs, const char *path) {
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    const char *dir = NULL;
    char *absPath = NULL;
    size_t pathLen = 0;
    size_t dirLen = 0;
    size_t sepLen = 0;

    if (fs == NULL || path == NULL) {
        return NULL;
    }

    pathLen = strlen(path);
    dir = bld->workingDir;
    if ('/' == *path || dir == NULL) {
        /* Already absolute (or nothing to prepend): return a copy. */
        absPath = (char *) malloc(pathLen + 1);
        if (absPath) {
            memcpy(absPath, path, pathLen + 1);
        }
        return absPath;
    }

    dirLen = strlen(dir);
    if (dirLen > 0 && dir[dirLen - 1] != '/') {
        sepLen = 1;
    }
    absPath = (char *) malloc(dirLen + sepLen + pathLen + 1);
    if (!absPath) {
        return NULL;
    }
    memcpy(absPath, dir, dirLen);
    if (sepLen) {
        absPath[dirLen] = '/';
    }
    memcpy(absPath + dirLen + sepLen, path, pathLen + 1);
    return absPath;
}
/**
 * Create a directory: build the MKDIR URL, launch the request and parse the
 * response body.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL arguments
 *         or failure to build the absolute path).
 */
int hdfsCreateDirectory(hdfsFS fs, const char* path)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    int status = -1;

    if (fs == NULL || path == NULL) {
        return -1;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return -1;
    }

    url = prepareMKDIR(bld->nn, bld->port, absPath, bld->userName);
    if (url != NULL) {
        resp = launchMKDIR(url);
        if (resp != NULL && parseMKDIR(resp->body->content)) {
            status = 0;
        }
    }

    freeResponse(resp);
    free(url);
    free(absPath);
    return status;
}
/**
 * Change the permission bits of a path: build the CHMOD URL, launch the
 * request and parse the response header and body.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL arguments
 *         or failure to build the absolute path).
 */
int hdfsChmod(hdfsFS fs, const char* path, short mode)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    int status = -1;

    if (fs == NULL || path == NULL) {
        return -1;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return -1;
    }

    url = prepareCHMOD(bld->nn, bld->port, absPath, (int)mode, bld->userName);
    if (url != NULL) {
        resp = launchCHMOD(url);
        if (resp != NULL
            && parseCHMOD(resp->header->content, resp->body->content)) {
            status = 0;
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);
    return status;
}
/**
 * Change the owner and/or group of a path: build the CHOWN URL, launch the
 * request and parse the response header and body. owner and group are
 * passed through to prepareCHOWN() unchecked.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL fs/path
 *         or failure to build the absolute path).
 */
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    int status = -1;

    if (fs == NULL || path == NULL) {
        return -1;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return -1;
    }

    url = prepareCHOWN(bld->nn, bld->port, absPath, owner, group, bld->userName);
    if (url != NULL) {
        resp = launchCHOWN(url);
        if (resp != NULL
            && parseCHOWN(resp->header->content, resp->body->content)) {
            status = 0;
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);
    return status;
}
/**
 * Rename oldPath to newPath: build the RENAME URL from both absolute paths,
 * launch the request and parse the response body.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL arguments
 *         or failure to build either absolute path).
 */
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
{
    if (fs == NULL || oldPath == NULL || newPath == NULL) {
        return -1;
    }

    char *oldAbsPath = getAbsolutePath(fs, oldPath);
    if (!oldAbsPath) {
        return -1;
    }
    char *newAbsPath = getAbsolutePath(fs, newPath);
    if (!newAbsPath) {
        /* The first path was already allocated; do not leak it. */
        free(oldAbsPath);
        return -1;
    }

    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *url = NULL;
    Response resp = NULL;
    int ret = 0;

    if (!((url = prepareRENAME(bld->nn, bld->port, oldAbsPath, newAbsPath, bld->userName))
          && (resp = launchRENAME(url))
          && (parseRENAME(resp->body->content)))) {
        ret = -1;
    }

    freeResponse(resp);
    free(oldAbsPath);
    free(newAbsPath);
    free(url);
    return ret;
}
/**
 * Fetch the status of a single path: build the GFS URL, launch the request
 * and let parseGFS() fill a freshly allocated, default-initialized
 * hdfsFileInfo.
 *
 * The pointer returned by parseGFS() replaces the local allocation.
 * NOTE(review): whether parseGFS() releases the record it was given when it
 * returns NULL is not visible here — confirm against its implementation.
 *
 * @return the file info (release with hdfsFreeFileInfo(info, 1)), or NULL on
 *         any failure.
 */
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    hdfsFileInfo *fileInfo = NULL;
    int numEntries = 0;
    int succeeded = 0;

    if (fs == NULL || path == NULL) {
        return NULL;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return NULL;
    }

    fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
    if (fileInfo != NULL) {
        initFileinfo(fileInfo);
        url = prepareGFS(bld->nn, bld->port, absPath, bld->userName);
        if (url != NULL) {
            resp = launchGFS(url);
            if (resp != NULL) {
                fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries);
                succeeded = (fileInfo != NULL);
            }
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);

    if (!succeeded) {
        free(fileInfo);
        return NULL;
    }
    return fileInfo;
}
/**
 * List a directory: build the LS URL, launch the request and let parseGFS()
 * produce the entries, storing their count through numEntries.
 *
 * The pointer returned by parseGFS() replaces the local allocation.
 * NOTE(review): whether parseGFS() releases the record it was given when it
 * returns NULL is not visible here — confirm against its implementation.
 * numEntries is forwarded to parseGFS() without a NULL check.
 *
 * @return the entries (release with hdfsFreeFileInfo), or NULL on any
 *         failure.
 */
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    hdfsFileInfo *fileInfo = NULL;
    int succeeded = 0;

    if (fs == NULL || path == NULL) {
        return NULL;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return NULL;
    }

    fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
    if (fileInfo != NULL) {
        url = prepareLS(bld->nn, bld->port, absPath, bld->userName);
        if (url != NULL) {
            resp = launchLS(url);
            if (resp != NULL) {
                fileInfo = parseGFS(resp->body->content, fileInfo, numEntries);
                succeeded = (fileInfo != NULL);
            }
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);

    if (!succeeded) {
        free(fileInfo);
        return NULL;
    }
    return fileInfo;
}
/**
 * Set the replication factor of a path: build the SETREPLICATION URL,
 * launch the request and parse the response body.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL arguments
 *         or failure to build the absolute path).
 */
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    int status = -1;

    if (fs == NULL || path == NULL) {
        return -1;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return -1;
    }

    url = prepareSETREPLICATION(bld->nn, bld->port, absPath, replication, bld->userName);
    if (url != NULL) {
        resp = launchSETREPLICATION(url);
        if (resp != NULL && parseSETREPLICATION(resp->body->content)) {
            status = 0;
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);
    return status;
}
/**
 * Free an array of numEntries hdfsFileInfo records: the mName, mOwner and
 * mGroup strings of every entry, then the array itself.
 * A NULL array is ignored regardless of numEntries.
 */
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
    int i;

    if (hdfsFileInfo == NULL) {
        return;
    }
    //Free the mName, mOwner, and mGroup (free(NULL) is a no-op)
    for (i = 0; i < numEntries; ++i) {
        free(hdfsFileInfo[i].mName);
        free(hdfsFileInfo[i].mOwner);
        free(hdfsFileInfo[i].mGroup);
    }
    //Free entire block
    free(hdfsFileInfo);
}
/**
 * Delete a path: build the DELETE URL (passing the recursive flag through),
 * launch the request and parse the response body.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL arguments
 *         or failure to build the absolute path).
 */
int hdfsDelete(hdfsFS fs, const char* path, int recursive)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    int status = -1;

    if (fs == NULL || path == NULL) {
        return -1;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return -1;
    }

    url = prepareDELETE(bld->nn, bld->port, absPath, recursive, bld->userName);
    if (url != NULL) {
        resp = launchDELETE(url);
        if (resp != NULL && parseDELETE(resp->body->content)) {
            status = 0;
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);
    return status;
}
/**
 * Set the modification and access times of a path: build the UTIMES URL,
 * launch the request and parse the response header and body.
 *
 * @return 0 if every step succeeded, -1 otherwise (including NULL arguments
 *         or failure to build the absolute path).
 */
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *absPath = NULL;
    char *url = NULL;
    Response resp = NULL;
    int status = -1;

    if (fs == NULL || path == NULL) {
        return -1;
    }
    absPath = getAbsolutePath(fs, path);
    if (absPath == NULL) {
        return -1;
    }

    url = prepareUTIMES(bld->nn, bld->port, absPath, mtime, atime, bld->userName);
    if (url != NULL) {
        resp = launchUTIMES(url);
        if (resp != NULL
            && parseUTIMES(resp->header->content, resp->body->content)) {
            status = 0;
        }
    }

    freeResponse(resp);
    free(absPath);
    free(url);
    return status;
}
/**
 * Test whether a path exists by asking for its file info.
 *
 * @return 0 if hdfsGetPathInfo() produced a record, -1 otherwise.
 */
int hdfsExists(hdfsFS fs, const char *path)
{
    hdfsFileInfo *info = hdfsGetPathInfo(fs, path);

    if (info == NULL) {
        return -1;
    }
    hdfsFreeFileInfo(info, 1);
    return 0;
}
/*
 * Argument passed to writeThreadOperation() when the upload thread is
 * started; the same pointer is returned by the thread with resp filled in.
 */
typedef struct {
char *url; /* datanode URL to send data to; released by freeThreadData() */
webhdfsBuffer *uploadBuffer; /* shared with the file handle, which is responsible for freeing it */
int flags; /* open flags; O_APPEND selects append instead of write */
Response resp; /* result of the datanode request, set by the thread */
} threadData;
/**
 * Release a threadData record, its URL string and its response (if any).
 * The upload buffer is not touched: it is freed by freeWebFileHandle().
 * A NULL record is ignored.
 */
static void freeThreadData(threadData *data) {
    if (data == NULL) {
        return;
    }
    /* free(NULL) is a no-op, so an unset url needs no guard. */
    free(data->url);
    if (data->resp != NULL) {
        freeResponse(data->resp);
    }
    free(data);
}
/**
 * Thread entry point for uploads. Runs the datanode APPEND request when the
 * O_APPEND flag is set and the WRITE request otherwise, stores the response
 * in the threadData record and returns that record.
 *
 * @param v threadData* describing the transfer
 * @return the same threadData*, with resp set
 */
static void *writeThreadOperation(void *v) {
    threadData *job = (threadData *) v;

    job->resp = (job->flags & O_APPEND)
        ? launchDnAPPEND(job->url, job->uploadBuffer)
        : launchDnWRITE(job->url, job->uploadBuffer);
    return job;
}
/**
 * Open a file for reading, writing or appending.
 *
 * Unlike the JNI libhdfs, which stores a Java stream in the hdfsFile, this
 * library stores (absolute path, buffer size, replication, block size) in a
 * webhdfsFileHandle.
 *
 * For write/append the namenode is contacted first to obtain the datanode
 * URL, then a thread running writeThreadOperation() is started to perform
 * the HTTP transfer; the thread takes ownership of its threadData record.
 *
 * Every failure after the first allocation goes through the single cleanup
 * path at `done`, which releases whatever was acquired so far.
 *
 * @return the file handle, or NULL on failure (errno = ENOTSUP for O_RDWR).
 */
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                      int bufferSize, short replication, tSize blockSize)
{
    hdfsFile hdfsFileHandle = NULL;
    struct webhdfsFileHandle *webhandle = NULL;
    char *url = NULL;
    Response resp = NULL;
    threadData *data = NULL;
    int accmode = 0;
    int ret = 0;

    if (fs == NULL || path == NULL) {
        return NULL;
    }

    accmode = flags & O_ACCMODE;
    if (accmode == O_RDWR) {
        fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
        errno = ENOTSUP;
        return NULL;
    }

    if ((flags & O_CREAT) && (flags & O_EXCL)) {
        fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
    }

    hdfsFileHandle = (hdfsFile) calloc(1, sizeof(struct hdfsFile_internal));
    if (!hdfsFileHandle) {
        return NULL;
    }
    hdfsFileHandle->flags = flags;
    hdfsFileHandle->type = accmode == O_RDONLY ? INPUT : OUTPUT;
    hdfsFileHandle->offset = 0;

    webhandle = (struct webhdfsFileHandle *) calloc(1, sizeof(struct webhdfsFileHandle));
    if (!webhandle) {
        ret = -1;
        goto done;
    }
    webhandle->bufferSize = bufferSize;
    webhandle->replication = replication;
    webhandle->blockSize = blockSize;
    webhandle->absPath = getAbsolutePath(fs, path);
    if (!webhandle->absPath) {
        ret = -1;
        goto done;
    }
    hdfsFileHandle->file = webhandle;

    //for write/append, need to connect to the namenode
    //and get the url of corresponding datanode
    if (hdfsFileHandle->type == OUTPUT) {
        struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
        int append = flags & O_APPEND;
        int parseRet = 0;

        webhandle->uploadBuffer = initWebHdfsBuffer();
        if (!webhandle->uploadBuffer) {
            ret = -1;
            goto done;
        }

        //anything that is not an append is a create request to the NN
        if (append) {
            url = prepareNnAPPEND(bld->nn, bld->port, webhandle->absPath, bld->userName);
        } else {
            url = prepareNnWRITE(bld->nn, bld->port, webhandle->absPath, bld->userName,
                                 webhandle->replication, webhandle->blockSize);
        }
        if (!url) {
            fprintf(stderr,
                    "fail to create the url connecting to namenode for file creation/appending\n");
            ret = -1;
            goto done;
        }

        resp = append ? launchNnAPPEND(url) : launchNnWRITE(url);
        if (!resp) {
            fprintf(stderr,
                    "fail to get the response from namenode for file creation/appending\n");
            ret = -1;
            goto done;
        }

        if (append) {
            parseRet = parseNnAPPEND(resp->header->content, resp->body->content);
        } else {
            parseRet = parseNnWRITE(resp->header->content, resp->body->content);
        }
        if (!parseRet) {
            fprintf(stderr,
                    "fail to parse the response from namenode for file creation/appending\n");
            ret = -1;
            goto done;
        }

        //replace the namenode url with the datanode url from the response
        free(url);
        url = parseDnLoc(resp->header->content);
        if (!url) {
            fprintf(stderr,
                    "fail to get the datanode url from namenode for file creation/appending\n");
            ret = -1;
            goto done;
        }
        freeResponse(resp);
        resp = NULL;

        //store the datanode url in the file handle
        webhandle->datanode = strdup(url);
        if (!webhandle->datanode) {
            ret = -1;
            goto done;
        }

        //create a new thread for performing the http transferring
        data = (threadData *) calloc(1, sizeof(threadData));
        if (!data) {
            ret = -1;
            goto done;
        }
        //hand the url string itself over to the thread data (no extra copy)
        data->url = url;
        url = NULL;
        data->flags = flags;
        data->uploadBuffer = webhandle->uploadBuffer;

        ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data);
        if (ret) {
            fprintf(stderr, "Failed to create the writing thread.\n");
        } else {
            webhandle->uploadBuffer->openFlag = 1;
        }
    }

done:
    free(url);
    if (resp) {
        freeResponse(resp);
    }
    if (ret == 0) {
        //on success the thread (if any) owns `data`
        return hdfsFileHandle;
    }
    if (data) {
        //no thread was started, so nobody else will release this
        free(data->url);
        free(data);
    }
    freeWebFileHandle(webhandle);
    free(hdfsFileHandle);
    return NULL;
}
/**
 * Write `length` bytes from `buffer` to a file opened for output. The call
 * blocks until the upload side has consumed the whole chunk (see
 * resetWebhdfsBuffer()).
 *
 * @return length on success; 0 if length is 0; -1 if an argument is invalid
 *         (NULL fs/file/buffer, negative length, file not opened for output)
 *         or the upload has not been started for this file.
 */
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length)
{
    struct webhdfsFileHandle *wfile = NULL;

    if (length == 0) {
        return 0;
    }
    /*
     * A NULL buffer must be rejected: resetWebhdfsBuffer() silently ignores
     * it, which would otherwise be reported as a successful write.
     */
    if (fs == NULL || file == NULL || file->type != OUTPUT
        || buffer == NULL || length < 0) {
        return -1;
    }

    wfile = (struct webhdfsFileHandle *) file->file;
    if (wfile == NULL) {
        return -1;
    }
    if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) {
        resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
        return length;
    } else {
        fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath);
        return -1;
    }
}
/**
 * Close a file and release its handle.
 *
 * For output files the upload thread is told to finish (closeFlag is set and
 * newwrite_or_close is signalled under the write mutex), joined, and its
 * response is handed to the datanode response parser.
 * NOTE(review): the return values of parseDnAPPEND()/parseDnWRITE() are not
 * checked, as before; their meaning is not visible here.
 *
 * @return 0 on success; the pthread_join() error code if joining fails;
 *         -1 if file is NULL, the handle is incomplete, or the upload thread
 *         produced no response.
 */
int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
    int ret = 0;
    fprintf(stderr, "to close file...\n");
    if (file == NULL) {
        return -1;
    }

    if (file->type == OUTPUT) {
        void *respv = NULL;
        threadData *tdata = NULL;
        struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file;

        if (wfile == NULL || wfile->uploadBuffer == NULL) {
            ret = -1;
        } else {
            pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
            wfile->uploadBuffer->closeFlag = 1;
            pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
            pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex));

            //waiting for the writing thread to terminate
            ret = pthread_join(wfile->connThread, &respv);
            if (ret) {
                fprintf(stderr, "Error (code %d) when pthread_join.\n", ret);
            } else {
                //respv is only meaningful when the join succeeded
                tdata = (threadData *) respv;
                if (!tdata) {
                    fprintf(stderr, "Response from the writing thread is NULL.\n");
                    ret = -1;
                } else if (!tdata->resp) {
                    fprintf(stderr, "The writing thread did not produce a response.\n");
                    ret = -1;
                } else if (file->flags & O_APPEND) {
                    parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content);
                } else {
                    parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content);
                }
            }
            //free the threaddata (NULL is accepted)
            freeThreadData(tdata);
        }
    }

    fprintf(stderr, "To clean the webfilehandle...\n");
    freeWebFileHandle(file->file);
    free(file);
    fprintf(stderr, "Cleaned the webfilehandle...\n");
    return ret;
}
/**
 * @return 1 if the file was opened as INPUT, 0 otherwise.
 *         file must not be NULL.
 */
int hdfsFileIsOpenForRead(hdfsFile file)
{
    return (INPUT == file->type) ? 1 : 0;
}
/**
 * @return 1 if the file was opened as OUTPUT, 0 otherwise.
 *         file must not be NULL.
 */
int hdfsFileIsOpenForWrite(hdfsFile file)
{
    return (OUTPUT == file->type) ? 1 : 0;
}
/**
 * Read up to `length` bytes at the file's current offset into `buffer` and
 * advance the offset by the number of bytes read.
 *
 * A temporary Response is built whose body points straight at the caller's
 * buffer, so the body struct is released with free() rather than
 * freeResponseBuffer() (presumably the latter would also free the content).
 *
 * @return number of bytes read; 0 if length is 0 or parseOPEN() reports 0;
 *         -1 on invalid arguments (errno = EINVAL) or any other failure.
 */
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
{
    if (length == 0) {
        return 0;
    }
    if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) {
        errno = EINVAL;
        return -1;
    }

    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    struct webhdfsFileHandle *webFile = (struct webhdfsFileHandle *) file->file;
    char *url = NULL;
    Response resp = NULL;
    Response launched = NULL;
    int openResult = -1;
    tSize result = -1;

    if (webFile == NULL) {
        errno = EINVAL;
        return -1;
    }

    resp = (Response) calloc(1, sizeof(*resp));
    if (!resp) {
        return -1;
    }
    resp->header = initResponseBuffer();
    resp->body = initResponseBuffer();
    if (!resp->header || !resp->body) {
        goto done;
    }
    resp->body->content = buffer;
    resp->body->remaining = length;

    url = prepareOPEN(bld->nn, bld->port, webFile->absPath, bld->userName, file->offset, length);
    if (!url) {
        goto done;
    }

    /*
     * Keep our own pointer in case the call fails.
     * NOTE(review): assumes launchOPEN() does not free the response it was
     * handed when it returns NULL — confirm against its implementation.
     */
    launched = launchOPEN(url, resp);
    if (!launched) {
        goto done;
    }
    resp = launched;

    openResult = parseOPEN(resp->header->content, resp->body->content);
    if (openResult > 0) {
        size_t readSize = resp->body->offset;
        file->offset += readSize;
        result = (tSize) readSize;
    } else if (openResult == 0) {
        result = 0;
    }

done:
    /* Single cleanup path: every exit releases the same resources. */
    free(url);
    if (resp->header) {
        freeResponseBuffer(resp->header);
    }
    free(resp->body);
    free(resp);
    return result;
}
/**
 * Report how many bytes lie between the file's current offset and its size
 * as reported by hdfsGetPathInfo().
 *
 * @return the difference (truncated to int), or -1 if an argument is NULL,
 *         the handle has no webhdfs data, or the file info cannot be fetched.
 */
int hdfsAvailable(hdfsFS fs, hdfsFile file)
{
    struct webhdfsFileHandle *wf = NULL;
    hdfsFileInfo *info = NULL;
    int available = 0;

    if (file == NULL || fs == NULL) {
        return -1;
    }
    wf = (struct webhdfsFileHandle *) file->file;
    if (wf == NULL) {
        return -1;
    }

    info = hdfsGetPathInfo(fs, wf->absPath);
    if (info == NULL) {
        return -1;
    }
    available = (int)(info->mSize - file->offset);
    hdfsFreeFileInfo(info, 1);
    return available;
}
/**
 * Move the file's offset to desiredPos, provided it does not lie beyond the
 * file size reported by hdfsGetPathInfo().
 *
 * @return 0 on success; -1 if an argument is invalid, the file info cannot
 *         be fetched, or desiredPos is past the end (errno = ENOTSUP in that
 *         last case).
 */
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
{
    struct webhdfsFileHandle *wf = NULL;
    hdfsFileInfo *info = NULL;
    int status = 0;

    if (fs == NULL || file == NULL || desiredPos < 0) {
        return -1;
    }
    wf = (struct webhdfsFileHandle *) file->file;
    if (wf == NULL) {
        return -1;
    }

    info = hdfsGetPathInfo(fs, wf->absPath);
    if (info == NULL) {
        return -1;
    }

    if (desiredPos > info->mSize) {
        errno = ENOTSUP;
        fprintf(stderr,
                "hdfsSeek for %s failed since the desired position %lld is beyond the size of the file %lld\n",
                wf->absPath, desiredPos, info->mSize);
        status = -1;
    } else {
        file->offset = desiredPos;
    }

    hdfsFreeFileInfo(info, 1);
    return status;
}
/**
 * Positional read: set the file's offset to `position` and delegate to
 * hdfsRead(). Note that the file's offset is therefore changed by this call.
 *
 * @return the result of hdfsRead(), or -1 if an argument is invalid.
 */
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length)
{
    if (fs == NULL || file == NULL || file->type != INPUT) {
        return -1;
    }
    if (position < 0 || buffer == NULL || length < 0) {
        return -1;
    }
    file->offset = position;
    return hdfsRead(fs, file, buffer, length);
}
/**
 * @return the file's current offset, or -1 if file is NULL. fs is not used.
 */
tOffset hdfsTell(hdfsFS fs, hdfsFile file)
{
    if (file == NULL) {
        return -1;
    }
    return file->offset;
}
/**
 * Copy the handle's working directory into `buffer`.
 *
 * The result is always NUL-terminated; a working directory longer than
 * bufferSize - 1 characters is truncated. If no working directory is set,
 * the buffer receives an empty string.
 *
 * @return buffer, or NULL if fs or buffer is NULL or bufferSize is 0.
 */
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;

    if (fs == NULL || buffer == NULL || bufferSize == 0) {
        return NULL;
    }
    if (bld->workingDir) {
        /* strncpy alone does not terminate when the source fills the buffer. */
        strncpy(buffer, bld->workingDir, bufferSize - 1);
        buffer[bufferSize - 1] = '\0';
    } else {
        buffer[0] = '\0';
    }
    return buffer;
}
/**
 * Replace the handle's working directory with a private copy of `path`.
 *
 * The copy is made before the old value is released, so the call is safe
 * when `path` points into the current working directory, and the old value
 * is kept if allocation fails.
 *
 * @return 0 on success; -1 if an argument is NULL or allocation fails.
 */
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
{
    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
    char *copy = NULL;
    size_t len = 0;

    if (fs == NULL || path == NULL) {
        return -1;
    }

    len = strlen(path);
    copy = (char *) malloc(len + 1);
    if (!copy) {
        return -1;
    }
    memcpy(copy, path, len + 1);

    free(bld->workingDir);
    bld->workingDir = copy;
    return 0;
}
/**
 * Free a NULL-terminated array of NULL-terminated host-name arrays: every
 * string, every inner array, then the outer array.
 * A NULL argument is ignored.
 */
void hdfsFreeHosts(char ***blockHosts)
{
    int i, j;

    if (blockHosts == NULL) {
        return;
    }
    for (i = 0; blockHosts[i]; i++) {
        for (j = 0; blockHosts[i][j]; j++) {
            free(blockHosts[i][j]);
        }
        free(blockHosts[i]);
    }
    free(blockHosts);
}
/*
 * Not useful for libwebhdfs: the file argument is ignored, a notice is
 * written to stderr and -1 is always returned.
 */
int hdfsFileUsesDirectRead(hdfsFile file)
{
    (void) file;
    fprintf(stderr, "hdfsFileUsesDirectRead is no longer useful for libwebhdfs.\n");
    return -1;
}
/*
 * Not useful for libwebhdfs: the file argument is ignored and only a notice
 * is written to stderr.
 */
void hdfsFileDisableDirectRead(hdfsFile file)
{
    (void) file;
    fprintf(stderr, "hdfsFileDisableDirectRead is no longer useful for libwebhdfs.\n");
}
/*
 * Not useful for libwebhdfs: does nothing with its arguments and always
 * returns 0.
 */
int hdfsHFlush(hdfsFS fs, hdfsFile file)
{
    (void) fs;
    (void) file;
    return 0;
}
/*
 * Not useful for libwebhdfs: does nothing with its arguments and always
 * returns 0.
 */
int hdfsFlush(hdfsFS fs, hdfsFile file)
{
    (void) fs;
    (void) file;
    return 0;
}
/**
 * Not implemented in libwebhdfs: all arguments are ignored, a notice is
 * written to stderr and NULL is always returned.
 */
char*** hdfsGetHosts(hdfsFS fs, const char* path,
                     tOffset start, tOffset length)
{
    (void) fs;
    (void) path;
    (void) start;
    (void) length;
    fprintf(stderr, "hdfsGetHosts is not but will be supported by libwebhdfs yet.\n");
    return NULL;
}
/**
 * Not implemented in libwebhdfs: fs is ignored, a notice is written to
 * stderr and -1 is always returned.
 */
tOffset hdfsGetCapacity(hdfsFS fs)
{
    (void) fs;
    fprintf(stderr, "hdfsGetCapacity is not but will be supported by libwebhdfs.\n");
    return -1;
}
/**
 * Not implemented in libwebhdfs: fs is ignored, a notice is written to
 * stderr and -1 is always returned.
 */
tOffset hdfsGetUsed(hdfsFS fs)
{
    (void) fs;
    fprintf(stderr, "hdfsGetUsed is not but will be supported by libwebhdfs yet.\n");
    return -1;
}