| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "exception.h" |
| #include "hdfs.h" |
| #include "jni_helper.h" |
| #include "platform.h" |
| |
| #include <fcntl.h> |
| #include <inttypes.h> |
| #include <stdio.h> |
| #include <string.h> |
| |
| /* Some frequently used Java paths */ |
| #define HADOOP_CONF "org/apache/hadoop/conf/Configuration" |
| #define HADOOP_PATH "org/apache/hadoop/fs/Path" |
| #define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem" |
| #define HADOOP_FS "org/apache/hadoop/fs/FileSystem" |
| #define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus" |
| #define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation" |
| #define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem" |
| #define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream" |
| #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream" |
| #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus" |
| #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission" |
| #define JAVA_NET_ISA "java/net/InetSocketAddress" |
| #define JAVA_NET_URI "java/net/URI" |
| #define JAVA_STRING "java/lang/String" |
| #define READ_OPTION "org/apache/hadoop/fs/ReadOption" |
| |
| #define JAVA_VOID "V" |
| |
| /* Macros for constructing method signatures */ |
| #define JPARAM(X) "L" X ";" |
| #define JARRPARAM(X) "[L" X ";" |
| #define JMETHOD1(X, R) "(" X ")" R |
| #define JMETHOD2(X, Y, R) "(" X Y ")" R |
| #define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R |
| |
| #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path" |
| |
| // Bit fields for hdfsFile_internal flags |
| #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) |
| |
| tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); |
| static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo); |
| |
| /** |
| * The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream . |
| */ |
| enum hdfsStreamType |
| { |
| HDFS_STREAM_UNINITIALIZED = 0, |
| HDFS_STREAM_INPUT = 1, |
| HDFS_STREAM_OUTPUT = 2, |
| }; |
| |
| /** |
| * The 'file-handle' to a file in hdfs. |
| */ |
| struct hdfsFile_internal { |
| void* file; |
| enum hdfsStreamType type; |
| int flags; |
| }; |
| |
| int hdfsFileIsOpenForRead(hdfsFile file) |
| { |
| return (file->type == HDFS_STREAM_INPUT); |
| } |
| |
| int hdfsFileGetReadStatistics(hdfsFile file, |
| struct hdfsReadStatistics **stats) |
| { |
| jthrowable jthr; |
| jobject readStats = NULL; |
| jvalue jVal; |
| struct hdfsReadStatistics *s = NULL; |
| int ret; |
| JNIEnv* env = getJNIEnv(); |
| |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| if (file->type != HDFS_STREAM_INPUT) { |
| ret = EINVAL; |
| goto done; |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, file->file, |
| "org/apache/hadoop/hdfs/client/HdfsDataInputStream", |
| "getReadStatistics", |
| "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsFileGetReadStatistics: getReadStatistics failed"); |
| goto done; |
| } |
| readStats = jVal.l; |
| s = malloc(sizeof(struct hdfsReadStatistics)); |
| if (!s) { |
| ret = ENOMEM; |
| goto done; |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, readStats, |
| "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", |
| "getTotalBytesRead", "()J"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsFileGetReadStatistics: getTotalBytesRead failed"); |
| goto done; |
| } |
| s->totalBytesRead = jVal.j; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, readStats, |
| "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", |
| "getTotalLocalBytesRead", "()J"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed"); |
| goto done; |
| } |
| s->totalLocalBytesRead = jVal.j; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, readStats, |
| "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", |
| "getTotalShortCircuitBytesRead", "()J"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed"); |
| goto done; |
| } |
| s->totalShortCircuitBytesRead = jVal.j; |
| jthr = invokeMethod(env, &jVal, INSTANCE, readStats, |
| "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", |
| "getTotalZeroCopyBytesRead", "()J"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed"); |
| goto done; |
| } |
| s->totalZeroCopyBytesRead = jVal.j; |
| *stats = s; |
| s = NULL; |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, readStats); |
| free(s); |
| if (ret) { |
| errno = ret; |
| return -1; |
| } |
| return 0; |
| } |
| |
| int64_t hdfsReadStatisticsGetRemoteBytesRead( |
| const struct hdfsReadStatistics *stats) |
| { |
| return stats->totalBytesRead - stats->totalLocalBytesRead; |
| } |
| |
| void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) |
| { |
| free(stats); |
| } |
| |
| int hdfsFileIsOpenForWrite(hdfsFile file) |
| { |
| return (file->type == HDFS_STREAM_OUTPUT); |
| } |
| |
| int hdfsFileUsesDirectRead(hdfsFile file) |
| { |
| return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ); |
| } |
| |
| void hdfsFileDisableDirectRead(hdfsFile file) |
| { |
| file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; |
| } |
| |
| int hdfsDisableDomainSocketSecurity(void) |
| { |
| jthrowable jthr; |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| jthr = invokeMethod(env, NULL, STATIC, NULL, |
| "org/apache/hadoop/net/unix/DomainSocket", |
| "disableBindPathValidation", "()V"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "DomainSocket#disableBindPathValidation"); |
| return -1; |
| } |
| return 0; |
| } |
| |
| /** |
| * hdfsJniEnv: A wrapper struct to be used as 'value' |
| * while saving thread -> JNIEnv* mappings |
| */ |
| typedef struct |
| { |
| JNIEnv* env; |
| } hdfsJniEnv; |
| |
| /** |
| * Helper function to create a org.apache.hadoop.fs.Path object. |
| * @param env: The JNIEnv pointer. |
| * @param path: The file-path for which to construct org.apache.hadoop.fs.Path |
| * object. |
| * @return Returns a jobject on success and NULL on error. |
| */ |
| static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path, |
| jobject *out) |
| { |
| jthrowable jthr; |
| jstring jPathString; |
| jobject jPath; |
| |
| //Construct a java.lang.String object |
| jthr = newJavaStr(env, path, &jPathString); |
| if (jthr) |
| return jthr; |
| //Construct the org.apache.hadoop.fs.Path object |
| jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path", |
| "(Ljava/lang/String;)V", jPathString); |
| destroyLocalReference(env, jPathString); |
| if (jthr) |
| return jthr; |
| *out = jPath; |
| return NULL; |
| } |
| |
| static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration, |
| const char *key, char **val) |
| { |
| jthrowable jthr; |
| jvalue jVal; |
| jstring jkey = NULL, jRet = NULL; |
| |
| jthr = newJavaStr(env, key, &jkey); |
| if (jthr) |
| goto done; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, |
| HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING), |
| JPARAM(JAVA_STRING)), jkey); |
| if (jthr) |
| goto done; |
| jRet = jVal.l; |
| jthr = newCStr(env, jRet, val); |
| done: |
| destroyLocalReference(env, jkey); |
| destroyLocalReference(env, jRet); |
| return jthr; |
| } |
| |
| int hdfsConfGetStr(const char *key, char **val) |
| { |
| JNIEnv *env; |
| int ret; |
| jthrowable jthr; |
| jobject jConfiguration = NULL; |
| |
| env = getJNIEnv(); |
| if (env == NULL) { |
| ret = EINTERNAL; |
| goto done; |
| } |
| jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsConfGetStr(%s): new Configuration", key); |
| goto done; |
| } |
| jthr = hadoopConfGetStr(env, jConfiguration, key, val); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsConfGetStr(%s): hadoopConfGetStr", key); |
| goto done; |
| } |
| ret = 0; |
| done: |
| destroyLocalReference(env, jConfiguration); |
| if (ret) |
| errno = ret; |
| return ret; |
| } |
| |
| void hdfsConfStrFree(char *val) |
| { |
| free(val); |
| } |
| |
| static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration, |
| const char *key, int32_t *val) |
| { |
| jthrowable jthr = NULL; |
| jvalue jVal; |
| jstring jkey = NULL; |
| |
| jthr = newJavaStr(env, key, &jkey); |
| if (jthr) |
| return jthr; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, |
| HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"), |
| jkey, (jint)(*val)); |
| destroyLocalReference(env, jkey); |
| if (jthr) |
| return jthr; |
| *val = jVal.i; |
| return NULL; |
| } |
| |
| int hdfsConfGetInt(const char *key, int32_t *val) |
| { |
| JNIEnv *env; |
| int ret; |
| jobject jConfiguration = NULL; |
| jthrowable jthr; |
| |
| env = getJNIEnv(); |
| if (env == NULL) { |
| ret = EINTERNAL; |
| goto done; |
| } |
| jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsConfGetInt(%s): new Configuration", key); |
| goto done; |
| } |
| jthr = hadoopConfGetInt(env, jConfiguration, key, val); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsConfGetInt(%s): hadoopConfGetInt", key); |
| goto done; |
| } |
| ret = 0; |
| done: |
| destroyLocalReference(env, jConfiguration); |
| if (ret) |
| errno = ret; |
| return ret; |
| } |
| |
| struct hdfsBuilderConfOpt { |
| struct hdfsBuilderConfOpt *next; |
| const char *key; |
| const char *val; |
| }; |
| |
| struct hdfsBuilder { |
| int forceNewInstance; |
| const char *nn; |
| tPort port; |
| const char *kerbTicketCachePath; |
| const char *userName; |
| struct hdfsBuilderConfOpt *opts; |
| }; |
| |
| struct hdfsBuilder *hdfsNewBuilder(void) |
| { |
| struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder)); |
| if (!bld) { |
| errno = ENOMEM; |
| return NULL; |
| } |
| return bld; |
| } |
| |
| int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, |
| const char *val) |
| { |
| struct hdfsBuilderConfOpt *opt, *next; |
| |
| opt = calloc(1, sizeof(struct hdfsBuilderConfOpt)); |
| if (!opt) |
| return -ENOMEM; |
| next = bld->opts; |
| bld->opts = opt; |
| opt->next = next; |
| opt->key = key; |
| opt->val = val; |
| return 0; |
| } |
| |
| void hdfsFreeBuilder(struct hdfsBuilder *bld) |
| { |
| struct hdfsBuilderConfOpt *cur, *next; |
| |
| cur = bld->opts; |
| for (cur = bld->opts; cur; ) { |
| next = cur->next; |
| free(cur); |
| cur = next; |
| } |
| free(bld); |
| } |
| |
| void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) |
| { |
| bld->forceNewInstance = 1; |
| } |
| |
| void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) |
| { |
| bld->nn = nn; |
| } |
| |
| void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) |
| { |
| bld->port = port; |
| } |
| |
| void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) |
| { |
| bld->userName = userName; |
| } |
| |
| void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, |
| const char *kerbTicketCachePath) |
| { |
| bld->kerbTicketCachePath = kerbTicketCachePath; |
| } |
| |
| hdfsFS hdfsConnect(const char *host, tPort port) |
| { |
| struct hdfsBuilder *bld = hdfsNewBuilder(); |
| if (!bld) |
| return NULL; |
| hdfsBuilderSetNameNode(bld, host); |
| hdfsBuilderSetNameNodePort(bld, port); |
| return hdfsBuilderConnect(bld); |
| } |
| |
| /** Always return a new FileSystem handle */ |
| hdfsFS hdfsConnectNewInstance(const char *host, tPort port) |
| { |
| struct hdfsBuilder *bld = hdfsNewBuilder(); |
| if (!bld) |
| return NULL; |
| hdfsBuilderSetNameNode(bld, host); |
| hdfsBuilderSetNameNodePort(bld, port); |
| hdfsBuilderSetForceNewInstance(bld); |
| return hdfsBuilderConnect(bld); |
| } |
| |
| hdfsFS hdfsConnectAsUser(const char *host, tPort port, const char *user) |
| { |
| struct hdfsBuilder *bld = hdfsNewBuilder(); |
| if (!bld) |
| return NULL; |
| hdfsBuilderSetNameNode(bld, host); |
| hdfsBuilderSetNameNodePort(bld, port); |
| hdfsBuilderSetUserName(bld, user); |
| return hdfsBuilderConnect(bld); |
| } |
| |
| /** Always return a new FileSystem handle */ |
| hdfsFS hdfsConnectAsUserNewInstance(const char *host, tPort port, |
| const char *user) |
| { |
| struct hdfsBuilder *bld = hdfsNewBuilder(); |
| if (!bld) |
| return NULL; |
| hdfsBuilderSetNameNode(bld, host); |
| hdfsBuilderSetNameNodePort(bld, port); |
| hdfsBuilderSetForceNewInstance(bld); |
| hdfsBuilderSetUserName(bld, user); |
| return hdfsBuilderConnect(bld); |
| } |
| |
| |
| /** |
| * Calculate the effective URI to use, given a builder configuration. |
| * |
| * If there is not already a URI scheme, we prepend 'hdfs://'. |
| * |
| * If there is not already a port specified, and a port was given to the |
| * builder, we suffix that port. If there is a port specified but also one in |
| * the URI, that is an error. |
| * |
| * @param bld The hdfs builder object |
| * @param uri (out param) dynamically allocated string representing the |
| * effective URI |
| * |
| * @return 0 on success; error code otherwise |
| */ |
| static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri) |
| { |
| const char *scheme; |
| char suffix[64]; |
| const char *lastColon; |
| char *u; |
| size_t uriLen; |
| |
| if (!bld->nn) |
| return EINVAL; |
| scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://"; |
| if (bld->port == 0) { |
| suffix[0] = '\0'; |
| } else { |
| lastColon = strrchr(bld->nn, ':'); |
| if (lastColon && (strspn(lastColon + 1, "0123456789") == |
| strlen(lastColon + 1))) { |
| fprintf(stderr, "port %d was given, but URI '%s' already " |
| "contains a port!\n", bld->port, bld->nn); |
| return EINVAL; |
| } |
| snprintf(suffix, sizeof(suffix), ":%d", bld->port); |
| } |
| |
| uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix); |
| u = malloc((uriLen + 1) * (sizeof(char))); |
| if (!u) { |
| fprintf(stderr, "calcEffectiveURI: out of memory"); |
| return ENOMEM; |
| } |
| snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix); |
| *uri = u; |
| return 0; |
| } |
| |
| static const char *maybeNull(const char *str) |
| { |
| return str ? str : "(NULL)"; |
| } |
| |
| static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, |
| char *buf, size_t bufLen) |
| { |
| snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, " |
| "kerbTicketCachePath=%s, userName=%s", |
| bld->forceNewInstance, maybeNull(bld->nn), bld->port, |
| maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); |
| return buf; |
| } |
| |
| hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) |
| { |
| JNIEnv *env = 0; |
| jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL; |
| jstring jURIString = NULL, jUserString = NULL; |
| jvalue jVal; |
| jthrowable jthr = NULL; |
| char *cURI = 0, buf[512]; |
| int ret; |
| jobject jRet = NULL; |
| struct hdfsBuilderConfOpt *opt; |
| |
| //Get the JNIEnv* corresponding to current thread |
| env = getJNIEnv(); |
| if (env == NULL) { |
| ret = EINTERNAL; |
| goto done; |
| } |
| |
| // jConfiguration = new Configuration(); |
| jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| // set configuration values |
| for (opt = bld->opts; opt; opt = opt->next) { |
| jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'", |
| hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val); |
| goto done; |
| } |
| } |
| |
| //Check what type of FileSystem the caller wants... |
| if (bld->nn == NULL) { |
| // Get a local filesystem. |
| if (bld->forceNewInstance) { |
| // fs = FileSytem#newInstanceLocal(conf); |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, |
| "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF), |
| JPARAM(HADOOP_LOCALFS)), jConfiguration); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jFS = jVal.l; |
| } else { |
| // fs = FileSytem#getLocal(conf); |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal", |
| JMETHOD1(JPARAM(HADOOP_CONF), |
| JPARAM(HADOOP_LOCALFS)), |
| jConfiguration); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jFS = jVal.l; |
| } |
| } else { |
| if (!strcmp(bld->nn, "default")) { |
| // jURI = FileSystem.getDefaultUri(conf) |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, |
| "getDefaultUri", |
| "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;", |
| jConfiguration); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jURI = jVal.l; |
| } else { |
| // fs = FileSystem#get(URI, conf, ugi); |
| ret = calcEffectiveURI(bld, &cURI); |
| if (ret) |
| goto done; |
| jthr = newJavaStr(env, cURI, &jURIString); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI, |
| "create", "(Ljava/lang/String;)Ljava/net/URI;", |
| jURIString); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jURI = jVal.l; |
| } |
| |
| if (bld->kerbTicketCachePath) { |
| jthr = hadoopConfSetStr(env, jConfiguration, |
| KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| } |
| jthr = newJavaStr(env, bld->userName, &jUserString); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| if (bld->forceNewInstance) { |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, |
| "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI), |
| JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), |
| JPARAM(HADOOP_FS)), |
| jURI, jConfiguration, jUserString); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jFS = jVal.l; |
| } else { |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get", |
| JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF), |
| JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), |
| jURI, jConfiguration, jUserString); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| jFS = jVal.l; |
| } |
| } |
| jRet = (*env)->NewGlobalRef(env, jFS); |
| if (!jRet) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsBuilderConnect(%s)", |
| hdfsBuilderToStr(bld, buf, sizeof(buf))); |
| goto done; |
| } |
| ret = 0; |
| |
| done: |
| // Release unnecessary local references |
| destroyLocalReference(env, jConfiguration); |
| destroyLocalReference(env, jFS); |
| destroyLocalReference(env, jURI); |
| destroyLocalReference(env, jCachePath); |
| destroyLocalReference(env, jURIString); |
| destroyLocalReference(env, jUserString); |
| free(cURI); |
| hdfsFreeBuilder(bld); |
| |
| if (ret) { |
| errno = ret; |
| return NULL; |
| } |
| return (hdfsFS)jRet; |
| } |
| |
| int hdfsDisconnect(hdfsFS fs) |
| { |
| // JAVA EQUIVALENT: |
| // fs.close() |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| int ret; |
| jobject jFS; |
| jthrowable jthr; |
| |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Parameters |
| jFS = (jobject)fs; |
| |
| //Sanity check |
| if (fs == NULL) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, |
| "close", "()V"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsDisconnect: FileSystem#close"); |
| } else { |
| ret = 0; |
| } |
| (*env)->DeleteGlobalRef(env, jFS); |
| if (ret) { |
| errno = ret; |
| return -1; |
| } |
| return 0; |
| } |
| |
| /** |
| * Get the default block size of a FileSystem object. |
| * |
| * @param env The Java env |
| * @param jFS The FileSystem object |
| * @param jPath The path to find the default blocksize at |
| * @param out (out param) the default block size |
| * |
| * @return NULL on success; or the exception |
| */ |
| static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS, |
| jobject jPath, jlong *out) |
| { |
| jthrowable jthr; |
| jvalue jVal; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"), jPath); |
| if (jthr) |
| return jthr; |
| *out = jVal.j; |
| return NULL; |
| } |
| |
| hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, |
| int bufferSize, short replication, tSize blockSize) |
| { |
| /* |
| JAVA EQUIVALENT: |
| File f = new File(path); |
| FSData{Input|Output}Stream f{is|os} = fs.create(f); |
| return f{is|os}; |
| */ |
| int accmode = flags & O_ACCMODE; |
| jstring jStrBufferSize = NULL, jStrReplication = NULL; |
| jobject jConfiguration = NULL, jPath = NULL, jFile = NULL; |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jvalue jVal; |
| hdfsFile file = NULL; |
| int ret; |
| jint jBufferSize = bufferSize; |
| jshort jReplication = replication; |
| |
| /* The hadoop java api/signature */ |
| const char *method = NULL; |
| const char *signature = NULL; |
| |
| /* Get the JNIEnv* corresponding to current thread */ |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return NULL; |
| } |
| |
| |
| if (accmode == O_RDONLY || accmode == O_WRONLY) { |
| /* yay */ |
| } else if (accmode == O_RDWR) { |
| fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n"); |
| errno = ENOTSUP; |
| return NULL; |
| } else { |
| fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode); |
| errno = EINVAL; |
| return NULL; |
| } |
| |
| if ((flags & O_CREAT) && (flags & O_EXCL)) { |
| fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); |
| } |
| |
| if (accmode == O_RDONLY) { |
| method = "open"; |
| signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM)); |
| } else if (flags & O_APPEND) { |
| method = "append"; |
| signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM)); |
| } else { |
| method = "create"; |
| signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM)); |
| } |
| |
| /* Create an object of org.apache.hadoop.fs.Path */ |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsOpenFile(%s): constructNewObjectOfPath", path); |
| goto done; |
| } |
| |
| /* Get the Configuration object from the FileSystem object */ |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "getConf", JMETHOD1("", JPARAM(HADOOP_CONF))); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsOpenFile(%s): FileSystem#getConf", path); |
| goto done; |
| } |
| jConfiguration = jVal.l; |
| |
| jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size"); |
| if (!jStrBufferSize) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM"); |
| goto done; |
| } |
| jStrReplication = (*env)->NewStringUTF(env, "dfs.replication"); |
| if (!jStrReplication) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM"); |
| goto done; |
| } |
| |
| if (!bufferSize) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, |
| HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", |
| jStrBufferSize, 4096); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK, |
| "hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)", |
| path); |
| goto done; |
| } |
| jBufferSize = jVal.i; |
| } |
| |
| if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) { |
| if (!replication) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, |
| HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", |
| jStrReplication, 1); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsOpenFile(%s): Configuration#getInt(dfs.replication)", |
| path); |
| goto done; |
| } |
| jReplication = (jshort)jVal.i; |
| } |
| } |
| |
| /* Create and return either the FSDataInputStream or |
| FSDataOutputStream references jobject jStream */ |
| |
| // READ? |
| if (accmode == O_RDONLY) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| method, signature, jPath, jBufferSize); |
| } else if ((accmode == O_WRONLY) && (flags & O_APPEND)) { |
| // WRITE/APPEND? |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| method, signature, jPath); |
| } else { |
| // WRITE/CREATE |
| jboolean jOverWrite = 1; |
| jlong jBlockSize = blockSize; |
| |
| if (jBlockSize == 0) { |
| jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize); |
| if (jthr) { |
| ret = EIO; |
| goto done; |
| } |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| method, signature, jPath, jOverWrite, |
| jBufferSize, jReplication, jBlockSize); |
| } |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature); |
| goto done; |
| } |
| jFile = jVal.l; |
| |
| file = calloc(1, sizeof(struct hdfsFile_internal)); |
| if (!file) { |
| fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path); |
| ret = ENOMEM; |
| goto done; |
| } |
| file->file = (*env)->NewGlobalRef(env, jFile); |
| if (!file->file) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsOpenFile(%s): NewGlobalRef", path); |
| goto done; |
| } |
| file->type = (((flags & O_WRONLY) == 0) ? HDFS_STREAM_INPUT : |
| HDFS_STREAM_OUTPUT); |
| file->flags = 0; |
| |
| if ((flags & O_WRONLY) == 0) { |
| // Try a test read to see if we can do direct reads |
| char buf; |
| if (readDirect(fs, file, &buf, 0) == 0) { |
| // Success - 0-byte read should return 0 |
| file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; |
| } else if (errno != ENOTSUP) { |
| // Unexpected error. Clear it, don't set the direct flag. |
| fprintf(stderr, |
| "hdfsOpenFile(%s): WARN: Unexpected error %d when testing " |
| "for direct read compatibility\n", path, errno); |
| } |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jStrBufferSize); |
| destroyLocalReference(env, jStrReplication); |
| destroyLocalReference(env, jConfiguration); |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jFile); |
| if (ret) { |
| if (file) { |
| if (file->file) { |
| (*env)->DeleteGlobalRef(env, file->file); |
| } |
| free(file); |
| } |
| errno = ret; |
| return NULL; |
| } |
| return file; |
| } |
| |
| int hdfsCloseFile(hdfsFS fs, hdfsFile file) |
| { |
| int ret; |
| // JAVA EQUIVALENT: |
| // file.close |
| |
| //The interface whose 'close' method to be called |
| const char *interface; |
| const char *interfaceShortName; |
| |
| //Caught exception |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!file || file->type == HDFS_STREAM_UNINITIALIZED) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| interface = (file->type == HDFS_STREAM_INPUT) ? |
| HADOOP_ISTRM : HADOOP_OSTRM; |
| |
| jthr = invokeMethod(env, NULL, INSTANCE, file->file, interface, |
| "close", "()V"); |
| if (jthr) { |
| interfaceShortName = (file->type == HDFS_STREAM_INPUT) ? |
| "FSDataInputStream" : "FSDataOutputStream"; |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "%s#close", interfaceShortName); |
| } else { |
| ret = 0; |
| } |
| |
| //De-allocate memory |
| (*env)->DeleteGlobalRef(env, file->file); |
| free(file); |
| |
| if (ret) { |
| errno = ret; |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsExists(hdfsFS fs, const char *path) |
| { |
| JNIEnv *env = getJNIEnv(); |
| jobject jPath; |
| jvalue jVal; |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| if (path == NULL) { |
| errno = EINVAL; |
| return -1; |
| } |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsExists: constructNewObjectOfPath"); |
| return -1; |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsExists: invokeMethod(%s)", |
| JMETHOD1(JPARAM(HADOOP_PATH), "Z")); |
| return -1; |
| } |
| if (jVal.z) { |
| return 0; |
| } else { |
| errno = ENOENT; |
| return -1; |
| } |
| } |
| |
| // Checks input file for readiness for reading. |
| static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, |
| jobject* jInputStream) |
| { |
| *jInputStream = (jobject)(f ? f->file : NULL); |
| |
| //Sanity check |
| if (!f || f->type == HDFS_STREAM_UNINITIALIZED) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| //Error checking... make sure that this file is 'readable' |
| if (f->type != HDFS_STREAM_INPUT) { |
| fprintf(stderr, "Cannot read from a non-InputStream object!\n"); |
| errno = EINVAL; |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) |
| { |
| jobject jInputStream; |
| jbyteArray jbRarray; |
| jint noReadBytes = length; |
| jvalue jVal; |
| jthrowable jthr; |
| JNIEnv* env; |
| |
| if (length == 0) { |
| return 0; |
| } else if (length < 0) { |
| errno = EINVAL; |
| return -1; |
| } |
| if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { |
| return readDirect(fs, f, buffer, length); |
| } |
| |
| // JAVA EQUIVALENT: |
| // byte [] bR = new byte[length]; |
| // fis.read(bR); |
| |
| //Get the JNIEnv* corresponding to current thread |
| env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Parameters |
| if (readPrepare(env, fs, f, &jInputStream) == -1) { |
| return -1; |
| } |
| |
| //Read the requisite bytes |
| jbRarray = (*env)->NewByteArray(env, length); |
| if (!jbRarray) { |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsRead: NewByteArray"); |
| return -1; |
| } |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM, |
| "read", "([B)I", jbRarray); |
| if (jthr) { |
| destroyLocalReference(env, jbRarray); |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsRead: FSDataInputStream#read"); |
| return -1; |
| } |
| if (jVal.i < 0) { |
| // EOF |
| destroyLocalReference(env, jbRarray); |
| return 0; |
| } else if (jVal.i == 0) { |
| destroyLocalReference(env, jbRarray); |
| errno = EINTR; |
| return -1; |
| } |
| (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); |
| destroyLocalReference(env, jbRarray); |
| if ((*env)->ExceptionCheck(env)) { |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsRead: GetByteArrayRegion"); |
| return -1; |
| } |
| return jVal.i; |
| } |
| |
| // Reads using the read(ByteBuffer) API, which does fewer copies |
| tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) |
| { |
| // JAVA EQUIVALENT: |
| // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer |
| // fis.read(bbuffer); |
| |
| jobject jInputStream; |
| jvalue jVal; |
| jthrowable jthr; |
| jobject bb; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| if (readPrepare(env, fs, f, &jInputStream) == -1) { |
| return -1; |
| } |
| |
| //Read the requisite bytes |
| bb = (*env)->NewDirectByteBuffer(env, buffer, length); |
| if (bb == NULL) { |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "readDirect: NewDirectByteBuffer"); |
| return -1; |
| } |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, |
| HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb); |
| destroyLocalReference(env, bb); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "readDirect: FSDataInputStream#read"); |
| return -1; |
| } |
| return (jVal.i < 0) ? 0 : jVal.i; |
| } |
| |
| tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, |
| void* buffer, tSize length) |
| { |
| JNIEnv* env; |
| jbyteArray jbRarray; |
| jvalue jVal; |
| jthrowable jthr; |
| |
| if (length == 0) { |
| return 0; |
| } else if (length < 0) { |
| errno = EINVAL; |
| return -1; |
| } |
| if (!f || f->type == HDFS_STREAM_UNINITIALIZED) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Error checking... make sure that this file is 'readable' |
| if (f->type != HDFS_STREAM_INPUT) { |
| fprintf(stderr, "Cannot read from a non-InputStream object!\n"); |
| errno = EINVAL; |
| return -1; |
| } |
| |
| // JAVA EQUIVALENT: |
| // byte [] bR = new byte[length]; |
| // fis.read(pos, bR, 0, length); |
| jbRarray = (*env)->NewByteArray(env, length); |
| if (!jbRarray) { |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsPread: NewByteArray"); |
| return -1; |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, f->file, HADOOP_ISTRM, |
| "read", "(J[BII)I", position, jbRarray, 0, length); |
| if (jthr) { |
| destroyLocalReference(env, jbRarray); |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsPread: FSDataInputStream#read"); |
| return -1; |
| } |
| if (jVal.i < 0) { |
| // EOF |
| destroyLocalReference(env, jbRarray); |
| return 0; |
| } else if (jVal.i == 0) { |
| destroyLocalReference(env, jbRarray); |
| errno = EINTR; |
| return -1; |
| } |
| (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer); |
| destroyLocalReference(env, jbRarray); |
| if ((*env)->ExceptionCheck(env)) { |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsPread: GetByteArrayRegion"); |
| return -1; |
| } |
| return jVal.i; |
| } |
| |
| tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) |
| { |
| // JAVA EQUIVALENT |
| // byte b[] = str.getBytes(); |
| // fso.write(b); |
| |
| jobject jOutputStream; |
| jbyteArray jbWarray; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type == HDFS_STREAM_UNINITIALIZED) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| jOutputStream = f->file; |
| |
| if (length < 0) { |
| errno = EINVAL; |
| return -1; |
| } |
| |
| //Error checking... make sure that this file is 'writable' |
| if (f->type != HDFS_STREAM_OUTPUT) { |
| fprintf(stderr, "Cannot write into a non-OutputStream object!\n"); |
| errno = EINVAL; |
| return -1; |
| } |
| |
| if (length < 0) { |
| errno = EINVAL; |
| return -1; |
| } |
| if (length == 0) { |
| return 0; |
| } |
| //Write the requisite bytes into the file |
| jbWarray = (*env)->NewByteArray(env, length); |
| if (!jbWarray) { |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsWrite: NewByteArray"); |
| return -1; |
| } |
| (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer); |
| if ((*env)->ExceptionCheck(env)) { |
| destroyLocalReference(env, jbWarray); |
| errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsWrite(length = %d): SetByteArrayRegion", length); |
| return -1; |
| } |
| jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, |
| HADOOP_OSTRM, "write", "([B)V", jbWarray); |
| destroyLocalReference(env, jbWarray); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsWrite: FSDataOutputStream#write"); |
| return -1; |
| } |
| // Unlike most Java streams, FSDataOutputStream never does partial writes. |
| // If we succeeded, all the data was written. |
| return length; |
| } |
| |
| int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos) |
| { |
| // JAVA EQUIVALENT |
| // fis.seek(pos); |
| |
| jobject jInputStream; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type != HDFS_STREAM_INPUT) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| jInputStream = f->file; |
| jthr = invokeMethod(env, NULL, INSTANCE, jInputStream, |
| HADOOP_ISTRM, "seek", "(J)V", desiredPos); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsSeek(desiredPos=%" PRId64 ")" |
| ": FSDataInputStream#seek", desiredPos); |
| return -1; |
| } |
| return 0; |
| } |
| |
| |
| |
| tOffset hdfsTell(hdfsFS fs, hdfsFile f) |
| { |
| // JAVA EQUIVALENT |
| // pos = f.getPos(); |
| |
| jobject jStream; |
| const char *interface; |
| jvalue jVal; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type == HDFS_STREAM_UNINITIALIZED) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| //Parameters |
| jStream = f->file; |
| interface = (f->type == HDFS_STREAM_INPUT) ? |
| HADOOP_ISTRM : HADOOP_OSTRM; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStream, |
| interface, "getPos", "()J"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsTell: %s#getPos", |
| ((f->type == HDFS_STREAM_INPUT) ? "FSDataInputStream" : |
| "FSDataOutputStream")); |
| return -1; |
| } |
| return jVal.j; |
| } |
| |
| int hdfsFlush(hdfsFS fs, hdfsFile f) |
| { |
| // JAVA EQUIVALENT |
| // fos.flush(); |
| |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type != HDFS_STREAM_OUTPUT) { |
| errno = EBADF; |
| return -1; |
| } |
| jthr = invokeMethod(env, NULL, INSTANCE, f->file, |
| HADOOP_OSTRM, "flush", "()V"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsFlush: FSDataInputStream#flush"); |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsHFlush(hdfsFS fs, hdfsFile f) |
| { |
| jobject jOutputStream; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type != HDFS_STREAM_OUTPUT) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| jOutputStream = f->file; |
| jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, |
| HADOOP_OSTRM, "hflush", "()V"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsHFlush: FSDataOutputStream#hflush"); |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsHSync(hdfsFS fs, hdfsFile f) |
| { |
| jobject jOutputStream; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type != HDFS_STREAM_OUTPUT) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| jOutputStream = f->file; |
| jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, |
| HADOOP_OSTRM, "hsync", "()V"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsHSync: FSDataOutputStream#hsync"); |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsAvailable(hdfsFS fs, hdfsFile f) |
| { |
| // JAVA EQUIVALENT |
| // fis.available(); |
| |
| jobject jInputStream; |
| jvalue jVal; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Sanity check |
| if (!f || f->type != HDFS_STREAM_INPUT) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| //Parameters |
| jInputStream = f->file; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, |
| HADOOP_ISTRM, "available", "()I"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsAvailable: FSDataInputStream#available"); |
| return -1; |
| } |
| return jVal.i; |
| } |
| |
| static int hdfsCopyImpl(hdfsFS srcFS, const char *src, hdfsFS dstFS, |
| const char *dst, jboolean deleteSource) |
| { |
| //JAVA EQUIVALENT |
| // FileUtil#copy(srcFS, srcPath, dstFS, dstPath, |
| // deleteSource = false, conf) |
| |
| //Parameters |
| jobject jSrcFS = (jobject)srcFS; |
| jobject jDstFS = (jobject)dstFS; |
| jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL; |
| jthrowable jthr; |
| jvalue jVal; |
| int ret; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| jthr = constructNewObjectOfPath(env, src, &jSrcPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src); |
| goto done; |
| } |
| jthr = constructNewObjectOfPath(env, dst, &jDstPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst); |
| goto done; |
| } |
| |
| //Create the org.apache.hadoop.conf.Configuration object |
| jthr = constructNewObjectOfClass(env, &jConfiguration, |
| HADOOP_CONF, "()V"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsCopyImpl: Configuration constructor"); |
| goto done; |
| } |
| |
| //FileUtil#copy |
| jthr = invokeMethod(env, &jVal, STATIC, |
| NULL, "org/apache/hadoop/fs/FileUtil", "copy", |
| "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;" |
| "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;" |
| "ZLorg/apache/hadoop/conf/Configuration;)Z", |
| jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource, |
| jConfiguration); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): " |
| "FileUtil#copy", src, dst, deleteSource); |
| goto done; |
| } |
| if (!jVal.z) { |
| ret = EIO; |
| goto done; |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jConfiguration); |
| destroyLocalReference(env, jSrcPath); |
| destroyLocalReference(env, jDstPath); |
| |
| if (ret) { |
| errno = ret; |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsCopy(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst) |
| { |
| return hdfsCopyImpl(srcFS, src, dstFS, dst, 0); |
| } |
| |
| int hdfsMove(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst) |
| { |
| return hdfsCopyImpl(srcFS, src, dstFS, dst, 1); |
| } |
| |
| int hdfsDelete(hdfsFS fs, const char *path, int recursive) |
| { |
| // JAVA EQUIVALENT: |
| // Path p = new Path(path); |
| // bool retval = fs.delete(p, recursive); |
| |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jobject jPath; |
| jvalue jVal; |
| jboolean jRecursive; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsDelete(path=%s): constructNewObjectOfPath", path); |
| return -1; |
| } |
| jRecursive = recursive ? JNI_TRUE : JNI_FALSE; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z", |
| jPath, jRecursive); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsDelete(path=%s, recursive=%d): " |
| "FileSystem#delete", path, recursive); |
| return -1; |
| } |
| if (!jVal.z) { |
| errno = EIO; |
| return -1; |
| } |
| return 0; |
| } |
| |
| |
| |
| int hdfsRename(hdfsFS fs, const char *oldPath, const char *newPath) |
| { |
| // JAVA EQUIVALENT: |
| // Path old = new Path(oldPath); |
| // Path new = new Path(newPath); |
| // fs.rename(old, new); |
| |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jobject jOldPath = NULL, jNewPath = NULL; |
| int ret = -1; |
| jvalue jVal; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| jthr = constructNewObjectOfPath(env, oldPath, &jOldPath ); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsRename: constructNewObjectOfPath(%s)", oldPath); |
| goto done; |
| } |
| jthr = constructNewObjectOfPath(env, newPath, &jNewPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsRename: constructNewObjectOfPath(%s)", newPath); |
| goto done; |
| } |
| |
| // Rename the file |
| // TODO: use rename2 here? (See HDFS-3592) |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "rename", |
| JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"), |
| jOldPath, jNewPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename", |
| oldPath, newPath); |
| goto done; |
| } |
| if (!jVal.z) { |
| errno = EIO; |
| goto done; |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jOldPath); |
| destroyLocalReference(env, jNewPath); |
| return ret; |
| } |
| |
| |
| |
| char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) |
| { |
| // JAVA EQUIVALENT: |
| // Path p = fs.getWorkingDirectory(); |
| // return p.toString() |
| |
| jobject jPath = NULL; |
| jstring jPathString = NULL; |
| jobject jFS = (jobject)fs; |
| jvalue jVal; |
| jthrowable jthr; |
| int ret; |
| const char *jPathChars = NULL; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return NULL; |
| } |
| |
| //FileSystem#getWorkingDirectory() |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, |
| HADOOP_FS, "getWorkingDirectory", |
| "()Lorg/apache/hadoop/fs/Path;"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory"); |
| goto done; |
| } |
| jPath = jVal.l; |
| if (!jPath) { |
| fprintf(stderr, "hdfsGetWorkingDirectory: " |
| "FileSystem#getWorkingDirectory returned NULL"); |
| ret = -EIO; |
| goto done; |
| } |
| |
| //Path#toString() |
| jthr = invokeMethod(env, &jVal, INSTANCE, jPath, |
| "org/apache/hadoop/fs/Path", "toString", |
| "()Ljava/lang/String;"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetWorkingDirectory: Path#toString"); |
| goto done; |
| } |
| jPathString = jVal.l; |
| jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL); |
| if (!jPathChars) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsGetWorkingDirectory: GetStringUTFChars"); |
| goto done; |
| } |
| |
| //Copy to user-provided buffer |
| ret = snprintf(buffer, bufferSize, "%s", jPathChars); |
| if (ret >= bufferSize) { |
| ret = ENAMETOOLONG; |
| goto done; |
| } |
| ret = 0; |
| |
| done: |
| if (jPathChars) { |
| (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars); |
| } |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jPathString); |
| |
| if (ret) { |
| errno = ret; |
| return NULL; |
| } |
| return buffer; |
| } |
| |
| |
| |
| int hdfsSetWorkingDirectory(hdfsFS fs, const char *path) |
| { |
| // JAVA EQUIVALENT: |
| // fs.setWorkingDirectory(Path(path)); |
| |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jobject jPath; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsSetWorkingDirectory(%s): constructNewObjectOfPath", |
| path); |
| return -1; |
| } |
| |
| //FileSystem#setWorkingDirectory() |
| jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, |
| "setWorkingDirectory", |
| "(Lorg/apache/hadoop/fs/Path;)V", jPath); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT, |
| "hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory", |
| path); |
| return -1; |
| } |
| return 0; |
| } |
| |
| |
| |
| int hdfsCreateDirectory(hdfsFS fs, const char *path) |
| { |
| // JAVA EQUIVALENT: |
| // fs.mkdirs(new Path(path)); |
| |
| jobject jFS = (jobject)fs; |
| jobject jPath; |
| jthrowable jthr; |
| jvalue jVal; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsCreateDirectory(%s): constructNewObjectOfPath", path); |
| return -1; |
| } |
| |
| //Create the directory |
| jVal.z = 0; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z", |
| jPath); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY, |
| "hdfsCreateDirectory(%s): FileSystem#mkdirs", path); |
| return -1; |
| } |
| if (!jVal.z) { |
| // It's unclear under exactly which conditions FileSystem#mkdirs |
| // is supposed to return false (as opposed to throwing an exception.) |
| // It seems like the current code never actually returns false. |
| // So we're going to translate this to EIO, since there seems to be |
| // nothing more specific we can do with it. |
| errno = EIO; |
| return -1; |
| } |
| return 0; |
| } |
| |
| |
| int hdfsSetReplication(hdfsFS fs, const char *path, int16_t replication) |
| { |
| // JAVA EQUIVALENT: |
| // fs.setReplication(new Path(path), replication); |
| |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jobject jPath; |
| jvalue jVal; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsSetReplication(path=%s): constructNewObjectOfPath", path); |
| return -1; |
| } |
| |
| //Create the directory |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z", |
| jPath, replication); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsSetReplication(path=%s, replication=%d): " |
| "FileSystem#setReplication", path, replication); |
| return -1; |
| } |
| if (!jVal.z) { |
| // setReplication returns false "if file does not exist or is a |
| // directory." So the nearest translation to that is ENOENT. |
| errno = ENOENT; |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| int hdfsChown(hdfsFS fs, const char *path, const char *owner, const char *group) |
| { |
| // JAVA EQUIVALENT: |
| // fs.setOwner(path, owner, group) |
| |
| jobject jFS = (jobject)fs; |
| jobject jPath = NULL; |
| jstring jOwner = NULL, jGroup = NULL; |
| jthrowable jthr; |
| int ret; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| if (owner == NULL && group == NULL) { |
| return 0; |
| } |
| |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsChown(path=%s): constructNewObjectOfPath", path); |
| goto done; |
| } |
| |
| jthr = newJavaStr(env, owner, &jOwner); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsChown(path=%s): newJavaStr(%s)", path, owner); |
| goto done; |
| } |
| jthr = newJavaStr(env, group, &jGroup); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsChown(path=%s): newJavaStr(%s)", path, group); |
| goto done; |
| } |
| |
| //Create the directory |
| jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, |
| "setOwner", JMETHOD3(JPARAM(HADOOP_PATH), |
| JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID), |
| jPath, jOwner, jGroup); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_UNRESOLVED_LINK, |
| "hdfsChown(path=%s, owner=%s, group=%s): " |
| "FileSystem#setOwner", path, owner, group); |
| goto done; |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jOwner); |
| destroyLocalReference(env, jGroup); |
| |
| if (ret) { |
| errno = ret; |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsChmod(hdfsFS fs, const char *path, short mode) |
| { |
| int ret; |
| // JAVA EQUIVALENT: |
| // fs.setPermission(path, FsPermission) |
| |
| jthrowable jthr; |
| jobject jPath = NULL, jPermObj = NULL; |
| jobject jFS = (jobject)fs; |
| jshort jmode = mode; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| // construct jPerm = FsPermission.createImmutable(short mode); |
| jthr = constructNewObjectOfClass(env, &jPermObj, |
| HADOOP_FSPERM,"(S)V",jmode); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "constructNewObjectOfClass(%s)", HADOOP_FSPERM); |
| return -1; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsChmod(%s): constructNewObjectOfPath", path); |
| goto done; |
| } |
| |
| //Create the directory |
| jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, |
| "setPermission", |
| JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID), |
| jPath, jPermObj); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_UNRESOLVED_LINK, |
| "hdfsChmod(%s): FileSystem#setPermission", path); |
| goto done; |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jPermObj); |
| |
| if (ret) { |
| errno = ret; |
| return -1; |
| } |
| return 0; |
| } |
| |
| int hdfsUtime(hdfsFS fs, const char *path, tTime mtime, tTime atime) |
| { |
| // JAVA EQUIVALENT: |
| // fs.setTimes(src, mtime, atime) |
| |
| jthrowable jthr; |
| jobject jFS = (jobject)fs; |
| jobject jPath; |
| static const tTime NO_CHANGE = -1; |
| jlong jmtime, jatime; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsUtime(path=%s): constructNewObjectOfPath", path); |
| return -1; |
| } |
| |
| jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000); |
| jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000); |
| |
| jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, |
| "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID), |
| jPath, jmtime, jatime); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_UNRESOLVED_LINK, |
| "hdfsUtime(path=%s): FileSystem#setTimes", path); |
| return -1; |
| } |
| return 0; |
| } |
| |
| /** |
| * Zero-copy options. |
| * |
| * We cache the EnumSet of ReadOptions which has to be passed into every |
| * readZero call, to avoid reconstructing it each time. This cache is cleared |
| * whenever an element changes. |
| */ |
| struct hadoopRzOptions |
| { |
| JNIEnv *env; |
| int skipChecksums; |
| jobject byteBufferPool; |
| jobject cachedEnumSet; |
| }; |
| |
| struct hadoopRzOptions *hadoopRzOptionsAlloc(void) |
| { |
| struct hadoopRzOptions *opts; |
| JNIEnv *env; |
| |
| env = getJNIEnv(); |
| if (!env) { |
| // Check to make sure the JNI environment is set up properly. |
| errno = EINTERNAL; |
| return NULL; |
| } |
| opts = calloc(1, sizeof(struct hadoopRzOptions)); |
| if (!opts) { |
| errno = ENOMEM; |
| return NULL; |
| } |
| return opts; |
| } |
| |
| static void hadoopRzOptionsClearCached(JNIEnv *env, |
| struct hadoopRzOptions *opts) |
| { |
| if (!opts->cachedEnumSet) { |
| return; |
| } |
| (*env)->DeleteGlobalRef(env, opts->cachedEnumSet); |
| opts->cachedEnumSet = NULL; |
| } |
| |
| int hadoopRzOptionsSetSkipChecksum( |
| struct hadoopRzOptions *opts, int skip) |
| { |
| JNIEnv *env; |
| env = getJNIEnv(); |
| if (!env) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| hadoopRzOptionsClearCached(env, opts); |
| opts->skipChecksums = !!skip; |
| return 0; |
| } |
| |
| int hadoopRzOptionsSetByteBufferPool( |
| struct hadoopRzOptions *opts, const char *className) |
| { |
| JNIEnv *env; |
| jthrowable jthr; |
| jobject byteBufferPool = NULL; |
| |
| env = getJNIEnv(); |
| if (!env) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| if (className) { |
| // Note: we don't have to call hadoopRzOptionsClearCached in this |
| // function, since the ByteBufferPool is passed separately from the |
| // EnumSet of ReadOptions. |
| |
| jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopRzOptionsSetByteBufferPool(className=%s): ", className); |
| errno = EINVAL; |
| return -1; |
| } |
| } |
| if (opts->byteBufferPool) { |
| // Delete any previous ByteBufferPool we had. |
| (*env)->DeleteGlobalRef(env, opts->byteBufferPool); |
| } |
| opts->byteBufferPool = byteBufferPool; |
| return 0; |
| } |
| |
| void hadoopRzOptionsFree(struct hadoopRzOptions *opts) |
| { |
| JNIEnv *env; |
| env = getJNIEnv(); |
| if (!env) { |
| return; |
| } |
| hadoopRzOptionsClearCached(env, opts); |
| if (opts->byteBufferPool) { |
| (*env)->DeleteGlobalRef(env, opts->byteBufferPool); |
| opts->byteBufferPool = NULL; |
| } |
| free(opts); |
| } |
| |
| struct hadoopRzBuffer |
| { |
| jobject byteBuffer; |
| uint8_t *ptr; |
| int32_t length; |
| int direct; |
| }; |
| |
| static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env, |
| struct hadoopRzOptions *opts, jobject *enumSet) |
| { |
| jthrowable jthr = NULL; |
| jobject enumInst = NULL, enumSetObj = NULL; |
| jvalue jVal; |
| |
| if (opts->cachedEnumSet) { |
| // If we cached the value, return it now. |
| *enumSet = opts->cachedEnumSet; |
| goto done; |
| } |
| if (opts->skipChecksums) { |
| jthr = fetchEnumInstance(env, READ_OPTION, |
| "SKIP_CHECKSUMS", &enumInst); |
| if (jthr) { |
| goto done; |
| } |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, |
| "java/util/EnumSet", "of", |
| "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst); |
| if (jthr) { |
| goto done; |
| } |
| enumSetObj = jVal.l; |
| } else { |
| jclass clazz = (*env)->FindClass(env, READ_OPTION); |
| if (!clazz) { |
| jthr = newRuntimeError(env, "failed " |
| "to find class for %s", READ_OPTION); |
| goto done; |
| } |
| jthr = invokeMethod(env, &jVal, STATIC, NULL, |
| "java/util/EnumSet", "noneOf", |
| "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz); |
| enumSetObj = jVal.l; |
| } |
| // create global ref |
| opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj); |
| if (!opts->cachedEnumSet) { |
| jthr = getPendingExceptionAndClear(env); |
| goto done; |
| } |
| *enumSet = opts->cachedEnumSet; |
| jthr = NULL; |
| done: |
| (*env)->DeleteLocalRef(env, enumInst); |
| (*env)->DeleteLocalRef(env, enumSetObj); |
| return jthr; |
| } |
| |
| static int hadoopReadZeroExtractBuffer(JNIEnv *env, |
| const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer) |
| { |
| int ret; |
| jthrowable jthr; |
| jvalue jVal; |
| uint8_t *directStart; |
| void *mallocBuf = NULL; |
| jint position; |
| jarray array = NULL; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer, |
| "java/nio/ByteBuffer", "remaining", "()I"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: "); |
| goto done; |
| } |
| buffer->length = jVal.i; |
| jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer, |
| "java/nio/ByteBuffer", "position", "()I"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: "); |
| goto done; |
| } |
| position = jVal.i; |
| directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer); |
| if (directStart) { |
| // Handle direct buffers. |
| buffer->ptr = directStart + position; |
| buffer->direct = 1; |
| ret = 0; |
| goto done; |
| } |
| // Handle indirect buffers. |
| // The JNI docs don't say that GetDirectBufferAddress throws any exceptions |
| // when it fails. However, they also don't clearly say that it doesn't. It |
| // seems safest to clear any pending exceptions here, to prevent problems on |
| // various JVMs. |
| (*env)->ExceptionClear(env); |
| if (!opts->byteBufferPool) { |
| fputs("hadoopReadZeroExtractBuffer: we read through the " |
| "zero-copy path, but failed to get the address of the buffer via " |
| "GetDirectBufferAddress. Please make sure your JVM supports " |
| "GetDirectBufferAddress.\n", stderr); |
| ret = ENOTSUP; |
| goto done; |
| } |
| // Get the backing array object of this buffer. |
| jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer, |
| "java/nio/ByteBuffer", "array", "()[B"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: "); |
| goto done; |
| } |
| array = jVal.l; |
| if (!array) { |
| fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.", |
| stderr); |
| ret = EIO; |
| goto done; |
| } |
| mallocBuf = malloc(buffer->length); |
| if (!mallocBuf) { |
| fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n", |
| buffer->length); |
| ret = ENOMEM; |
| goto done; |
| } |
| (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf); |
| jthr = (*env)->ExceptionOccurred(env); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: "); |
| goto done; |
| } |
| buffer->ptr = mallocBuf; |
| buffer->direct = 0; |
| ret = 0; |
| |
| done: |
| free(mallocBuf); |
| (*env)->DeleteLocalRef(env, array); |
| return ret; |
| } |
| |
| static int translateZCRException(JNIEnv *env, jthrowable exc) |
| { |
| int ret; |
| char *className = NULL; |
| jthrowable jthr = classNameOfObject(exc, env, &className); |
| |
| if (jthr) { |
| fputs("hadoopReadZero: failed to get class name of " |
| "exception from read().\n", stderr); |
| destroyLocalReference(env, exc); |
| destroyLocalReference(env, jthr); |
| ret = EIO; |
| goto done; |
| } |
| if (!strcmp(className, "java.lang.UnsupportedOperationException")) { |
| ret = EPROTONOSUPPORT; |
| goto done; |
| } |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopZeroCopyRead: ZeroCopyCursor#read failed"); |
| done: |
| free(className); |
| return ret; |
| } |
| |
| struct hadoopRzBuffer* hadoopReadZero(hdfsFile file, |
| struct hadoopRzOptions *opts, int32_t maxLength) |
| { |
| JNIEnv *env; |
| jthrowable jthr = NULL; |
| jvalue jVal; |
| jobject enumSet = NULL, byteBuffer = NULL; |
| struct hadoopRzBuffer* buffer = NULL; |
| int ret; |
| |
| env = getJNIEnv(); |
| if (!env) { |
| errno = EINTERNAL; |
| return NULL; |
| } |
| if (file->type != HDFS_STREAM_INPUT) { |
| fputs("Cannot read from a non-InputStream object!\n", stderr); |
| ret = EINVAL; |
| goto done; |
| } |
| buffer = calloc(1, sizeof(struct hadoopRzBuffer)); |
| if (!buffer) { |
| ret = ENOMEM; |
| goto done; |
| } |
| jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: "); |
| goto done; |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read", |
| "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)" |
| "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet); |
| if (jthr) { |
| ret = translateZCRException(env, jthr); |
| goto done; |
| } |
| byteBuffer = jVal.l; |
| if (!byteBuffer) { |
| buffer->byteBuffer = NULL; |
| buffer->length = 0; |
| buffer->ptr = NULL; |
| } else { |
| buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer); |
| if (!buffer->byteBuffer) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hadoopReadZero: failed to create global ref to ByteBuffer"); |
| goto done; |
| } |
| ret = hadoopReadZeroExtractBuffer(env, opts, buffer); |
| if (ret) { |
| goto done; |
| } |
| } |
| ret = 0; |
| done: |
| (*env)->DeleteLocalRef(env, byteBuffer); |
| if (ret) { |
| if (buffer) { |
| if (buffer->byteBuffer) { |
| (*env)->DeleteGlobalRef(env, buffer->byteBuffer); |
| } |
| free(buffer); |
| } |
| errno = ret; |
| return NULL; |
| } else { |
| errno = 0; |
| } |
| return buffer; |
| } |
| |
| int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer) |
| { |
| return buffer->length; |
| } |
| |
| const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer) |
| { |
| return buffer->ptr; |
| } |
| |
| void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer) |
| { |
| jvalue jVal; |
| jthrowable jthr; |
| JNIEnv* env; |
| |
| env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return; |
| } |
| if (buffer->byteBuffer) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, file->file, |
| HADOOP_ISTRM, "releaseBuffer", |
| "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hadoopRzBufferFree: releaseBuffer failed: "); |
| // even on error, we have to delete the reference. |
| } |
| (*env)->DeleteGlobalRef(env, buffer->byteBuffer); |
| } |
| if (!buffer->direct) { |
| free(buffer->ptr); |
| } |
| memset(buffer, 0, sizeof(*buffer)); |
| free(buffer); |
| } |
| |
| char*** |
| hdfsGetHosts(hdfsFS fs, const char *path, tOffset start, tOffset length) |
| { |
| // JAVA EQUIVALENT: |
| // fs.getFileBlockLoctions(new Path(path), start, length); |
| |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jobject jPath = NULL; |
| jobject jFileStatus = NULL; |
| jvalue jFSVal, jVal; |
| jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL; |
| jstring jHost = NULL; |
| char*** blockHosts = NULL; |
| int i, j, ret; |
| jsize jNumFileBlocks = 0; |
| jobject jFileBlock; |
| jsize jNumBlockHosts; |
| const char *hostName; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return NULL; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetHosts(path=%s): constructNewObjectOfPath", path); |
| goto done; |
| } |
| jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS, |
| HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)" |
| "Lorg/apache/hadoop/fs/FileStatus;", jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" |
| "FileSystem#getFileStatus", path, start, length); |
| destroyLocalReference(env, jPath); |
| goto done; |
| } |
| jFileStatus = jFSVal.l; |
| |
| //org.apache.hadoop.fs.FileSystem#getFileBlockLocations |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, |
| HADOOP_FS, "getFileBlockLocations", |
| "(Lorg/apache/hadoop/fs/FileStatus;JJ)" |
| "[Lorg/apache/hadoop/fs/BlockLocation;", |
| jFileStatus, start, length); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" |
| "FileSystem#getFileBlockLocations", path, start, length); |
| goto done; |
| } |
| jBlockLocations = jVal.l; |
| |
| //Figure out no of entries in jBlockLocations |
| //Allocate memory and add NULL at the end |
| jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations); |
| |
| blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**)); |
| if (blockHosts == NULL) { |
| ret = ENOMEM; |
| goto done; |
| } |
| if (jNumFileBlocks == 0) { |
| ret = 0; |
| goto done; |
| } |
| |
| //Now parse each block to get hostnames |
| for (i = 0; i < jNumFileBlocks; ++i) { |
| jFileBlock = |
| (*env)->GetObjectArrayElement(env, jBlockLocations, i); |
| if (!jFileBlock) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" |
| "GetObjectArrayElement(%d)", path, start, length, i); |
| goto done; |
| } |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock, HADOOP_BLK_LOC, |
| "getHosts", "()[Ljava/lang/String;"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" |
| "BlockLocation#getHosts", path, start, length); |
| goto done; |
| } |
| jFileBlockHosts = jVal.l; |
| if (!jFileBlockHosts) { |
| fprintf(stderr, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" |
| "BlockLocation#getHosts returned NULL", path, start, length); |
| ret = EINTERNAL; |
| goto done; |
| } |
| //Figure out no of hosts in jFileBlockHosts, and allocate the memory |
| jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts); |
| blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*)); |
| if (!blockHosts[i]) { |
| ret = ENOMEM; |
| goto done; |
| } |
| |
| //Now parse each hostname |
| for (j = 0; j < jNumBlockHosts; ++j) { |
| jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j); |
| if (!jHost) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): " |
| "NewByteArray", path, start, length); |
| goto done; |
| } |
| hostName = |
| (const char*)((*env)->GetStringUTFChars(env, jHost, NULL)); |
| if (!hostName) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", " |
| "j=%d out of %d): GetStringUTFChars", |
| path, start, length, j, jNumBlockHosts); |
| goto done; |
| } |
| blockHosts[i][j] = strdup(hostName); |
| (*env)->ReleaseStringUTFChars(env, jHost, hostName); |
| if (!blockHosts[i][j]) { |
| ret = ENOMEM; |
| goto done; |
| } |
| destroyLocalReference(env, jHost); |
| jHost = NULL; |
| } |
| |
| destroyLocalReference(env, jFileBlockHosts); |
| jFileBlockHosts = NULL; |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jFileStatus); |
| destroyLocalReference(env, jBlockLocations); |
| destroyLocalReference(env, jFileBlockHosts); |
| destroyLocalReference(env, jHost); |
| if (ret) { |
| if (blockHosts) { |
| hdfsFreeHosts(blockHosts); |
| } |
| return NULL; |
| } |
| |
| return blockHosts; |
| } |
| |
| |
| void hdfsFreeHosts(char ***blockHosts) |
| { |
| int i, j; |
| for (i=0; blockHosts[i]; i++) { |
| for (j=0; blockHosts[i][j]; j++) { |
| free(blockHosts[i][j]); |
| } |
| free(blockHosts[i]); |
| } |
| free(blockHosts); |
| } |
| |
| |
| tOffset hdfsGetDefaultBlockSize(hdfsFS fs) |
| { |
| // JAVA EQUIVALENT: |
| // fs.getDefaultBlockSize(); |
| |
| jobject jFS = (jobject)fs; |
| jvalue jVal; |
| jthrowable jthr; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //FileSystem#getDefaultBlockSize() |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "getDefaultBlockSize", "()J"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize"); |
| return -1; |
| } |
| return jVal.j; |
| } |
| |
| |
| tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) |
| { |
| // JAVA EQUIVALENT: |
| // fs.getDefaultBlockSize(path); |
| |
| jthrowable jthr; |
| jobject jFS = (jobject)fs; |
| jobject jPath; |
| tOffset blockSize; |
| JNIEnv* env = getJNIEnv(); |
| |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath", |
| path); |
| return -1; |
| } |
| jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize); |
| (*env)->DeleteLocalRef(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetDefaultBlockSize(path=%s): " |
| "FileSystem#getDefaultBlockSize", path); |
| return -1; |
| } |
| return blockSize; |
| } |
| |
| |
| tOffset hdfsGetCapacity(hdfsFS fs) |
| { |
| // JAVA EQUIVALENT: |
| // FsStatus fss = fs.getStatus(); |
| // return Fss.getCapacity(); |
| |
| jobject jFS = (jobject)fs; |
| jvalue jVal; |
| jthrowable jthr; |
| jobject fss; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //FileSystem#getStatus |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetCapacity: FileSystem#getStatus"); |
| return -1; |
| } |
| fss = (jobject)jVal.l; |
| jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS, |
| "getCapacity", "()J"); |
| destroyLocalReference(env, fss); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetCapacity: FsStatus#getCapacity"); |
| return -1; |
| } |
| return jVal.j; |
| } |
| |
| |
| |
| tOffset hdfsGetUsed(hdfsFS fs) |
| { |
| // JAVA EQUIVALENT: |
| // FsStatus fss = fs.getStatus(); |
| // return Fss.getUsed(); |
| |
| jobject jFS = (jobject)fs; |
| jvalue jVal; |
| jthrowable jthr; |
| jobject fss; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return -1; |
| } |
| |
| //FileSystem#getStatus |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;"); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetUsed: FileSystem#getStatus"); |
| return -1; |
| } |
| fss = (jobject)jVal.l; |
| jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS, |
| "getUsed", "()J"); |
| destroyLocalReference(env, fss); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetUsed: FsStatus#getUsed"); |
| return -1; |
| } |
| return jVal.j; |
| } |
| |
| |
| |
| static jthrowable |
| getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo) |
| { |
| jvalue jVal; |
| jthrowable jthr; |
| jobject jPath = NULL; |
| jstring jPathName = NULL; |
| jstring jUserName = NULL; |
| jstring jGroupName = NULL; |
| jobject jPermission = NULL; |
| const char *cPathName; |
| const char *cUserName; |
| const char *cGroupName; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, |
| HADOOP_STAT, "isDir", "()Z"); |
| if (jthr) |
| goto done; |
| fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, |
| HADOOP_STAT, "getReplication", "()S"); |
| if (jthr) |
| goto done; |
| fileInfo->mReplication = jVal.s; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, |
| HADOOP_STAT, "getBlockSize", "()J"); |
| if (jthr) |
| goto done; |
| fileInfo->mBlockSize = jVal.j; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, |
| HADOOP_STAT, "getModificationTime", "()J"); |
| if (jthr) |
| goto done; |
| fileInfo->mLastMod = jVal.j / 1000; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, |
| HADOOP_STAT, "getAccessTime", "()J"); |
| if (jthr) |
| goto done; |
| fileInfo->mLastAccess = (tTime) (jVal.j / 1000); |
| |
| if (fileInfo->mKind == kObjectKindFile) { |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, |
| HADOOP_STAT, "getLen", "()J"); |
| if (jthr) |
| goto done; |
| fileInfo->mSize = jVal.j; |
| } |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, |
| "getPath", "()Lorg/apache/hadoop/fs/Path;"); |
| if (jthr) |
| goto done; |
| jPath = jVal.l; |
| if (jPath == NULL) { |
| jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#" |
| "getPath returned NULL!"); |
| goto done; |
| } |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jPath, HADOOP_PATH, |
| "toString", "()Ljava/lang/String;"); |
| if (jthr) |
| goto done; |
| jPathName = jVal.l; |
| cPathName = |
| (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL)); |
| if (!cPathName) { |
| jthr = getPendingExceptionAndClear(env); |
| goto done; |
| } |
| fileInfo->mName = strdup(cPathName); |
| (*env)->ReleaseStringUTFChars(env, jPathName, cPathName); |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, |
| "getOwner", "()Ljava/lang/String;"); |
| if (jthr) |
| goto done; |
| jUserName = jVal.l; |
| cUserName = |
| (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL)); |
| if (!cUserName) { |
| jthr = getPendingExceptionAndClear(env); |
| goto done; |
| } |
| fileInfo->mOwner = strdup(cUserName); |
| (*env)->ReleaseStringUTFChars(env, jUserName, cUserName); |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, |
| "getGroup", "()Ljava/lang/String;"); |
| if (jthr) |
| goto done; |
| jGroupName = jVal.l; |
| cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL)); |
| if (!cGroupName) { |
| jthr = getPendingExceptionAndClear(env); |
| goto done; |
| } |
| fileInfo->mGroup = strdup(cGroupName); |
| (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName); |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, |
| "getPermission", |
| "()Lorg/apache/hadoop/fs/permission/FsPermission;"); |
| if (jthr) |
| goto done; |
| if (jVal.l == NULL) { |
| jthr = newRuntimeError(env, "%s#getPermission returned NULL!", |
| HADOOP_STAT); |
| goto done; |
| } |
| jPermission = jVal.l; |
| jthr = invokeMethod(env, &jVal, INSTANCE, jPermission, HADOOP_FSPERM, |
| "toShort", "()S"); |
| if (jthr) |
| goto done; |
| fileInfo->mPermissions = jVal.s; |
| jthr = NULL; |
| |
| done: |
| if (jthr) |
| hdfsFreeFileInfoEntry(fileInfo); |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jPathName); |
| destroyLocalReference(env, jUserName); |
| destroyLocalReference(env, jGroupName); |
| destroyLocalReference(env, jPermission); |
| destroyLocalReference(env, jPath); |
| return jthr; |
| } |
| |
| static jthrowable |
| getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo) |
| { |
| // JAVA EQUIVALENT: |
| // fs.isDirectory(f) |
| // fs.getModificationTime() |
| // fs.getAccessTime() |
| // fs.getLength(f) |
| // f.getPath() |
| // f.getOwner() |
| // f.getGroup() |
| // f.getPermission().toShort() |
| jobject jStat; |
| jvalue jVal; |
| jthrowable jthr; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, |
| "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), |
| jPath); |
| if (jthr) |
| return jthr; |
| if (jVal.z == 0) { |
| *fileInfo = NULL; |
| return NULL; |
| } |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, |
| HADOOP_FS, "getFileStatus", |
| JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath); |
| if (jthr) |
| return jthr; |
| jStat = jVal.l; |
| *fileInfo = calloc(1, sizeof(hdfsFileInfo)); |
| if (!*fileInfo) { |
| destroyLocalReference(env, jStat); |
| return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo"); |
| } |
| jthr = getFileInfoFromStat(env, jStat, *fileInfo); |
| destroyLocalReference(env, jStat); |
| return jthr; |
| } |
| |
| |
| |
| hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char *path, int *numEntries) |
| { |
| // JAVA EQUIVALENT: |
| // Path p(path); |
| // Path []pathList = fs.listPaths(p) |
| // foreach path in pathList |
| // getFileInfo(path) |
| |
| jobject jFS = (jobject)fs; |
| jthrowable jthr; |
| jobject jPath = NULL; |
| hdfsFileInfo *pathList = NULL; |
| jobjectArray jPathList = NULL; |
| jvalue jVal; |
| jsize jPathListSize = 0; |
| int ret; |
| jsize i; |
| jobject tmpStat; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return NULL; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsListDirectory(%s): constructNewObjectOfPath", path); |
| goto done; |
| } |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_DFS, "listStatus", |
| JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)), |
| jPath); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_UNRESOLVED_LINK, |
| "hdfsListDirectory(%s): FileSystem#listStatus", path); |
| goto done; |
| } |
| jPathList = jVal.l; |
| |
| //Figure out the number of entries in that directory |
| jPathListSize = (*env)->GetArrayLength(env, jPathList); |
| if (jPathListSize == 0) { |
| ret = 0; |
| goto done; |
| } |
| |
| //Allocate memory |
| pathList = calloc(jPathListSize, sizeof(hdfsFileInfo)); |
| if (pathList == NULL) { |
| ret = ENOMEM; |
| goto done; |
| } |
| |
| //Save path information in pathList |
| for (i=0; i < jPathListSize; ++i) { |
| tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i); |
| if (!tmpStat) { |
| ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)", |
| path, i, jPathListSize); |
| goto done; |
| } |
| jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]); |
| destroyLocalReference(env, tmpStat); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)", |
| path, i, jPathListSize); |
| goto done; |
| } |
| } |
| ret = 0; |
| |
| done: |
| destroyLocalReference(env, jPath); |
| destroyLocalReference(env, jPathList); |
| |
| if (ret) { |
| hdfsFreeFileInfo(pathList, jPathListSize); |
| errno = ret; |
| return NULL; |
| } |
| *numEntries = jPathListSize; |
| return pathList; |
| } |
| |
| |
| |
| hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char *path) |
| { |
| // JAVA EQUIVALENT: |
| // File f(path); |
| // fs.isDirectory(f) |
| // fs.lastModified() ?? |
| // fs.getLength(f) |
| // f.getPath() |
| |
| jobject jFS = (jobject)fs; |
| jobject jPath; |
| jthrowable jthr; |
| hdfsFileInfo *fileInfo; |
| |
| //Get the JNIEnv* corresponding to current thread |
| JNIEnv* env = getJNIEnv(); |
| if (env == NULL) { |
| errno = EINTERNAL; |
| return NULL; |
| } |
| |
| //Create an object of org.apache.hadoop.fs.Path |
| jthr = constructNewObjectOfPath(env, path, &jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "hdfsGetPathInfo(%s): constructNewObjectOfPath", path); |
| return NULL; |
| } |
| jthr = getFileInfo(env, jFS, jPath, &fileInfo); |
| destroyLocalReference(env, jPath); |
| if (jthr) { |
| errno = printExceptionAndFree(env, jthr, |
| NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | |
| NOPRINT_EXC_UNRESOLVED_LINK, |
| "hdfsGetPathInfo(%s): getFileInfo", path); |
| return NULL; |
| } |
| if (!fileInfo) { |
| errno = ENOENT; |
| return NULL; |
| } |
| return fileInfo; |
| } |
| |
| static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo) |
| { |
| free(hdfsFileInfo->mName); |
| free(hdfsFileInfo->mOwner); |
| free(hdfsFileInfo->mGroup); |
| memset(hdfsFileInfo, 0, sizeof(hdfsFileInfo)); |
| } |
| |
| void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) |
| { |
| //Free the mName, mOwner, and mGroup |
| int i; |
| for (i=0; i < numEntries; ++i) { |
| hdfsFreeFileInfoEntry(hdfsFileInfo + i); |
| } |
| |
| //Free entire block |
| free(hdfsFileInfo); |
| } |
| |
| |
| |
| |
| /** |
| * vim: ts=4: sw=4: et: |
| */ |