fix(core/oio): Make ConcurrentTasks cancel safe by only pop after ready (#4707)

* fix(core/oio): Make ConcurrentTasks cancel safe by only pop after ready

Signed-off-by: Xuanwo <github@xuanwo.io>

* add part numbers check

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 6fe4472..3abf645 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -171,15 +171,20 @@
 
         loop {
             // Try poll once to see if there is any ready task.
-            if let Some(mut task) = self.tasks.pop_front() {
-                if let Poll::Ready((i, o)) = poll!(&mut task) {
+            if let Some(task) = self.tasks.front_mut() {
+                if let Poll::Ready((i, o)) = poll!(task) {
                     match o {
-                        Ok(o) => self.results.push_back(o),
+                        Ok(o) => {
+                            let _ = self.tasks.pop_front();
+                            self.results.push_back(o)
+                        }
                         Err(err) => {
                             // Retry this task if the error is temporary
                             if err.is_temporary() {
                                 self.tasks
-                                    .push_front(self.executor.execute((self.factory)(i)));
+                                    .front_mut()
+                                    .expect("tasks must have at least one task")
+                                    .replace(self.executor.execute((self.factory)(i)));
                             } else {
                                 self.clear();
                                 self.errored = true;
@@ -187,9 +192,6 @@
                             return Err(err);
                         }
                     }
-                } else {
-                    // task is not ready, push it back.
-                    self.tasks.push_front(task)
                 }
             }
 
@@ -203,11 +205,12 @@
             // Wait for the next task to be ready.
             let task = self
                 .tasks
-                .pop_front()
+                .front_mut()
                 .expect("tasks must have at least one task");
             let (i, o) = task.await;
             match o {
                 Ok(o) => {
+                    let _ = self.tasks.pop_front();
                     self.results.push_back(o);
                     continue;
                 }
@@ -215,7 +218,9 @@
                     // Retry this task if the error is temporary
                     if err.is_temporary() {
                         self.tasks
-                            .push_front(self.executor.execute((self.factory)(i)));
+                            .front_mut()
+                            .expect("tasks must have at least one task")
+                            .replace(self.executor.execute((self.factory)(i)));
                     } else {
                         self.clear();
                         self.errored = true;
@@ -239,15 +244,20 @@
             return Some(Ok(result));
         }
 
-        if let Some(task) = self.tasks.pop_front() {
+        if let Some(task) = self.tasks.front_mut() {
             let (i, o) = task.await;
             return match o {
-                Ok(o) => Some(Ok(o)),
+                Ok(o) => {
+                    let _ = self.tasks.pop_front();
+                    Some(Ok(o))
+                }
                 Err(err) => {
                     // Retry this task if the error is temporary
                     if err.is_temporary() {
                         self.tasks
-                            .push_front(self.executor.execute((self.factory)(i)));
+                            .front_mut()
+                            .expect("tasks must have at least one task")
+                            .replace(self.executor.execute((self.factory)(i)));
                     } else {
                         self.clear();
                         self.errored = true;
diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs
index 03549e8..0d893d7 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -276,6 +276,15 @@
             self.parts.push(result)
         }
 
+        if self.parts.len() != self.next_part_number {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "multipart part numbers mismatch, please report bug to opendal",
+            )
+            .with_context("expected", self.next_part_number)
+            .with_context("actual", self.parts.len())
+            .with_context("upload_id", upload_id));
+        }
         self.w.complete_part(&upload_id, &self.parts).await
     }
 
@@ -300,7 +309,7 @@
     use rand::Rng;
     use rand::RngCore;
     use tokio::sync::Mutex;
-    use tokio::time::sleep;
+    use tokio::time::{sleep, timeout};
 
     use super::*;
     use crate::raw::oio::Write;
@@ -347,7 +356,7 @@
             }
 
             // Add an async sleep here to enforce some pending.
-            sleep(Duration::from_millis(50)).await;
+            sleep(Duration::from_nanos(50)).await;
 
             // We will have 10% percent rate for write part to fail.
             if thread_rng().gen_bool(1.0 / 10.0) {
@@ -385,11 +394,38 @@
         }
     }
 
+    struct TimeoutExecutor {
+        exec: Arc<dyn Execute>,
+    }
+
+    impl TimeoutExecutor {
+        pub fn new() -> Self {
+            Self {
+                exec: Executor::new().into_inner(),
+            }
+        }
+    }
+
+    impl Execute for TimeoutExecutor {
+        fn execute(&self, f: BoxedStaticFuture<()>) {
+            self.exec.execute(f)
+        }
+
+        fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+            let time = thread_rng().gen_range(0..100);
+            Some(Box::pin(tokio::time::sleep(Duration::from_nanos(time))))
+        }
+    }
+
     #[tokio::test]
     async fn test_multipart_upload_writer_with_concurrent_errors() {
         let mut rng = thread_rng();
 
-        let mut w = MultipartWriter::new(TestWrite::new(), Some(Executor::new()), 200);
+        let mut w = MultipartWriter::new(
+            TestWrite::new(),
+            Some(Executor::with(TimeoutExecutor::new())),
+            200,
+        );
         let mut total_size = 0u64;
 
         for _ in 0..1000 {
@@ -400,17 +436,23 @@
             rng.fill_bytes(&mut bs);
 
             loop {
-                match w.write(bs.clone().into()).await {
-                    Ok(_) => break,
-                    Err(_) => continue,
+                match timeout(Duration::from_nanos(10), w.write(bs.clone().into())).await {
+                    Ok(Ok(_)) => break,
+                    Ok(Err(_)) => continue,
+                    Err(_) => {
+                        continue;
+                    }
                 }
             }
         }
 
         loop {
-            match w.close().await {
-                Ok(_) => break,
-                Err(_) => continue,
+            match timeout(Duration::from_nanos(10), w.close()).await {
+                Ok(Ok(_)) => break,
+                Ok(Err(_)) => continue,
+                Err(_) => {
+                    continue;
+                }
             }
         }
 
diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs
index 2317434..53da1a7 100644
--- a/core/src/types/execute/api.rs
+++ b/core/src/types/execute/api.rs
@@ -91,6 +91,14 @@
     pub fn new(handle: RemoteHandle<T>) -> Self {
         Self { handle }
     }
+
+    /// Replace the task with a new task.
+    ///
+    /// The old task will be dropped directly.
+    #[inline]
+    pub fn replace(&mut self, new_task: Self) {
+        self.handle = new_task.handle;
+    }
 }
 
 impl<T: 'static> Future for Task<T> {