// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cell::RefCell;
use std::ffi::c_void;
use std::future::Future;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread::available_parallelism;

use jni::JNIEnv;
use jni::JavaVM;
use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JValue;
use jni::sys::jlong;
use tokio::task::JoinHandle;

use crate::Result;

// Process-wide default executor. It is filled lazily by `default_executor` and emptied by
// `JNI_OnUnload` via `OnceLock::take`, which needs mutable access; hence the `static mut`.
static mut RUNTIME: OnceLock<Executor> = OnceLock::new();
thread_local! {
    // Raw JNI environment pointer for the current thread. Executor worker threads store it in
    // their `on_thread_start` hook and reset it to `None` in `on_thread_stop`; it is read back
    // by `get_current_env`. On any other thread it stays `None`.
    static ENV: RefCell<Option<*mut jni::sys::JNIEnv>> = const { RefCell::new(None) };
}

/// Library unload hook: removes the default executor from [`RUNTIME`] and drops it.
///
/// # Safety
///
/// This function must only be called by the Java VM while it unloads this library.
#[allow(static_mut_refs)]
#[unsafe(no_mangle)]
pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) {
    // SAFETY: the caller contract above restricts this to library unload time.
    let previous = unsafe { RUNTIME.take() };
    // Dropping the taken value releases the default executor, if one was ever created.
    drop(previous);
}

/// Returns a [`JNIEnv`] wrapping the raw environment pointer stored in the thread-local `ENV`.
///
/// # Safety
///
/// This function may only be called while the library is loaded and from within an executor
/// thread, i.e. a thread whose `ENV` slot has been populated and not yet cleared.
///
/// # Panics
///
/// Panics if no environment pointer is stored for the current thread, or if
/// `JNIEnv::from_raw` rejects the stored pointer.
pub(crate) unsafe fn get_current_env<'local>() -> JNIEnv<'local> {
    // Only a copy of the pointer is needed, so a shared borrow of the cell is sufficient.
    let raw_env = ENV
        .with(|cell| *cell.borrow())
        .expect("env must be available");
    // SAFETY: the caller guarantees this runs on an executor thread, where the stored pointer
    // was obtained from the JVM attachment of this very thread.
    unsafe { JNIEnv::from_raw(raw_env) }.expect("env must be valid")
}

/// Async executor handle used by the bindings to run futures.
pub enum Executor {
    /// Executor backed by a Tokio runtime.
    Tokio(tokio::runtime::Runtime),
}

impl Executor {
    /// Calls `f` while this executor's runtime context is entered and returns its result.
    pub fn enter_with<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let Self::Tokio(runtime) = self;
        // The guard must stay alive until `f` has returned.
        let _context = runtime.enter();
        f()
    }

    /// Spawns `future` onto this executor and returns the handle to the spawned task.
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let Self::Tokio(runtime) = self;
        runtime.spawn(future)
    }
}

/// JNI entry point that creates a Tokio-backed [`Executor`] with `cores` worker threads.
///
/// On success the executor is moved to the heap and its address is returned as a `jlong`
/// handle. On failure the error is thrown through `env` and `0` is returned.
// NOTE(review): `usize` is not a JNI primitive type; confirm the width of the matching Java
// `native` parameter and consider switching to the corresponding `jni::sys` type.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_apache_opendal_AsyncExecutor_makeTokioExecutor(
    mut env: JNIEnv,
    _: JClass,
    cores: usize,
) -> jlong {
    match make_tokio_executor(&mut env, cores) {
        Ok(executor) => {
            // Ownership is handed out as a raw pointer encoded in the returned handle.
            let boxed = Box::new(executor);
            Box::into_raw(boxed) as jlong
        }
        Err(err) => {
            err.throw(&mut env);
            0
        }
    }
}

/// JNI entry point that destroys an [`Executor`] previously handed out as a raw pointer.
///
/// A null `executor` is ignored.
///
/// # Safety
///
/// `executor` must be either null or a pointer produced by `Box::into_raw` for an
/// [`Executor`] (as `Java_org_apache_opendal_AsyncExecutor_makeTokioExecutor` does) that has
/// not been disposed yet. It must not be used again after this call.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_org_apache_opendal_AsyncExecutor_disposeInternal(
    _: JNIEnv,
    _: JObject,
    executor: *mut Executor,
) {
    // `Box::from_raw` on a null pointer is undefined behavior; a null handle owns nothing.
    if executor.is_null() {
        return;
    }

    // SAFETY: the pointer is non-null and, per the contract above, uniquely owns a live
    // heap-allocated `Executor`.
    unsafe {
        drop(Box::from_raw(executor));
    }
}

pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result<Executor> {
    let vm = Arc::new(env.get_java_vm().expect("JavaVM must be available"));
    let counter = AtomicUsize::new(0);
    let executor = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(cores)
        .thread_name_fn(move || {
            let id = counter.fetch_add(1, Ordering::SeqCst);
            format!("opendal-tokio-worker-{id}")
        })
        .on_thread_start({
            let vm = vm.clone();
            move || {
                ENV.with(|cell| {
                    let mut env = vm
                        .attach_current_thread_as_daemon()
                        .expect("attach thread must succeed");

                    set_current_thread_name(&mut env)
                        .expect("current thread name has been set above");

                    *cell.borrow_mut() = Some(env.get_raw());
                })
            }
        })
        .on_thread_stop(move || {
            // Typically, the thread attached to the JVM will be detached automatically when the thread exits
            // and the corresponding thread-local AttachGuard is dropped.
            //
            // However, there are some edge cases on Windows that may lead to deadlocks. To mitigate this,
            // we explicitly detach the thread here.
            //
            // See https://github.com/apache/opendal/issues/6869 and https://github.com/jni-rs/jni-rs/issues/701
            // for more details.

            ENV.with(|cell| {
                *cell.borrow_mut() = None;
            });

            // SAFETY: JNIEnv is unset above and we do not use AttachGuard anywhere.
            unsafe { vm.detach_current_thread() };
        })
        .enable_all()
        .build()
        .map_err(|e| {
            opendal::Error::new(
                opendal::ErrorKind::Unexpected,
                "Failed to create tokio runtime.",
            )
            .set_source(e)
        })?;
    Ok(Executor::Tokio(executor))
}

/// Applies the current Rust thread's name to the current Java thread via `Thread.setName`.
///
/// # Errors
///
/// Returns an error if any of the JNI calls fails.
///
/// # Panics
///
/// Panics if the current Rust thread has no name.
fn set_current_thread_name(env: &mut JNIEnv) -> Result<()> {
    // Look up the `java.lang.Thread` object of the calling thread.
    let java_thread = env
        .call_static_method(
            "java/lang/Thread",
            "currentThread",
            "()Ljava/lang/Thread;",
            &[],
        )?
        .l()?;

    let rust_thread = std::thread::current();
    let Some(name) = rust_thread.name() else {
        unreachable!("thread name must be set")
    };
    let java_name = env.new_string(name)?;

    let args = [JValue::Object(&java_name)];
    env.call_method(java_thread, "setName", "(Ljava/lang/String;)V", &args)?;
    Ok(())
}

/// Resolves `executor` to a reference, falling back to the default executor when it is null.
///
/// # Errors
///
/// Returns an error if the default executor is needed and cannot be created.
///
/// # Panics
///
/// Crash if the executor is disposed.
#[inline]
pub(crate) fn executor_or_default<'a>(
    env: &mut JNIEnv<'a>,
    executor: *const Executor,
) -> Result<&'a Executor> {
    // SAFETY: executor must be valid (or null, which yields `None`).
    match unsafe { executor.as_ref() } {
        Some(explicit) => Ok(explicit),
        // SAFETY: this code only runs while the library is loaded.
        None => unsafe { default_executor(env) },
    }
}

/// Returns the process-wide default executor, creating it on first use.
///
/// The executor is created with as many worker threads as `available_parallelism` reports,
/// or one if that query fails.
///
/// # Errors
///
/// Returns an error if the executor has to be created and `make_tokio_executor` fails.
///
/// # Safety
///
/// This function may only be called while the library is loaded.
#[allow(static_mut_refs)]
unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> Result<&'a Executor> {
    // SAFETY: per the contract above the library is loaded, so `RUNTIME` is not being torn
    // down by `JNI_OnUnload`.
    match unsafe { RUNTIME.get() } {
        // Fast path: already initialized.
        Some(existing) => Ok(existing),
        None => {
            let workers = available_parallelism().map_or(1, NonZeroUsize::get);
            let fresh = make_tokio_executor(env, workers)?;
            // If another caller initialized `RUNTIME` in the meantime, `fresh` is dropped and
            // the stored executor is returned.
            // SAFETY: same as above.
            Ok(unsafe { RUNTIME.get_or_init(move || fresh) })
        }
    }
}
