blob: caa8263aead42cabfe5127046f17668f4206a287 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::cell::RefCell;
use std::ffi::c_void;
use std::future::Future;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread::available_parallelism;
use jni::JNIEnv;
use jni::JavaVM;
use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JValue;
use jni::sys::jlong;
use tokio::task::JoinHandle;
use crate::Result;
static mut RUNTIME: OnceLock<Executor> = OnceLock::new();
thread_local! {
static ENV: RefCell<Option<*mut jni::sys::JNIEnv>> = const { RefCell::new(None) };
}
/// # Safety
///
/// This function could be only called by java vm when unload this lib.
#[allow(static_mut_refs)]
#[unsafe(no_mangle)]
pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) {
unsafe {
RUNTIME.take();
}
}
/// # Safety
///
/// This function could be only called when the lib is loaded and within an executor thread.
pub(crate) unsafe fn get_current_env<'local>() -> JNIEnv<'local> {
let env = ENV
.with(|cell| *cell.borrow_mut())
.expect("env must be available");
unsafe { JNIEnv::from_raw(env) }.expect("env must be valid")
}
pub enum Executor {
Tokio(tokio::runtime::Runtime),
}
impl Executor {
pub fn enter_with<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
match self {
Executor::Tokio(e) => {
let _guard = e.enter();
f()
}
}
}
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Executor::Tokio(e) => e.spawn(future),
}
}
}
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_apache_opendal_AsyncExecutor_makeTokioExecutor(
mut env: JNIEnv,
_: JClass,
cores: usize,
) -> jlong {
make_tokio_executor(&mut env, cores)
.map(|executor| Box::into_raw(Box::new(executor)) as jlong)
.unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}
/// # Safety
///
/// This function should not be called before the AsyncExecutor is ready.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_org_apache_opendal_AsyncExecutor_disposeInternal(
_: JNIEnv,
_: JObject,
executor: *mut Executor,
) {
unsafe {
drop(Box::from_raw(executor));
}
}
pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result<Executor> {
let vm = Arc::new(env.get_java_vm().expect("JavaVM must be available"));
let counter = AtomicUsize::new(0);
let executor = tokio::runtime::Builder::new_multi_thread()
.worker_threads(cores)
.thread_name_fn(move || {
let id = counter.fetch_add(1, Ordering::SeqCst);
format!("opendal-tokio-worker-{id}")
})
.on_thread_start({
let vm = vm.clone();
move || {
ENV.with(|cell| {
let mut env = vm
.attach_current_thread_as_daemon()
.expect("attach thread must succeed");
set_current_thread_name(&mut env)
.expect("current thread name has been set above");
*cell.borrow_mut() = Some(env.get_raw());
})
}
})
.on_thread_stop(move || {
// Typically, the thread attached to the JVM will be detached automatically when the thread exits
// and the corresponding thread-local AttachGuard is dropped.
//
// However, there are some edge cases on Windows that may lead to deadlocks. To mitigate this,
// we explicitly detach the thread here.
//
// See https://github.com/apache/opendal/issues/6869 and https://github.com/jni-rs/jni-rs/issues/701
// for more details.
ENV.with(|cell| {
*cell.borrow_mut() = None;
});
// SAFETY: JNIEnv is unset above and we do not use AttachGuard anywhere.
unsafe { vm.detach_current_thread() };
})
.enable_all()
.build()
.map_err(|e| {
opendal::Error::new(
opendal::ErrorKind::Unexpected,
"Failed to create tokio runtime.",
)
.set_source(e)
})?;
Ok(Executor::Tokio(executor))
}
fn set_current_thread_name(env: &mut JNIEnv) -> Result<()> {
let current_thread = env
.call_static_method(
"java/lang/Thread",
"currentThread",
"()Ljava/lang/Thread;",
&[],
)?
.l()?;
let thread_name = match std::thread::current().name() {
Some(thread_name) => env.new_string(thread_name)?,
None => unreachable!("thread name must be set"),
};
env.call_method(
current_thread,
"setName",
"(Ljava/lang/String;)V",
&[JValue::Object(&thread_name)],
)?;
Ok(())
}
/// # Panic
///
/// Crash if the executor is disposed.
#[inline]
pub(crate) fn executor_or_default<'a>(
env: &mut JNIEnv<'a>,
executor: *const Executor,
) -> Result<&'a Executor> {
unsafe {
if executor.is_null() {
default_executor(env)
} else {
// SAFETY: executor must be valid
Ok(&*executor)
}
}
}
/// # Safety
///
/// This function could be only when the lib is loaded.
#[allow(static_mut_refs)]
unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> Result<&'a Executor> {
// Return the executor if it's already initialized
if let Some(runtime) = unsafe { RUNTIME.get() } {
return Ok(runtime);
}
// Try to initialize the executor
let executor = make_tokio_executor(
env,
available_parallelism().map(NonZeroUsize::get).unwrap_or(1),
)?;
Ok(unsafe { RUNTIME.get_or_init(|| executor) })
}