fix(core/oio): Make ConcurrentTasks cancel safe by only pop after ready (#4707)
* fix(core/oio): Make ConcurrentTasks cancel safe by only pop after ready
Signed-off-by: Xuanwo <github@xuanwo.io>
* add part numbers check
Signed-off-by: Xuanwo <github@xuanwo.io>
---------
Signed-off-by: Xuanwo <github@xuanwo.io>
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 6fe4472..3abf645 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -171,15 +171,20 @@
loop {
// Try poll once to see if there is any ready task.
- if let Some(mut task) = self.tasks.pop_front() {
- if let Poll::Ready((i, o)) = poll!(&mut task) {
+ if let Some(task) = self.tasks.front_mut() {
+ if let Poll::Ready((i, o)) = poll!(task) {
match o {
- Ok(o) => self.results.push_back(o),
+ Ok(o) => {
+ let _ = self.tasks.pop_front();
+ self.results.push_back(o)
+ }
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
- .push_front(self.executor.execute((self.factory)(i)));
+ .front_mut()
+ .expect("tasks must have at least one task")
+ .replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
@@ -187,9 +192,6 @@
return Err(err);
}
}
- } else {
- // task is not ready, push it back.
- self.tasks.push_front(task)
}
}
@@ -203,11 +205,12 @@
// Wait for the next task to be ready.
let task = self
.tasks
- .pop_front()
+ .front_mut()
.expect("tasks must have at least one task");
let (i, o) = task.await;
match o {
Ok(o) => {
+ let _ = self.tasks.pop_front();
self.results.push_back(o);
continue;
}
@@ -215,7 +218,9 @@
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
- .push_front(self.executor.execute((self.factory)(i)));
+ .front_mut()
+ .expect("tasks must have at least one task")
+ .replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
@@ -239,15 +244,20 @@
return Some(Ok(result));
}
- if let Some(task) = self.tasks.pop_front() {
+ if let Some(task) = self.tasks.front_mut() {
let (i, o) = task.await;
return match o {
- Ok(o) => Some(Ok(o)),
+ Ok(o) => {
+ let _ = self.tasks.pop_front();
+ Some(Ok(o))
+ }
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
- .push_front(self.executor.execute((self.factory)(i)));
+ .front_mut()
+ .expect("tasks must have at least one task")
+ .replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs
index 03549e8..0d893d7 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -276,6 +276,15 @@
self.parts.push(result)
}
+ if self.parts.len() != self.next_part_number {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "multipart part numbers mismatch, please report bug to opendal",
+ )
+ .with_context("expected", self.next_part_number)
+ .with_context("actual", self.parts.len())
+ .with_context("upload_id", upload_id));
+ }
self.w.complete_part(&upload_id, &self.parts).await
}
@@ -300,7 +309,7 @@
use rand::Rng;
use rand::RngCore;
use tokio::sync::Mutex;
- use tokio::time::sleep;
+ use tokio::time::{sleep, timeout};
use super::*;
use crate::raw::oio::Write;
@@ -347,7 +356,7 @@
}
// Add an async sleep here to enforce some pending.
- sleep(Duration::from_millis(50)).await;
+ sleep(Duration::from_nanos(50)).await;
// We will have 10% percent rate for write part to fail.
if thread_rng().gen_bool(1.0 / 10.0) {
@@ -385,11 +394,38 @@
}
}
+ struct TimeoutExecutor {
+ exec: Arc<dyn Execute>,
+ }
+
+ impl TimeoutExecutor {
+ pub fn new() -> Self {
+ Self {
+ exec: Executor::new().into_inner(),
+ }
+ }
+ }
+
+ impl Execute for TimeoutExecutor {
+ fn execute(&self, f: BoxedStaticFuture<()>) {
+ self.exec.execute(f)
+ }
+
+ fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+ let time = thread_rng().gen_range(0..100);
+ Some(Box::pin(tokio::time::sleep(Duration::from_nanos(time))))
+ }
+ }
+
#[tokio::test]
async fn test_multipart_upload_writer_with_concurrent_errors() {
let mut rng = thread_rng();
- let mut w = MultipartWriter::new(TestWrite::new(), Some(Executor::new()), 200);
+ let mut w = MultipartWriter::new(
+ TestWrite::new(),
+ Some(Executor::with(TimeoutExecutor::new())),
+ 200,
+ );
let mut total_size = 0u64;
for _ in 0..1000 {
@@ -400,17 +436,23 @@
rng.fill_bytes(&mut bs);
loop {
- match w.write(bs.clone().into()).await {
- Ok(_) => break,
- Err(_) => continue,
+ match timeout(Duration::from_nanos(10), w.write(bs.clone().into())).await {
+ Ok(Ok(_)) => break,
+ Ok(Err(_)) => continue,
+ Err(_) => {
+ continue;
+ }
}
}
}
loop {
- match w.close().await {
- Ok(_) => break,
- Err(_) => continue,
+ match timeout(Duration::from_nanos(10), w.close()).await {
+ Ok(Ok(_)) => break,
+ Ok(Err(_)) => continue,
+ Err(_) => {
+ continue;
+ }
}
}
diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs
index 2317434..53da1a7 100644
--- a/core/src/types/execute/api.rs
+++ b/core/src/types/execute/api.rs
@@ -91,6 +91,14 @@
pub fn new(handle: RemoteHandle<T>) -> Self {
Self { handle }
}
+
+ /// Replace the task with a new task.
+ ///
+ /// The old task will be dropped directly.
+ #[inline]
+ pub fn replace(&mut self, new_task: Self) {
+ self.handle = new_task.handle;
+ }
}
impl<T: 'static> Future for Task<T> {