| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <jni.h> |
| #include "webhdfs.h" |
| #include "hdfs_http_client.h" |
| #include "hdfs_http_query.h" |
| #include "hdfs_json_parser.h" |
| #include "jni_helper.h" |
| #include "exception.h" |
| |
| #define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration" |
| #define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" |
| #define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" |
| |
| static void initFileinfo(hdfsFileInfo *fileInfo) { |
| if (fileInfo) { |
| fileInfo->mKind = kObjectKindFile; |
| fileInfo->mName = NULL; |
| fileInfo->mLastMod = 0; |
| fileInfo->mSize = 0; |
| fileInfo->mReplication = 0; |
| fileInfo->mBlockSize = 0; |
| fileInfo->mOwner = NULL; |
| fileInfo->mGroup = NULL; |
| fileInfo->mPermissions = 0; |
| fileInfo->mLastAccess = 0; |
| } |
| } |
| |
| static webhdfsBuffer *initWebHdfsBuffer() { |
| webhdfsBuffer *buffer = (webhdfsBuffer *) calloc(1, sizeof(webhdfsBuffer)); |
| if (!buffer) { |
| fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n"); |
| return NULL; |
| } |
| buffer->remaining = 0; |
| buffer->offset = 0; |
| buffer->wbuffer = NULL; |
| buffer->closeFlag = 0; |
| buffer->openFlag = 0; |
| pthread_mutex_init(&buffer->writeMutex, NULL); |
| pthread_cond_init(&buffer->newwrite_or_close, NULL); |
| pthread_cond_init(&buffer->transfer_finish, NULL); |
| return buffer; |
| } |
| |
| static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) { |
| if (buffer && length > 0) { |
| pthread_mutex_lock(&wb->writeMutex); |
| wb->wbuffer = buffer; |
| wb->offset = 0; |
| wb->remaining = length; |
| pthread_cond_signal(&wb->newwrite_or_close); |
| while (wb->remaining != 0) { |
| pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex); |
| } |
| pthread_mutex_unlock(&wb->writeMutex); |
| } |
| return wb; |
| } |
| |
| static void freeWebhdfsBuffer(webhdfsBuffer *buffer) { |
| if (buffer) { |
| int des = pthread_cond_destroy(&buffer->newwrite_or_close); |
| if (des == EBUSY) { |
| fprintf(stderr, "The condition newwrite_or_close is still referenced!\n"); |
| } else if (des == EINVAL) { |
| fprintf(stderr, "The condition newwrite_or_close is invalid!\n"); |
| } |
| des = pthread_cond_destroy(&buffer->transfer_finish); |
| if (des == EBUSY) { |
| fprintf(stderr, "The condition transfer_finish is still referenced!\n"); |
| } else if (des == EINVAL) { |
| fprintf(stderr, "The condition transfer_finish is invalid!\n"); |
| } |
| if (des == EBUSY) { |
| fprintf(stderr, "The condition close_clean is still referenced!\n"); |
| } else if (des == EINVAL) { |
| fprintf(stderr, "The condition close_clean is invalid!\n"); |
| } |
| des = pthread_mutex_destroy(&buffer->writeMutex); |
| if (des == EBUSY) { |
| fprintf(stderr, "The mutex is still locked or referenced!\n"); |
| } |
| free(buffer); |
| buffer = NULL; |
| } |
| } |
| |
| static void freeWebFileHandle(struct webhdfsFileHandle * handle) { |
| if (handle) { |
| freeWebhdfsBuffer(handle->uploadBuffer); |
| if (handle->datanode) { |
| free(handle->datanode); |
| } |
| if (handle->absPath) { |
| free(handle->absPath); |
| } |
| free(handle); |
| handle = NULL; |
| } |
| } |
| |
| struct hdfsBuilder *hdfsNewBuilder(void) |
| { |
| struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder)); |
| if (!bld) { |
| return NULL; |
| } |
| hdfsSetWorkingDirectory(bld, "/"); |
| return bld; |
| } |
| |
| void hdfsFreeBuilder(struct hdfsBuilder *bld) |
| { |
| if (bld && bld->workingDir) { |
| free(bld->workingDir); |
| } |
| free(bld); |
| } |
| |
| void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) |
| { |
| if (bld) { |
| bld->forceNewInstance = 1; |
| } |
| } |
| |
| void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) |
| { |
| if (bld) { |
| bld->nn = nn; |
| bld->nn_jni = nn; |
| } |
| } |
| |
| void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) |
| { |
| if (bld) { |
| bld->port = port; |
| } |
| } |
| |
| void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) |
| { |
| if (bld) { |
| bld->userName = userName; |
| } |
| } |
| |
| void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, |
| const char *kerbTicketCachePath) |
| { |
| if (bld) { |
| bld->kerbTicketCachePath = kerbTicketCachePath; |
| } |
| } |
| |
| hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) |
| { |
| struct hdfsBuilder* bld = hdfsNewBuilder(); |
| if (!bld) { |
| return NULL; |
| } |
| hdfsBuilderSetNameNode(bld, nn); |
| hdfsBuilderSetNameNodePort(bld, port); |
| hdfsBuilderSetUserName(bld, user); |
| return hdfsBuilderConnect(bld); |
| } |
| |
| hdfsFS hdfsConnect(const char* nn, tPort port) |
| { |
| return hdfsConnectAsUser(nn, port, NULL); |
| } |
| |
| hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) |
| { |
| struct hdfsBuilder* bld = (struct hdfsBuilder *) hdfsConnect(nn, port); |
| if (!bld) { |
| return NULL; |
| } |
| hdfsBuilderSetForceNewInstance(bld); |
| return bld; |
| } |
| |
| hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, |
| const char *user) |
| { |
| struct hdfsBuilder *bld = hdfsNewBuilder(); |
| if (!bld) |
| return NULL; |
| hdfsBuilderSetNameNode(bld, host); |
| hdfsBuilderSetNameNodePort(bld, port); |
| hdfsBuilderSetUserName(bld, user); |
| hdfsBuilderSetForceNewInstance(bld); |
| return hdfsBuilderConnect(bld); |
| } |
| |
| const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, |
| char *buf, size_t bufLen); |
| |
| hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) |
| { |
| if (!bld) { |
| return NULL; |
| } |
| // if the hostname is null for the namenode, set it to localhost |
| //only handle bld->nn |
| if (bld->nn == NULL) { |
| bld->nn = "localhost"; |
| } else { |
| /* check whether the hostname of the namenode (nn in hdfsBuilder) has already contained the port */ |
| const char *lastColon = rindex(bld->nn, ':'); |
| if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) { |
| fprintf(stderr, "port %d was given, but URI '%s' already " |
| "contains a port!\n", bld->port, bld->nn); |
| char *newAddr = (char *)malloc(strlen(bld->nn) - strlen(lastColon) + 1); |
| if (!newAddr) { |
| return NULL; |
| } |
| strncpy(newAddr, bld->nn, strlen(bld->nn) - strlen(lastColon)); |
| newAddr[strlen(bld->nn) - strlen(lastColon)] = '\0'; |
| free(bld->nn); |
| bld->nn = newAddr; |
| } |
| } |
| |
| /* if the namenode is "default" and/or the port of namenode is 0, get the default namenode/port by using JNI */ |
| if (bld->port == 0 || !strcasecmp("default", bld->nn)) { |
| JNIEnv *env = 0; |
| jobject jHDFSConf = NULL, jAddress = NULL; |
| jvalue jVal; |
| jthrowable jthr = NULL; |
| int ret = 0; |
| char buf[512]; |
| |
| //Get the JNIEnv* corresponding to current thread |
| env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| free(bld); |
| bld = NULL; |
| return NULL; |
| } |
| |
| // jHDFSConf = new HDFSConfiguration(); |
| jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress", |
| "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;", |
| jHDFSConf); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; //free(bld), deleteReference for jHDFSConf |
| } |
| jAddress = jVal.l; |
| |
| if (bld->port == 0) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, |
| JAVA_INETSOCKETADDRESS, "getPort", "()I"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| bld->port = jVal.i; |
| } |
| |
| if (!strcasecmp("default", bld->nn)) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, |
| JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| bld->nn = (const char*) ((*env)->GetStringUTFChars(env, jVal.l, NULL)); |
| } |
| |
| done: |
| destroyLocalReference(env, jHDFSConf); |
| destroyLocalReference(env, jAddress); |
| if (ret) { //if there is error/exception, we free the builder and return NULL |
| free(bld); |
| bld = NULL; |
| } |
| } |
| |
| //for debug |
| fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port); |
| return bld; |
| } |
| |
| int hdfsDisconnect(hdfsFS fs) |
| { |
| if (fs == NULL) { |
| errno = EBADF; |
| return -1; |
| } else { |
| free(fs); |
| fs = NULL; |
| } |
| return 0; |
| } |
| |
| char *getAbsolutePath(hdfsFS fs, const char *path) { |
| if (fs == NULL || path == NULL) { |
| return NULL; |
| } |
| char *absPath = NULL; |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| |
| if ('/' != *path && bld->workingDir) { |
| absPath = (char *)malloc(strlen(bld->workingDir) + strlen(path) + 1); |
| if (!absPath) { |
| return NULL; |
| } |
| absPath = strcpy(absPath, bld->workingDir); |
| absPath = strcat(absPath, path); |
| return absPath; |
| } else { |
| absPath = (char *)malloc(strlen(path) + 1); |
| if (!absPath) { |
| return NULL; |
| } |
| absPath = strcpy(absPath, path); |
| return absPath; |
| } |
| } |
| |
| int hdfsCreateDirectory(hdfsFS fs, const char* path) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return -1; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url = NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareMKDIR(bld->nn, bld->port, absPath, bld->userName)) |
| && (resp = launchMKDIR(url)) |
| && (parseMKDIR(resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(url); |
| free(absPath); |
| return ret; |
| } |
| |
| int hdfsChmod(hdfsFS fs, const char* path, short mode) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return -1; |
| } |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url=NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareCHMOD(bld->nn, bld->port, absPath, (int)mode, bld->userName)) |
| && (resp = launchCHMOD(url)) |
| && (parseCHMOD(resp->header->content, resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| return ret; |
| } |
| |
| int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return -1; |
| } |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url=NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareCHOWN(bld->nn, bld->port, absPath, owner, group, bld->userName)) |
| && (resp = launchCHOWN(url)) |
| && (parseCHOWN(resp->header->content, resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| return ret; |
| } |
| |
| int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) |
| { |
| if (fs == NULL || oldPath == NULL || newPath == NULL) { |
| return -1; |
| } |
| |
| char *oldAbsPath = getAbsolutePath(fs, oldPath); |
| if (!oldAbsPath) { |
| return -1; |
| } |
| char *newAbsPath = getAbsolutePath(fs, newPath); |
| if (!newAbsPath) { |
| return -1; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url=NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareRENAME(bld->nn, bld->port, oldAbsPath, newAbsPath, bld->userName)) |
| && (resp = launchRENAME(url)) |
| && (parseRENAME(resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(oldAbsPath); |
| free(newAbsPath); |
| free(url); |
| return ret; |
| } |
| |
| hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) |
| { |
| if (fs == NULL || path == NULL) { |
| return NULL; |
| } |
| |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return NULL; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url=NULL; |
| Response resp = NULL; |
| int numEntries = 0; |
| int ret = 0; |
| |
| hdfsFileInfo * fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo)); |
| if (!fileInfo) { |
| ret = -1; |
| goto done; |
| } |
| initFileinfo(fileInfo); |
| |
| if(!((url = prepareGFS(bld->nn, bld->port, absPath, bld->userName)) |
| && (resp = launchGFS(url)) |
| && (fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries)))) { |
| ret = -1; |
| goto done; |
| } |
| |
| done: |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| |
| if (ret == 0) { |
| return fileInfo; |
| } else { |
| free(fileInfo); |
| return NULL; |
| } |
| } |
| |
| hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) |
| { |
| if (fs == NULL || path == NULL) { |
| return NULL; |
| } |
| |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return NULL; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url = NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| hdfsFileInfo * fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo)); |
| if (!fileInfo) { |
| ret = -1; |
| goto done; |
| } |
| |
| if(!((url = prepareLS(bld->nn, bld->port, absPath, bld->userName)) |
| && (resp = launchLS(url)) |
| && (fileInfo = parseGFS(resp->body->content, fileInfo, numEntries)))) { |
| ret = -1; |
| goto done; |
| } |
| |
| done: |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| |
| if (ret == 0) { |
| return fileInfo; |
| } else { |
| free(fileInfo); |
| return NULL; |
| } |
| } |
| |
| int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return -1; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url = NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareSETREPLICATION(bld->nn, bld->port, absPath, replication, bld->userName)) |
| && (resp = launchSETREPLICATION(url)) |
| && (parseSETREPLICATION(resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| return ret; |
| } |
| |
| void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) |
| { |
| //Free the mName, mOwner, and mGroup |
| int i; |
| for (i=0; i < numEntries; ++i) { |
| if (hdfsFileInfo[i].mName) { |
| free(hdfsFileInfo[i].mName); |
| } |
| if (hdfsFileInfo[i].mOwner) { |
| free(hdfsFileInfo[i].mOwner); |
| } |
| if (hdfsFileInfo[i].mGroup) { |
| free(hdfsFileInfo[i].mGroup); |
| } |
| } |
| |
| //Free entire block |
| free(hdfsFileInfo); |
| hdfsFileInfo = NULL; |
| } |
| |
| int hdfsDelete(hdfsFS fs, const char* path, int recursive) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return -1; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url = NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareDELETE(bld->nn, bld->port, absPath, recursive, bld->userName)) |
| && (resp = launchDELETE(url)) |
| && (parseDELETE(resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| return ret; |
| } |
| |
| int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| char *absPath = getAbsolutePath(fs, path); |
| if (!absPath) { |
| return -1; |
| } |
| |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url = NULL; |
| Response resp = NULL; |
| int ret = 0; |
| |
| if(!((url = prepareUTIMES(bld->nn, bld->port, absPath, mtime, atime, bld->userName)) |
| && (resp = launchUTIMES(url)) |
| && (parseUTIMES(resp->header->content, resp->body->content)))) { |
| ret = -1; |
| } |
| |
| freeResponse(resp); |
| free(absPath); |
| free(url); |
| return ret; |
| } |
| |
| int hdfsExists(hdfsFS fs, const char *path) |
| { |
| hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, path); |
| if (fileInfo) { |
| hdfsFreeFileInfo(fileInfo, 1); |
| return 0; |
| } else { |
| return -1; |
| } |
| } |
| |
| typedef struct { |
| char *url; |
| webhdfsBuffer *uploadBuffer; |
| int flags; |
| Response resp; |
| } threadData; |
| |
| static void freeThreadData(threadData *data) { |
| if (data) { |
| if (data->url) { |
| free(data->url); |
| } |
| if (data->resp) { |
| freeResponse(data->resp); |
| } |
| //the uploadBuffer would be freed by freeWebFileHandle() |
| free(data); |
| data = NULL; |
| } |
| } |
| |
| static void *writeThreadOperation(void *v) { |
| threadData *data = (threadData *) v; |
| if (data->flags & O_APPEND) { |
| data->resp = launchDnAPPEND(data->url, data->uploadBuffer); |
| } else { |
| data->resp = launchDnWRITE(data->url, data->uploadBuffer); |
| } |
| return data; |
| } |
| |
| hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, |
| int bufferSize, short replication, tSize blockSize) |
| { |
| /* |
| * the original version of libhdfs based on JNI store a fsinputstream/fsoutputstream in the hdfsFile |
| * in libwebhdfs that is based on webhdfs, we store (absolute_path, buffersize, replication, blocksize) in it |
| */ |
| if (fs == NULL || path == NULL) { |
| return NULL; |
| } |
| |
| int accmode = flags & O_ACCMODE; |
| if (accmode == O_RDWR) { |
| fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n"); |
| errno = ENOTSUP; |
| return NULL; |
| } |
| |
| if ((flags & O_CREAT) && (flags & O_EXCL)) { |
| fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); |
| } |
| |
| hdfsFile hdfsFileHandle = (hdfsFile) calloc(1, sizeof(struct hdfsFile_internal)); |
| if (!hdfsFileHandle) { |
| return NULL; |
| } |
| int ret = 0; |
| hdfsFileHandle->flags = flags; |
| hdfsFileHandle->type = accmode == O_RDONLY ? INPUT : OUTPUT; |
| hdfsFileHandle->offset = 0; |
| struct webhdfsFileHandle *webhandle = (struct webhdfsFileHandle *) calloc(1, sizeof(struct webhdfsFileHandle)); |
| if (!webhandle) { |
| ret = -1; |
| goto done; |
| } |
| webhandle->bufferSize = bufferSize; |
| webhandle->replication = replication; |
| webhandle->blockSize = blockSize; |
| webhandle->absPath = getAbsolutePath(fs, path); |
| if (!webhandle->absPath) { |
| ret = -1; |
| goto done; |
| } |
| hdfsFileHandle->file = webhandle; |
| |
| //for write/append, need to connect to the namenode |
| //and get the url of corresponding datanode |
| if (hdfsFileHandle->type == OUTPUT) { |
| webhandle->uploadBuffer = initWebHdfsBuffer(); |
| if (!webhandle->uploadBuffer) { |
| ret = -1; |
| goto done; |
| } |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| char *url = NULL; |
| Response resp = NULL; |
| int append = flags & O_APPEND; |
| int create = append ? 0 : 1; |
| |
| //if create: send create request to NN |
| if (create) { |
| url = prepareNnWRITE(bld->nn, bld->port, webhandle->absPath, bld->userName, webhandle->replication, webhandle->blockSize); |
| } else if (append) { |
| url = prepareNnAPPEND(bld->nn, bld->port, webhandle->absPath, bld->userName); |
| } |
| if (!url) { |
| fprintf(stderr, |
| "fail to create the url connecting to namenode for file creation/appending\n"); |
| ret = -1; |
| goto done; |
| } |
| |
| if (create) { |
| resp = launchNnWRITE(url); |
| } else if (append) { |
| resp = launchNnAPPEND(url); |
| } |
| if (!resp) { |
| fprintf(stderr, |
| "fail to get the response from namenode for file creation/appending\n"); |
| free(url); |
| ret = -1; |
| goto done; |
| } |
| |
| int parseRet = 0; |
| if (create) { |
| parseRet = parseNnWRITE(resp->header->content, resp->body->content); |
| } else if (append) { |
| parseRet = parseNnAPPEND(resp->header->content, resp->body->content); |
| } |
| if (!parseRet) { |
| fprintf(stderr, |
| "fail to parse the response from namenode for file creation/appending\n"); |
| free(url); |
| freeResponse(resp); |
| ret = -1; |
| goto done; |
| } |
| |
| free(url); |
| url = parseDnLoc(resp->header->content); |
| if (!url) { |
| fprintf(stderr, |
| "fail to get the datanode url from namenode for file creation/appending\n"); |
| freeResponse(resp); |
| ret = -1; |
| return NULL; |
| } |
| freeResponse(resp); |
| //store the datanode url in the file handle |
| webhandle->datanode = strdup(url); |
| |
| //create a new thread for performing the http transferring |
| threadData *data = (threadData *) calloc(1, sizeof(threadData)); |
| if (!data) { |
| ret = -1; |
| goto done; |
| } |
| data->url = strdup(url); |
| data->flags = flags; |
| data->uploadBuffer = webhandle->uploadBuffer; |
| free(url); |
| ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data); |
| if (ret) { |
| fprintf(stderr, "Failed to create the writing thread.\n"); |
| } else { |
| webhandle->uploadBuffer->openFlag = 1; |
| } |
| } |
| |
| done: |
| if (ret == 0) { |
| return hdfsFileHandle; |
| } else { |
| freeWebFileHandle(webhandle); |
| free(hdfsFileHandle); |
| return NULL; |
| } |
| } |
| |
| tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) |
| { |
| if (length == 0) { |
| return 0; |
| } |
| if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) { |
| return -1; |
| } |
| |
| struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file; |
| if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) { |
| resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length); |
| return length; |
| } else { |
| fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath); |
| return -1; |
| } |
| } |
| |
| int hdfsCloseFile(hdfsFS fs, hdfsFile file) |
| { |
| int ret = 0; |
| fprintf(stderr, "to close file...\n"); |
| if (file->type == OUTPUT) { |
| void *respv; |
| threadData *tdata; |
| struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file; |
| pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex)); |
| wfile->uploadBuffer->closeFlag = 1; |
| pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close); |
| pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex)); |
| |
| //waiting for the writing thread to terminate |
| ret = pthread_join(wfile->connThread, &respv); |
| if (ret) { |
| fprintf(stderr, "Error (code %d) when pthread_join.\n", ret); |
| } |
| //parse the response |
| tdata = (threadData *) respv; |
| if (!tdata) { |
| fprintf(stderr, "Response from the writing thread is NULL.\n"); |
| ret = -1; |
| } |
| if (file->flags & O_APPEND) { |
| parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content); |
| } else { |
| parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content); |
| } |
| //free the threaddata |
| freeThreadData(tdata); |
| } |
| |
| fprintf(stderr, "To clean the webfilehandle...\n"); |
| if (file) { |
| freeWebFileHandle(file->file); |
| free(file); |
| file = NULL; |
| fprintf(stderr, "Cleaned the webfilehandle...\n"); |
| } |
| return ret; |
| } |
| |
| int hdfsFileIsOpenForRead(hdfsFile file) |
| { |
| return (file->type == INPUT); |
| } |
| |
| int hdfsFileIsOpenForWrite(hdfsFile file) |
| { |
| return (file->type == OUTPUT); |
| } |
| |
| tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) |
| { |
| if (length == 0) { |
| return 0; |
| } |
| if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) { |
| errno = EINVAL; |
| return -1; |
| } |
| struct hdfsBuilder *bld = (struct hdfsBuilder *) fs; |
| struct webhdfsFileHandle *webFile = (struct webhdfsFileHandle *) file->file; |
| char *url = NULL; |
| Response resp = NULL; |
| int openResult = -1; |
| |
| resp = (Response) calloc(1, sizeof(*resp)); |
| if (!resp) { |
| return -1; |
| } |
| resp->header = initResponseBuffer(); |
| resp->body = initResponseBuffer(); |
| resp->body->content = buffer; |
| resp->body->remaining = length; |
| |
| if (!((url = prepareOPEN(bld->nn, bld->port, webFile->absPath, bld->userName, file->offset, length)) |
| && (resp = launchOPEN(url, resp)) |
| && ((openResult = parseOPEN(resp->header->content, resp->body->content)) > 0))) { |
| free(url); |
| freeResponseBuffer(resp->header); |
| if (openResult == 0) { |
| return 0; |
| } else { |
| return -1; |
| } |
| } |
| |
| size_t readSize = resp->body->offset; |
| file->offset += readSize; |
| |
| freeResponseBuffer(resp->header); |
| free(resp->body); |
| free(resp); |
| free(url); |
| return readSize; |
| } |
| |
| int hdfsAvailable(hdfsFS fs, hdfsFile file) |
| { |
| if (!file || !fs) { |
| return -1; |
| } |
| struct webhdfsFileHandle *wf = (struct webhdfsFileHandle *) file->file; |
| if (!wf) { |
| return -1; |
| } |
| hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, wf->absPath); |
| if (fileInfo) { |
| int available = (int)(fileInfo->mSize - file->offset); |
| hdfsFreeFileInfo(fileInfo, 1); |
| return available; |
| } else { |
| return -1; |
| } |
| } |
| |
| int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) |
| { |
| if (!fs || !file || desiredPos < 0) { |
| return -1; |
| } |
| struct webhdfsFileHandle *wf = (struct webhdfsFileHandle *) file->file; |
| if (!wf) { |
| return -1; |
| } |
| hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, wf->absPath); |
| int ret = 0; |
| if (fileInfo) { |
| if (fileInfo->mSize < desiredPos) { |
| errno = ENOTSUP; |
| fprintf(stderr, |
| "hdfsSeek for %s failed since the desired position %lld is beyond the size of the file %lld\n", |
| wf->absPath, desiredPos, fileInfo->mSize); |
| ret = -1; |
| } else { |
| file->offset = desiredPos; |
| } |
| hdfsFreeFileInfo(fileInfo, 1); |
| return ret; |
| } else { |
| return -1; |
| } |
| } |
| |
| tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) |
| { |
| if (!fs || !file || file->type != INPUT || position < 0 || !buffer || length < 0) { |
| return -1; |
| } |
| file->offset = position; |
| return hdfsRead(fs, file, buffer, length); |
| } |
| |
| tOffset hdfsTell(hdfsFS fs, hdfsFile file) |
| { |
| if (!file) { |
| return -1; |
| } |
| return file->offset; |
| } |
| |
| char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) |
| { |
| if (fs == NULL || buffer == NULL || bufferSize <= 0) { |
| return NULL; |
| } |
| |
| struct hdfsBuilder * bld = (struct hdfsBuilder *) fs; |
| if (bld->workingDir) { |
| strncpy(buffer, bld->workingDir, bufferSize); |
| } |
| return buffer; |
| } |
| |
| int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) |
| { |
| if (fs == NULL || path == NULL) { |
| return -1; |
| } |
| |
| struct hdfsBuilder * bld = (struct hdfsBuilder *) fs; |
| free(bld->workingDir); |
| bld->workingDir = (char *)malloc(strlen(path) + 1); |
| if (!(bld->workingDir)) { |
| return -1; |
| } |
| strcpy(bld->workingDir, path); |
| return 0; |
| } |
| |
| void hdfsFreeHosts(char ***blockHosts) |
| { |
| int i, j; |
| for (i=0; blockHosts[i]; i++) { |
| for (j=0; blockHosts[i][j]; j++) { |
| free(blockHosts[i][j]); |
| } |
| free(blockHosts[i]); |
| } |
| free(blockHosts); |
| } |
| |
| /* not useful for libwebhdfs */ |
| int hdfsFileUsesDirectRead(hdfsFile file) |
| { |
| /* return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ); */ |
| fprintf(stderr, "hdfsFileUsesDirectRead is no longer useful for libwebhdfs.\n"); |
| return -1; |
| } |
| |
| /* not useful for libwebhdfs */ |
| void hdfsFileDisableDirectRead(hdfsFile file) |
| { |
| /* file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; */ |
| fprintf(stderr, "hdfsFileDisableDirectRead is no longer useful for libwebhdfs.\n"); |
| } |
| |
| /* not useful for libwebhdfs */ |
| int hdfsHFlush(hdfsFS fs, hdfsFile file) |
| { |
| return 0; |
| } |
| |
| /* not useful for libwebhdfs */ |
| int hdfsFlush(hdfsFS fs, hdfsFile file) |
| { |
| return 0; |
| } |
| |
| char*** hdfsGetHosts(hdfsFS fs, const char* path, |
| tOffset start, tOffset length) |
| { |
| fprintf(stderr, "hdfsGetHosts is not but will be supported by libwebhdfs yet.\n"); |
| return NULL; |
| } |
| |
| tOffset hdfsGetCapacity(hdfsFS fs) |
| { |
| fprintf(stderr, "hdfsGetCapacity is not but will be supported by libwebhdfs.\n"); |
| return -1; |
| } |
| |
| tOffset hdfsGetUsed(hdfsFS fs) |
| { |
| fprintf(stderr, "hdfsGetUsed is not but will be supported by libwebhdfs yet.\n"); |
| return -1; |
| } |
| |