| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
#include "JniFileSystem.h"

#include <utility>

#include "jni/JniCommon.h"
#include "velox/common/io/IoStatistics.h"
| |
| namespace { |
| constexpr std::string_view kJniFsScheme("jni:"); |
| constexpr std::string_view kJolFsScheme("jol:"); |
| |
| JavaVM* vm; |
| |
| jclass jniFileSystemClass; |
| jclass jniReadFileClass; |
| jclass jniWriteFileClass; |
| |
| jmethodID jniGetFileSystem; |
| jmethodID jniIsCapableForNewFile; |
| jmethodID jniFileSystemOpenFileForRead; |
| jmethodID jniFileSystemOpenFileForWrite; |
| jmethodID jniFileSystemRemove; |
| jmethodID jniFileSystemRename; |
| jmethodID jniFileSystemExists; |
| jmethodID jniFileSystemList; |
| jmethodID jniFileSystemMkdir; |
| jmethodID jniFileSystemRmdir; |
| |
| jmethodID jniReadFilePread; |
| jmethodID jniReadFileShouldCoalesce; |
| jmethodID jniReadFileSize; |
| jmethodID jniReadFileMemoryUsage; |
| jmethodID jniReadFileGetNaturalReadSize; |
| jmethodID jniReadFileClose; |
| |
| jmethodID jniWriteFileAppend; |
| jmethodID jniWriteFileFlush; |
| jmethodID jniWriteFileClose; |
| jmethodID jniWriteFileSize; |
| |
| jstring createJString(JNIEnv* env, const std::string_view& path) { |
| return env->NewStringUTF(std::string(path).c_str()); |
| } |
| |
// Strips a leading URI scheme such as "jni:" or "jol:" from `path`, e.g. "jni:/a/b/c" -> "/a/b/c".
// Only the text before the first ':' counts as a scheme, and only if it contains no '/'. Scheme-less
// paths that merely contain a colon (e.g. "/tmp/a:b") are therefore returned unchanged.
// The returned view points into `path`.
std::string_view removePathSchema(std::string_view path) {
  // The first ':' or '/' decides: a ':' first means a scheme prefix, a '/' first means none.
  const std::string_view::size_type pos = path.find_first_of(":/");
  if (pos == std::string_view::npos || path[pos] != ':') {
    return path;
  }
  return path.substr(pos + 1);
}
| |
| class JniReadFile : public facebook::velox::ReadFile { |
| public: |
| explicit JniReadFile(jobject obj) { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| obj_ = env->NewGlobalRef(obj); |
| checkException(env); |
| } |
| |
| ~JniReadFile() override { |
| try { |
| closeInternal(); |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->DeleteGlobalRef(obj_); |
| checkException(env); |
| } catch (const std::exception& e) { |
| LOG(WARNING) << "Error closing jni read file " << e.what(); |
| } |
| } |
| |
| std::string_view pread( |
| uint64_t offset, |
| uint64_t length, |
| void* buf, |
| const facebook::velox::FileStorageContext& fileStorageContext = {}) const override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod( |
| obj_, jniReadFilePread, static_cast<jlong>(offset), static_cast<jlong>(length), reinterpret_cast<jlong>(buf)); |
| checkException(env); |
| return std::string_view(reinterpret_cast<const char*>(buf)); |
| } |
| |
| bool shouldCoalesce() const override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jboolean out = env->CallBooleanMethod(obj_, jniReadFileShouldCoalesce); |
| checkException(env); |
| return out; |
| } |
| |
| uint64_t size() const override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jlong out = env->CallLongMethod(obj_, jniReadFileSize); |
| checkException(env); |
| return static_cast<uint64_t>(out); |
| } |
| |
| uint64_t memoryUsage() const override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jlong out = env->CallLongMethod(obj_, jniReadFileMemoryUsage); |
| checkException(env); |
| return static_cast<uint64_t>(out); |
| } |
| |
| std::string getName() const override { |
| return "<JniReadFile>"; |
| } |
| |
| uint64_t getNaturalReadSize() const override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jlong out = env->CallLongMethod(obj_, jniReadFileGetNaturalReadSize); |
| checkException(env); |
| return static_cast<uint64_t>(out); |
| } |
| |
| private: |
| void closeInternal() { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniReadFileClose); |
| checkException(env); |
| } |
| |
| jobject obj_; |
| }; |
| |
| class JniWriteFile : public facebook::velox::WriteFile { |
| public: |
| explicit JniWriteFile(jobject obj) { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| obj_ = env->NewGlobalRef(obj); |
| checkException(env); |
| } |
| |
| ~JniWriteFile() override { |
| try { |
| closeInternal(); |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->DeleteGlobalRef(obj_); |
| checkException(env); |
| } catch (const std::exception& e) { |
| LOG(WARNING) << "Error closing jni write file " << e.what(); |
| } |
| } |
| |
| void append(std::string_view data) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| const void* bytes = data.data(); |
| unsigned long len = data.size(); |
| env->CallVoidMethod(obj_, jniWriteFileAppend, static_cast<jlong>(len), reinterpret_cast<jlong>(bytes)); |
| checkException(env); |
| } |
| |
| void flush() override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniWriteFileFlush); |
| checkException(env); |
| } |
| |
| void close() override { |
| closeInternal(); |
| } |
| |
| uint64_t size() const override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jlong out = env->CallLongMethod(obj_, jniWriteFileSize); |
| checkException(env); |
| return static_cast<uint64_t>(out); |
| } |
| |
| private: |
| void closeInternal() { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniWriteFileClose); |
| checkException(env); |
| } |
| |
| jobject obj_; |
| }; |
| |
| // Convert "xxx:/a/b/c" to "/a/b/c". Probably it's Velox's job to remove the protocol when calling the member |
| // functions? |
| class FileSystemWrapper : public facebook::velox::filesystems::FileSystem { |
| public: |
| static std::shared_ptr<facebook::velox::filesystems::FileSystem> wrap( |
| std::shared_ptr<facebook::velox::filesystems::FileSystem> fs) { |
| return std::shared_ptr<facebook::velox::filesystems::FileSystem>(new FileSystemWrapper(fs)); |
| } |
| |
| std::string name() const override { |
| return fs_->name(); |
| } |
| |
| std::unique_ptr<facebook::velox::ReadFile> openFileForRead( |
| std::string_view path, |
| const facebook::velox::filesystems::FileOptions& options) override { |
| return fs_->openFileForRead(rewrite(path), options); |
| } |
| |
| std::unique_ptr<facebook::velox::WriteFile> openFileForWrite( |
| std::string_view path, |
| const facebook::velox::filesystems::FileOptions& options) override { |
| return fs_->openFileForWrite(rewrite(path), options); |
| } |
| |
| void remove(std::string_view path) override { |
| fs_->remove(rewrite(path)); |
| } |
| |
| void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override { |
| fs_->rename(rewrite(oldPath), rewrite(newPath), overwrite); |
| } |
| |
| bool exists(std::string_view path) override { |
| return fs_->exists(rewrite(path)); |
| } |
| |
| std::vector<std::string> list(std::string_view path) override { |
| return fs_->list(rewrite(path)); |
| } |
| |
| void mkdir(std::string_view path, const facebook::velox::filesystems::DirectoryOptions& options = {}) override { |
| fs_->mkdir(rewrite(path)); |
| } |
| |
| void rmdir(std::string_view path) override { |
| fs_->rmdir(rewrite(path)); |
| } |
| |
| private: |
| FileSystemWrapper(std::shared_ptr<facebook::velox::filesystems::FileSystem> fs) : FileSystem({}), fs_(fs) {} |
| |
| static std::string_view rewrite(std::string_view path) { |
| return removePathSchema(path); |
| } |
| |
| std::shared_ptr<facebook::velox::filesystems::FileSystem> fs_; |
| }; |
| |
| class JniFileSystem : public facebook::velox::filesystems::FileSystem { |
| public: |
| explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::config::ConfigBase> config) |
| : FileSystem(config) { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| obj_ = env->NewGlobalRef(obj); |
| checkException(env); |
| } |
| |
| ~JniFileSystem() override { |
| try { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->DeleteGlobalRef(obj_); |
| checkException(env); |
| } catch (const std::exception& e) { |
| LOG(WARNING) << "Error releasing jni file system " << e.what(); |
| } |
| } |
| |
| std::string name() const override { |
| return "JNI FS"; |
| } |
| |
| std::unique_ptr<facebook::velox::ReadFile> openFileForRead( |
| std::string_view path, |
| const facebook::velox::filesystems::FileOptions& options) override { |
| GLUTEN_CHECK( |
| options.values.empty(), |
| "JniFileSystem::openFileForRead: file options is not empty, this is not currently supported"); |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForRead, createJString(env, path)); |
| checkException(env); |
| auto out = std::make_unique<JniReadFile>(obj); |
| return out; |
| } |
| |
| std::unique_ptr<facebook::velox::WriteFile> openFileForWrite( |
| std::string_view path, |
| const facebook::velox::filesystems::FileOptions& options) override { |
| GLUTEN_CHECK( |
| options.values.empty(), |
| "JniFileSystem::openFileForWrite: file options is not empty, this is not currently supported"); |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForWrite, createJString(env, path)); |
| checkException(env); |
| auto out = std::make_unique<JniWriteFile>(obj); |
| return out; |
| } |
| |
| void remove(std::string_view path) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniFileSystemRemove, createJString(env, path)); |
| checkException(env); |
| } |
| |
| void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniFileSystemRename, createJString(env, oldPath), createJString(env, newPath), overwrite); |
| checkException(env); |
| } |
| |
| bool exists(std::string_view path) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| bool out = env->CallBooleanMethod(obj_, jniFileSystemExists, createJString(env, path)); |
| checkException(env); |
| return out; |
| } |
| |
| std::vector<std::string> list(std::string_view path) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| std::vector<std::string> out; |
| jobjectArray jarray = |
| static_cast<jobjectArray>(env->CallObjectMethod(obj_, jniFileSystemList, createJString(env, path))); |
| checkException(env); |
| jsize length = env->GetArrayLength(jarray); |
| for (jsize i = 0; i < length; ++i) { |
| jstring element = static_cast<jstring>(env->GetObjectArrayElement(jarray, i)); |
| std::string cElement = jStringToCString(env, element); |
| out.push_back(cElement); |
| } |
| return out; |
| } |
| |
| void mkdir(std::string_view path, const facebook::velox::filesystems::DirectoryOptions& options = {}) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniFileSystemMkdir, createJString(env, path)); |
| checkException(env); |
| } |
| |
| void rmdir(std::string_view path) override { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| env->CallVoidMethod(obj_, jniFileSystemRmdir, createJString(env, path)); |
| checkException(env); |
| } |
| |
| static bool isCapableForNewFile(uint64_t size) { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| bool out = env->CallStaticBooleanMethod(jniFileSystemClass, jniIsCapableForNewFile, static_cast<jlong>(size)); |
| checkException(env); |
| return out; |
| } |
| |
| static std::function<bool(std::string_view)> schemeMatcher() { |
| return [](std::string_view filePath) { return filePath.find(kJniFsScheme) == 0; }; |
| } |
| |
| static std::function< |
| std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::config::ConfigBase>, std::string_view)> |
| fileSystemGenerator() { |
| return [](std::shared_ptr<const facebook::velox::config::ConfigBase> properties, std::string_view filePath) { |
| JNIEnv* env = nullptr; |
| attachCurrentThreadAsDaemonOrThrow(vm, &env); |
| jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem); |
| checkException(env); |
| // remove "jni:" or "jol:" prefix. |
| std::shared_ptr<FileSystem> lfs = FileSystemWrapper::wrap(std::make_shared<JniFileSystem>(obj, properties)); |
| return lfs; |
| }; |
| } |
| |
| private: |
| jobject obj_; |
| }; |
| } // namespace |
| |
| void gluten::initVeloxJniFileSystem(JNIEnv* env) { |
| // vm |
| if (env->GetJavaVM(&vm) != JNI_OK) { |
| throw gluten::GlutenException("Unable to get JavaVM instance"); |
| } |
| |
| // classes |
| jniFileSystemClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/fs/JniFilesystem;"); |
| jniReadFileClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/fs/JniFilesystem$ReadFile;"); |
| jniWriteFileClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/fs/JniFilesystem$WriteFile;"); |
| |
| // methods in JniFilesystem |
| jniGetFileSystem = |
| getStaticMethodIdOrError(env, jniFileSystemClass, "getFileSystem", "()Lorg/apache/gluten/fs/JniFilesystem;"); |
| jniIsCapableForNewFile = getStaticMethodIdOrError(env, jniFileSystemClass, "isCapableForNewFile", "(J)Z"); |
| jniFileSystemOpenFileForRead = getMethodIdOrError( |
| env, jniFileSystemClass, "openFileForRead", "(Ljava/lang/String;)Lorg/apache/gluten/fs/JniFilesystem$ReadFile;"); |
| jniFileSystemOpenFileForWrite = getMethodIdOrError( |
| env, |
| jniFileSystemClass, |
| "openFileForWrite", |
| "(Ljava/lang/String;)Lorg/apache/gluten/fs/JniFilesystem$WriteFile;"); |
| jniFileSystemRemove = getMethodIdOrError(env, jniFileSystemClass, "remove", "(Ljava/lang/String;)V"); |
| jniFileSystemRename = |
| getMethodIdOrError(env, jniFileSystemClass, "rename", "(Ljava/lang/String;Ljava/lang/String;Z)V"); |
| jniFileSystemExists = getMethodIdOrError(env, jniFileSystemClass, "exists", "(Ljava/lang/String;)Z"); |
| jniFileSystemList = getMethodIdOrError(env, jniFileSystemClass, "list", "(Ljava/lang/String;)[Ljava/lang/String;"); |
| jniFileSystemMkdir = getMethodIdOrError(env, jniFileSystemClass, "mkdir", "(Ljava/lang/String;)V"); |
| jniFileSystemRmdir = getMethodIdOrError(env, jniFileSystemClass, "rmdir", "(Ljava/lang/String;)V"); |
| |
| // methods in JniFilesystem$ReadFile |
| jniReadFilePread = getMethodIdOrError(env, jniReadFileClass, "pread", "(JJJ)V"); |
| jniReadFileShouldCoalesce = getMethodIdOrError(env, jniReadFileClass, "shouldCoalesce", "()Z"); |
| jniReadFileSize = getMethodIdOrError(env, jniReadFileClass, "size", "()J"); |
| jniReadFileMemoryUsage = getMethodIdOrError(env, jniReadFileClass, "memoryUsage", "()J"); |
| jniReadFileGetNaturalReadSize = getMethodIdOrError(env, jniReadFileClass, "getNaturalReadSize", "()J"); |
| jniReadFileClose = getMethodIdOrError(env, jniReadFileClass, "close", "()V"); |
| |
| // methods in JniFilesystem$WriteFile |
| jniWriteFileAppend = getMethodIdOrError(env, jniWriteFileClass, "append", "(JJ)V"); |
| jniWriteFileFlush = getMethodIdOrError(env, jniWriteFileClass, "flush", "()V"); |
| jniWriteFileClose = getMethodIdOrError(env, jniWriteFileClass, "close", "()V"); |
| jniWriteFileSize = getMethodIdOrError(env, jniWriteFileClass, "size", "()J"); |
| } |
| |
| void gluten::finalizeVeloxJniFileSystem(JNIEnv* env) { |
| env->DeleteGlobalRef(jniWriteFileClass); |
| env->DeleteGlobalRef(jniReadFileClass); |
| env->DeleteGlobalRef(jniFileSystemClass); |
| |
| vm = nullptr; |
| } |
| |
| // "jol" stands for letting Gluten choose between jni fs and local fs. |
| // This doesn't implement facebook::velox::filesystems::FileSystem since it just |
| // act as a entry-side router to create JniFilesystem and LocalFilesystem |
| void gluten::registerJolFileSystem(uint64_t maxFileSize) { |
| GLUTEN_CHECK(maxFileSize > 0, "Unexpected max file size for jol fs: " + std::to_string(maxFileSize)); |
| |
| auto JolSchemeMatcher = [](std::string_view filePath) { return filePath.find(kJolFsScheme) == 0; }; |
| |
| auto fileSystemGenerator = |
| [maxFileSize]( |
| std::shared_ptr<const facebook::velox::config::ConfigBase> properties, |
| std::string_view filePath) -> std::shared_ptr<facebook::velox::filesystems::FileSystem> { |
| // select JNI file if there is enough space |
| if (JniFileSystem::isCapableForNewFile(maxFileSize)) { |
| return JniFileSystem::fileSystemGenerator()(properties, filePath); |
| } |
| |
| // otherwise select local file |
| // remove "jol:" to make Velox choose local fs. |
| auto localFilePath = removePathSchema(filePath); |
| auto fs = FileSystemWrapper::wrap(facebook::velox::filesystems::getFileSystem(localFilePath, properties)); |
| return fs; |
| }; |
| |
| facebook::velox::filesystems::registerFileSystem(JolSchemeMatcher, fileSystemGenerator); |
| } |