| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::cell::RefCell; |
| use std::ffi::c_void; |
| use std::future::Future; |
| use std::num::NonZeroUsize; |
| use std::sync::atomic::{AtomicUsize, Ordering}; |
| use std::sync::{Arc, OnceLock}; |
| use std::thread::available_parallelism; |
| |
| use jni::JNIEnv; |
| use jni::JavaVM; |
| use jni::objects::JClass; |
| use jni::objects::JObject; |
| use jni::objects::JValue; |
| use jni::sys::jlong; |
| use tokio::task::JoinHandle; |
| |
| use crate::Result; |
| |
| static mut RUNTIME: OnceLock<Executor> = OnceLock::new(); |
| thread_local! { |
| static ENV: RefCell<Option<*mut jni::sys::JNIEnv>> = const { RefCell::new(None) }; |
| } |
| |
| /// # Safety |
| /// |
| /// This function could be only called by java vm when unload this lib. |
| #[allow(static_mut_refs)] |
| #[unsafe(no_mangle)] |
| pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) { |
| unsafe { |
| RUNTIME.take(); |
| } |
| } |
| |
| /// # Safety |
| /// |
| /// This function could be only called when the lib is loaded and within an executor thread. |
| pub(crate) unsafe fn get_current_env<'local>() -> JNIEnv<'local> { |
| let env = ENV |
| .with(|cell| *cell.borrow_mut()) |
| .expect("env must be available"); |
| unsafe { JNIEnv::from_raw(env) }.expect("env must be valid") |
| } |
| |
| pub enum Executor { |
| Tokio(tokio::runtime::Runtime), |
| } |
| |
| impl Executor { |
| pub fn enter_with<F, R>(&self, f: F) -> R |
| where |
| F: FnOnce() -> R, |
| { |
| match self { |
| Executor::Tokio(e) => { |
| let _guard = e.enter(); |
| f() |
| } |
| } |
| } |
| |
| pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> |
| where |
| F: Future + Send + 'static, |
| F::Output: Send + 'static, |
| { |
| match self { |
| Executor::Tokio(e) => e.spawn(future), |
| } |
| } |
| } |
| |
| #[unsafe(no_mangle)] |
| pub extern "system" fn Java_org_apache_opendal_AsyncExecutor_makeTokioExecutor( |
| mut env: JNIEnv, |
| _: JClass, |
| cores: usize, |
| ) -> jlong { |
| make_tokio_executor(&mut env, cores) |
| .map(|executor| Box::into_raw(Box::new(executor)) as jlong) |
| .unwrap_or_else(|e| { |
| e.throw(&mut env); |
| 0 |
| }) |
| } |
| |
| /// # Safety |
| /// |
| /// This function should not be called before the AsyncExecutor is ready. |
| #[unsafe(no_mangle)] |
| pub unsafe extern "system" fn Java_org_apache_opendal_AsyncExecutor_disposeInternal( |
| _: JNIEnv, |
| _: JObject, |
| executor: *mut Executor, |
| ) { |
| unsafe { |
| drop(Box::from_raw(executor)); |
| } |
| } |
| |
| pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result<Executor> { |
| let vm = Arc::new(env.get_java_vm().expect("JavaVM must be available")); |
| let counter = AtomicUsize::new(0); |
| let executor = tokio::runtime::Builder::new_multi_thread() |
| .worker_threads(cores) |
| .thread_name_fn(move || { |
| let id = counter.fetch_add(1, Ordering::SeqCst); |
| format!("opendal-tokio-worker-{id}") |
| }) |
| .on_thread_start({ |
| let vm = vm.clone(); |
| move || { |
| ENV.with(|cell| { |
| let mut env = vm |
| .attach_current_thread_as_daemon() |
| .expect("attach thread must succeed"); |
| |
| set_current_thread_name(&mut env) |
| .expect("current thread name has been set above"); |
| |
| *cell.borrow_mut() = Some(env.get_raw()); |
| }) |
| } |
| }) |
| .on_thread_stop(move || { |
| // Typically, the thread attached to the JVM will be detached automatically when the thread exits |
| // and the corresponding thread-local AttachGuard is dropped. |
| // |
| // However, there are some edge cases on Windows that may lead to deadlocks. To mitigate this, |
| // we explicitly detach the thread here. |
| // |
| // See https://github.com/apache/opendal/issues/6869 and https://github.com/jni-rs/jni-rs/issues/701 |
| // for more details. |
| |
| ENV.with(|cell| { |
| *cell.borrow_mut() = None; |
| }); |
| |
| // SAFETY: JNIEnv is unset above and we do not use AttachGuard anywhere. |
| unsafe { vm.detach_current_thread() }; |
| }) |
| .enable_all() |
| .build() |
| .map_err(|e| { |
| opendal::Error::new( |
| opendal::ErrorKind::Unexpected, |
| "Failed to create tokio runtime.", |
| ) |
| .set_source(e) |
| })?; |
| Ok(Executor::Tokio(executor)) |
| } |
| |
| fn set_current_thread_name(env: &mut JNIEnv) -> Result<()> { |
| let current_thread = env |
| .call_static_method( |
| "java/lang/Thread", |
| "currentThread", |
| "()Ljava/lang/Thread;", |
| &[], |
| )? |
| .l()?; |
| let thread_name = match std::thread::current().name() { |
| Some(thread_name) => env.new_string(thread_name)?, |
| None => unreachable!("thread name must be set"), |
| }; |
| env.call_method( |
| current_thread, |
| "setName", |
| "(Ljava/lang/String;)V", |
| &[JValue::Object(&thread_name)], |
| )?; |
| Ok(()) |
| } |
| |
| /// # Panic |
| /// |
| /// Crash if the executor is disposed. |
| #[inline] |
| pub(crate) fn executor_or_default<'a>( |
| env: &mut JNIEnv<'a>, |
| executor: *const Executor, |
| ) -> Result<&'a Executor> { |
| unsafe { |
| if executor.is_null() { |
| default_executor(env) |
| } else { |
| // SAFETY: executor must be valid |
| Ok(&*executor) |
| } |
| } |
| } |
| |
| /// # Safety |
| /// |
| /// This function could be only when the lib is loaded. |
| #[allow(static_mut_refs)] |
| unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> Result<&'a Executor> { |
| // Return the executor if it's already initialized |
| if let Some(runtime) = unsafe { RUNTIME.get() } { |
| return Ok(runtime); |
| } |
| |
| // Try to initialize the executor |
| let executor = make_tokio_executor( |
| env, |
| available_parallelism().map(NonZeroUsize::get).unwrap_or(1), |
| )?; |
| |
| Ok(unsafe { RUNTIME.get_or_init(|| executor) }) |
| } |