| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::future::Future; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| use crate::raw::*; |
| use crate::*; |
| |
| /// Add timeout for every operation to avoid slow or unexpected hang operations. |
| /// |
| /// For example, a dead connection could hang a databases sql query. TimeoutLayer |
| /// will break this connection and returns an error so users can handle it by |
| /// retrying or print to users. |
| /// |
| /// # Notes |
| /// |
| /// `TimeoutLayer` treats all operations in two kinds: |
| /// |
| /// - Non IO Operation like `stat`, `delete` they operate on a single file. We control |
| /// them by setting `timeout`. |
| /// - IO Operation like `read`, `Reader::read` and `Writer::write`, they operate on data directly, we |
| /// control them by setting `io_timeout`. |
| /// |
| /// # Default |
| /// |
| /// - timeout: 60 seconds |
| /// - io_timeout: 10 seconds |
| /// |
| /// # Panics |
| /// |
| /// TimeoutLayer will drop the future if the timeout is reached. This might cause the internal state |
| /// of the future to be broken. If underlying future moves ownership into the future, it will be |
| /// dropped and will neven return back. |
| /// |
| /// For example, while using `TimeoutLayer` with `RetryLayer` at the same time, please make sure |
| /// timeout layer showed up before retry layer. |
| /// |
| /// ```no_run |
| /// # use std::time::Duration; |
| /// |
| /// # use opendal::layers::RetryLayer; |
| /// # use opendal::layers::TimeoutLayer; |
| /// # use opendal::services; |
| /// # use opendal::Operator; |
| /// # use opendal::Result; |
| /// |
| /// # fn main() -> Result<()> { |
| /// let op = Operator::new(services::Memory::default())? |
| /// // This is fine, since timeout happen during retry. |
| /// .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1))) |
| /// .layer(RetryLayer::new()) |
| /// // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state. |
| /// .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1))) |
| /// .finish(); |
| /// Ok(()) |
| /// # } |
| /// ``` |
| /// |
| /// # Examples |
| /// |
| /// The following examples will create a timeout layer with 10 seconds timeout for all non-io |
| /// operations, 3 seconds timeout for all io operations. |
| /// |
| /// ```no_run |
| /// # use std::time::Duration; |
| /// |
| /// # use opendal::layers::TimeoutLayer; |
| /// # use opendal::services; |
| /// # use opendal::Operator; |
| /// # use opendal::Result; |
| /// # use opendal::Scheme; |
| /// |
| /// # fn main() -> Result<()> { |
| /// let _ = Operator::new(services::Memory::default())? |
| /// .layer( |
| /// TimeoutLayer::default() |
| /// .with_timeout(Duration::from_secs(10)) |
| /// .with_io_timeout(Duration::from_secs(3)), |
| /// ) |
| /// .finish(); |
| /// Ok(()) |
| /// # } |
| /// ``` |
| /// |
| /// # Implementation Notes |
| /// |
| /// TimeoutLayer is using [`tokio::time::timeout`] to implement timeout for operations. And IO |
| /// Operations insides `reader`, `writer` will use `Pin<Box<tokio::time::Sleep>>` to track the |
| /// timeout. |
| /// |
| /// This might introduce a bit overhead for IO operations, but it's the only way to implement |
| /// timeout correctly. We used to implement timeout layer in zero cost way that only stores |
| /// a [`std::time::Instant`] and check the timeout by comparing the instant with current time. |
| /// However, it doesn't work for all cases. |
| /// |
| /// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime |
| /// will never poll our future again. From the application side, this future is hanging forever |
| /// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times. |
| #[derive(Clone)] |
| pub struct TimeoutLayer { |
| timeout: Duration, |
| io_timeout: Duration, |
| } |
| |
| impl Default for TimeoutLayer { |
| fn default() -> Self { |
| Self { |
| timeout: Duration::from_secs(60), |
| io_timeout: Duration::from_secs(10), |
| } |
| } |
| } |
| |
| impl TimeoutLayer { |
| /// Create a new `TimeoutLayer` with default settings. |
| pub fn new() -> Self { |
| Self::default() |
| } |
| |
| /// Set timeout for TimeoutLayer with given value. |
| /// |
| /// This timeout is for all non-io operations like `stat`, `delete`. |
| pub fn with_timeout(mut self, timeout: Duration) -> Self { |
| self.timeout = timeout; |
| self |
| } |
| |
| /// Set io timeout for TimeoutLayer with given value. |
| /// |
| /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`. |
| pub fn with_io_timeout(mut self, timeout: Duration) -> Self { |
| self.io_timeout = timeout; |
| self |
| } |
| |
| /// Set speed for TimeoutLayer with given value. |
| /// |
| /// # Notes |
| /// |
| /// The speed should be the lower bound of the IO speed. Set this value too |
| /// large could result in all write operations failing. |
| /// |
| /// # Panics |
| /// |
| /// This function will panic if speed is 0. |
| #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")] |
| pub fn with_speed(self, _: u64) -> Self { |
| self |
| } |
| } |
| |
| impl<A: Access> Layer<A> for TimeoutLayer { |
| type LayeredAccess = TimeoutAccessor<A>; |
| |
| fn layer(&self, inner: A) -> Self::LayeredAccess { |
| let info = inner.info(); |
| info.update_executor(|exec| { |
| Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout)) |
| }); |
| |
| TimeoutAccessor { |
| inner, |
| |
| timeout: self.timeout, |
| io_timeout: self.io_timeout, |
| } |
| } |
| } |
| |
| #[derive(Debug, Clone)] |
| pub struct TimeoutAccessor<A: Access> { |
| inner: A, |
| |
| timeout: Duration, |
| io_timeout: Duration, |
| } |
| |
| impl<A: Access> TimeoutAccessor<A> { |
| async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> { |
| tokio::time::timeout(self.timeout, fut).await.map_err(|_| { |
| Error::new(ErrorKind::Unexpected, "operation timeout reached") |
| .with_operation(op) |
| .with_context("timeout", self.timeout.as_secs_f64().to_string()) |
| .set_temporary() |
| })? |
| } |
| |
| async fn io_timeout<F: Future<Output = Result<T>>, T>( |
| &self, |
| op: Operation, |
| fut: F, |
| ) -> Result<T> { |
| tokio::time::timeout(self.io_timeout, fut) |
| .await |
| .map_err(|_| { |
| Error::new(ErrorKind::Unexpected, "io timeout reached") |
| .with_operation(op) |
| .with_context("timeout", self.io_timeout.as_secs_f64().to_string()) |
| .set_temporary() |
| })? |
| } |
| } |
| |
| impl<A: Access> LayeredAccess for TimeoutAccessor<A> { |
| type Inner = A; |
| type Reader = TimeoutWrapper<A::Reader>; |
| type Writer = TimeoutWrapper<A::Writer>; |
| type Lister = TimeoutWrapper<A::Lister>; |
| type Deleter = TimeoutWrapper<A::Deleter>; |
| |
| fn inner(&self) -> &Self::Inner { |
| &self.inner |
| } |
| |
| async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { |
| self.timeout(Operation::CreateDir, self.inner.create_dir(path, args)) |
| .await |
| } |
| |
| async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { |
| self.io_timeout(Operation::Read, self.inner.read(path, args)) |
| .await |
| .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) |
| } |
| |
| async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { |
| self.io_timeout(Operation::Write, self.inner.write(path, args)) |
| .await |
| .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) |
| } |
| |
| async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { |
| self.timeout(Operation::Copy, self.inner.copy(from, to, args)) |
| .await |
| } |
| |
| async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { |
| self.timeout(Operation::Rename, self.inner.rename(from, to, args)) |
| .await |
| } |
| |
| async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { |
| self.timeout(Operation::Stat, self.inner.stat(path, args)) |
| .await |
| } |
| |
| async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { |
| self.timeout(Operation::Delete, self.inner.delete()) |
| .await |
| .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) |
| } |
| |
| async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { |
| self.io_timeout(Operation::List, self.inner.list(path, args)) |
| .await |
| .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) |
| } |
| |
| async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { |
| self.timeout(Operation::Presign, self.inner.presign(path, args)) |
| .await |
| } |
| } |
| |
| pub struct TimeoutExecutor { |
| exec: Arc<dyn Execute>, |
| timeout: Duration, |
| } |
| |
| impl TimeoutExecutor { |
| pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self { |
| Self { exec, timeout } |
| } |
| } |
| |
| impl Execute for TimeoutExecutor { |
| fn execute(&self, f: BoxedStaticFuture<()>) { |
| self.exec.execute(f) |
| } |
| |
| fn timeout(&self) -> Option<BoxedStaticFuture<()>> { |
| Some(Box::pin(tokio::time::sleep(self.timeout))) |
| } |
| } |
| |
| pub struct TimeoutWrapper<R> { |
| inner: R, |
| |
| timeout: Duration, |
| } |
| |
| impl<R> TimeoutWrapper<R> { |
| fn new(inner: R, timeout: Duration) -> Self { |
| Self { inner, timeout } |
| } |
| |
| #[inline] |
| async fn io_timeout<F: Future<Output = Result<T>>, T>( |
| timeout: Duration, |
| op: &'static str, |
| fut: F, |
| ) -> Result<T> { |
| tokio::time::timeout(timeout, fut).await.map_err(|_| { |
| Error::new(ErrorKind::Unexpected, "io operation timeout reached") |
| .with_operation(op) |
| .with_context("timeout", timeout.as_secs_f64().to_string()) |
| .set_temporary() |
| })? |
| } |
| } |
| |
| impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { |
| async fn read(&mut self) -> Result<Buffer> { |
| let fut = self.inner.read(); |
| Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await |
| } |
| } |
| |
| impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { |
| async fn write(&mut self, bs: Buffer) -> Result<()> { |
| let fut = self.inner.write(bs); |
| Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await |
| } |
| |
| async fn close(&mut self) -> Result<Metadata> { |
| let fut = self.inner.close(); |
| Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await |
| } |
| |
| async fn abort(&mut self) -> Result<()> { |
| let fut = self.inner.abort(); |
| Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await |
| } |
| } |
| |
| impl<R: oio::List> oio::List for TimeoutWrapper<R> { |
| async fn next(&mut self) -> Result<Option<oio::Entry>> { |
| let fut = self.inner.next(); |
| Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await |
| } |
| } |
| |
| impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> { |
| fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { |
| self.inner.delete(path, args) |
| } |
| |
| async fn flush(&mut self) -> Result<usize> { |
| let fut = self.inner.flush(); |
| Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use std::future::pending; |
| use std::future::Future; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| use futures::StreamExt; |
| use tokio::time::sleep; |
| use tokio::time::timeout; |
| |
| use crate::layers::TimeoutLayer; |
| use crate::layers::TypeEraseLayer; |
| use crate::raw::*; |
| use crate::*; |
| |
| #[derive(Debug, Clone, Default)] |
| struct MockService; |
| |
| impl Access for MockService { |
| type Reader = MockReader; |
| type Writer = (); |
| type Lister = MockLister; |
| type Deleter = (); |
| |
| fn info(&self) -> Arc<AccessorInfo> { |
| let am = AccessorInfo::default(); |
| am.set_native_capability(Capability { |
| read: true, |
| delete: true, |
| ..Default::default() |
| }); |
| |
| am.into() |
| } |
| |
| /// This function will build a reader that always return pending. |
| async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { |
| Ok((RpRead::new(), MockReader)) |
| } |
| |
| /// This function will never return. |
| async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { |
| sleep(Duration::from_secs(u64::MAX)).await; |
| |
| Ok((RpDelete::default(), ())) |
| } |
| |
| async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> { |
| Ok((RpList::default(), MockLister)) |
| } |
| } |
| |
| #[derive(Debug, Clone, Default)] |
| struct MockReader; |
| |
| impl oio::Read for MockReader { |
| fn read(&mut self) -> impl Future<Output = Result<Buffer>> { |
| pending() |
| } |
| } |
| |
| #[derive(Debug, Clone, Default)] |
| struct MockLister; |
| |
| impl oio::List for MockLister { |
| fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> { |
| pending() |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_operation_timeout() { |
| let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor; |
| let op = Operator::from_inner(acc) |
| .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1))); |
| |
| let fut = async { |
| let res = op.delete("test").await; |
| assert!(res.is_err()); |
| let err = res.unwrap_err(); |
| assert_eq!(err.kind(), ErrorKind::Unexpected); |
| assert!(err.to_string().contains("timeout")) |
| }; |
| |
| timeout(Duration::from_secs(2), fut) |
| .await |
| .expect("this test should not exceed 2 seconds") |
| } |
| |
| #[tokio::test] |
| async fn test_io_timeout() { |
| let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor; |
| let op = Operator::from_inner(acc) |
| .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1))); |
| |
| let reader = op.reader("test").await.unwrap(); |
| |
| let res = reader.read(0..4).await; |
| assert!(res.is_err()); |
| let err = res.unwrap_err(); |
| assert_eq!(err.kind(), ErrorKind::Unexpected); |
| assert!(err.to_string().contains("timeout")) |
| } |
| |
| #[tokio::test] |
| async fn test_list_timeout() { |
| let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor; |
| let op = Operator::from_inner(acc).layer( |
| TimeoutLayer::new() |
| .with_timeout(Duration::from_secs(1)) |
| .with_io_timeout(Duration::from_secs(1)), |
| ); |
| |
| let mut lister = op.lister("test").await.unwrap(); |
| |
| let res = lister.next().await.unwrap(); |
| assert!(res.is_err()); |
| let err = res.unwrap_err(); |
| assert_eq!(err.kind(), ErrorKind::Unexpected); |
| assert!(err.to_string().contains("timeout")) |
| } |
| |
| #[tokio::test] |
| async fn test_list_timeout_raw() { |
| use oio::List; |
| |
| let acc = MockService; |
| let timeout_layer = TimeoutLayer::new() |
| .with_timeout(Duration::from_secs(1)) |
| .with_io_timeout(Duration::from_secs(1)); |
| let timeout_acc = timeout_layer.layer(acc); |
| |
| let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default()) |
| .await |
| .unwrap(); |
| |
| let res = lister.next().await; |
| assert!(res.is_err()); |
| let err = res.unwrap_err(); |
| assert_eq!(err.kind(), ErrorKind::Unexpected); |
| assert!(err.to_string().contains("timeout")); |
| } |
| } |