| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "exception.h" |
| #include "hdfs.h" |
| #include "hdfs_test.h" |
| #include "jni_helper.h" |
| #include "native_mini_dfs.h" |
| #include "platform.h" |
| |
| #include <errno.h> |
| #include <jni.h> |
| #include <limits.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| |
| #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder" |
| #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster" |
| #define HADOOP_CONF "org/apache/hadoop/conf/Configuration" |
| #define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" |
| #define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" |
| |
| #define DFS_WEBHDFS_ENABLED_KEY "dfs.webhdfs.enabled" |
| |
| struct NativeMiniDfsCluster { |
| /** |
| * The NativeMiniDfsCluster object |
| */ |
| jobject obj; |
| |
| /** |
| * Path to the domain socket, or the empty string if there is none. |
| */ |
| char domainSocketPath[PATH_MAX]; |
| }; |
| |
| static jthrowable nmdConfigureShortCircuit(JNIEnv *env, |
| struct NativeMiniDfsCluster *cl, jobject cobj) |
| { |
| jthrowable jthr; |
| char *tmpDir; |
| |
| int ret = hdfsDisableDomainSocketSecurity(); |
| if (ret) { |
| return newRuntimeError(env, "failed to disable hdfs domain " |
| "socket security: error %d", ret); |
| } |
| jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true"); |
| if (jthr) { |
| return jthr; |
| } |
| tmpDir = getenv("TMPDIR"); |
| if (!tmpDir) { |
| tmpDir = "/tmp"; |
| } |
| snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d", |
| tmpDir, getpid(), rand()); |
| snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d", |
| tmpDir, getpid(), rand()); |
| jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path", |
| cl->domainSocketPath); |
| if (jthr) { |
| return jthr; |
| } |
| return NULL; |
| } |
| |
| struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) |
| { |
| struct NativeMiniDfsCluster* cl = NULL; |
| jobject bld = NULL, cobj = NULL, cluster = NULL; |
| jvalue val; |
| JNIEnv *env = getJNIEnv(); |
| jthrowable jthr; |
| jstring jconfStr = NULL; |
| |
| if (!env) { |
| fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n"); |
| return NULL; |
| } |
| cl = calloc(1, sizeof(struct NativeMiniDfsCluster)); |
| if (!cl) { |
| fprintf(stderr, "nmdCreate: OOM"); |
| goto error; |
| } |
| jthr = constructNewObjectOfClass(env, &cobj, HADOOP_CONF, "()V"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: new Configuration"); |
| goto error; |
| } |
| if (conf->webhdfsEnabled) { |
| jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: new String"); |
| goto error; |
| } |
| jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF, |
| "setBoolean", "(Ljava/lang/String;Z)V", |
| jconfStr, conf->webhdfsEnabled); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: Configuration::setBoolean"); |
| goto error; |
| } |
| } |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: Configuration::setBoolean"); |
| goto error; |
| } |
| // Disable 'minimum block size' -- it's annoying in tests. |
| (*env)->DeleteLocalRef(env, jconfStr); |
| jconfStr = NULL; |
| jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: new String"); |
| goto error; |
| } |
| jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF, |
| "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: Configuration::setLong"); |
| goto error; |
| } |
| // Creae MiniDFSCluster object |
| jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER, |
| "(L"HADOOP_CONF";)V", cobj); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: NativeMiniDfsCluster#Builder#Builder"); |
| goto error; |
| } |
| if (conf->configureShortCircuit) { |
| jthr = nmdConfigureShortCircuit(env, cl, cobj); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: nmdConfigureShortCircuit error"); |
| goto error; |
| } |
| } |
| jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, |
| "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " |
| "Builder::format"); |
| goto error; |
| } |
| (*env)->DeleteLocalRef(env, val.l); |
| if (conf->webhdfsEnabled) { |
| jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, |
| "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";", |
| conf->namenodeHttpPort); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " |
| "Builder::nameNodeHttpPort"); |
| goto error; |
| } |
| (*env)->DeleteLocalRef(env, val.l); |
| } |
| jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, |
| "build", "()L" MINIDFS_CLUSTER ";"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdCreate: Builder#build"); |
| goto error; |
| } |
| cluster = val.l; |
| cl->obj = (*env)->NewGlobalRef(env, val.l); |
| if (!cl->obj) { |
| printPendingExceptionAndFree(env, PRINT_EXC_ALL, |
| "nmdCreate: NewGlobalRef"); |
| goto error; |
| } |
| (*env)->DeleteLocalRef(env, cluster); |
| (*env)->DeleteLocalRef(env, bld); |
| (*env)->DeleteLocalRef(env, cobj); |
| (*env)->DeleteLocalRef(env, jconfStr); |
| return cl; |
| |
| error: |
| (*env)->DeleteLocalRef(env, cluster); |
| (*env)->DeleteLocalRef(env, bld); |
| (*env)->DeleteLocalRef(env, cobj); |
| (*env)->DeleteLocalRef(env, jconfStr); |
| free(cl); |
| return NULL; |
| } |
| |
| void nmdFree(struct NativeMiniDfsCluster* cl) |
| { |
| JNIEnv *env = getJNIEnv(); |
| if (!env) { |
| fprintf(stderr, "nmdFree: getJNIEnv failed\n"); |
| free(cl); |
| return; |
| } |
| (*env)->DeleteGlobalRef(env, cl->obj); |
| free(cl); |
| } |
| |
| int nmdShutdown(struct NativeMiniDfsCluster* cl) |
| { |
| JNIEnv *env = getJNIEnv(); |
| jthrowable jthr; |
| |
| if (!env) { |
| fprintf(stderr, "nmdShutdown: getJNIEnv failed\n"); |
| return -EIO; |
| } |
| jthr = invokeMethod(env, NULL, INSTANCE, cl->obj, |
| MINIDFS_CLUSTER, "shutdown", "()V"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdShutdown: MiniDFSCluster#shutdown"); |
| return -EIO; |
| } |
| return 0; |
| } |
| |
| int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl) |
| { |
| jthrowable jthr; |
| JNIEnv *env = getJNIEnv(); |
| if (!env) { |
| fprintf(stderr, "nmdWaitClusterUp: getJNIEnv failed\n"); |
| return -EIO; |
| } |
| jthr = invokeMethod(env, NULL, INSTANCE, cl->obj, |
| MINIDFS_CLUSTER, "waitClusterUp", "()V"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdWaitClusterUp: MiniDFSCluster#waitClusterUp "); |
| return -EIO; |
| } |
| return 0; |
| } |
| |
| int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl) |
| { |
| JNIEnv *env = getJNIEnv(); |
| jvalue jVal; |
| jthrowable jthr; |
| |
| if (!env) { |
| fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n"); |
| return -EIO; |
| } |
| // Note: this will have to be updated when HA nativeMiniDfs clusters are |
| // supported |
| jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, |
| MINIDFS_CLUSTER, "getNameNodePort", "()I"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdHdfsConnect: MiniDFSCluster#getNameNodePort"); |
| return -EIO; |
| } |
| return jVal.i; |
| } |
| |
| int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, |
| int *port, const char **hostName) |
| { |
| JNIEnv *env = getJNIEnv(); |
| jvalue jVal; |
| jobject jNameNode, jAddress; |
| jthrowable jthr; |
| int ret = 0; |
| const char *host; |
| |
| if (!env) { |
| fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n"); |
| return -EIO; |
| } |
| // First get the (first) NameNode of the cluster |
| jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, MINIDFS_CLUSTER, |
| "getNameNode", "()L" HADOOP_NAMENODE ";"); |
| if (jthr) { |
| printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdGetNameNodeHttpAddress: " |
| "MiniDFSCluster#getNameNode"); |
| return -EIO; |
| } |
| jNameNode = jVal.l; |
| |
| // Then get the http address (InetSocketAddress) of the NameNode |
| jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE, |
| "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdGetNameNodeHttpAddress: " |
| "NameNode#getHttpAddress"); |
| goto error_dlr_nn; |
| } |
| jAddress = jVal.l; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, |
| JAVA_INETSOCKETADDRESS, "getPort", "()I"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdGetNameNodeHttpAddress: " |
| "InetSocketAddress#getPort"); |
| goto error_dlr_addr; |
| } |
| *port = jVal.i; |
| |
| jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, |
| "getHostName", "()Ljava/lang/String;"); |
| if (jthr) { |
| ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, |
| "nmdGetNameNodeHttpAddress: " |
| "InetSocketAddress#getHostName"); |
| goto error_dlr_addr; |
| } |
| host = (*env)->GetStringUTFChars(env, jVal.l, NULL); |
| *hostName = strdup(host); |
| (*env)->ReleaseStringUTFChars(env, jVal.l, host); |
| |
| error_dlr_addr: |
| (*env)->DeleteLocalRef(env, jAddress); |
| error_dlr_nn: |
| (*env)->DeleteLocalRef(env, jNameNode); |
| |
| return ret; |
| } |
| |
| int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl, |
| struct hdfsBuilder *bld) |
| { |
| int ret; |
| tPort port; |
| |
| hdfsBuilderSetNameNode(bld, "localhost"); |
| port = (tPort)nmdGetNameNodePort(cl); |
| if (port < 0) { |
| fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port); |
| return EIO; |
| } |
| hdfsBuilderSetNameNodePort(bld, port); |
| if (cl->domainSocketPath[0]) { |
| ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true"); |
| if (ret) { |
| return ret; |
| } |
| ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path", |
| cl->domainSocketPath); |
| if (ret) { |
| return ret; |
| } |
| } |
| return 0; |
| } |