blob: d4704c7e328d1485712e785096b147bf18d937cb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use std::fmt::Display;
use std::sync::Arc;
use log::log;
use log::Level;
use crate::raw::*;
use crate::*;
/// Add [log](https://docs.rs/log/) for every operation.
///
/// # Logging
///
/// - OpenDAL will log in a structured way.
/// - Every operation will start with a `started` log entry.
/// - Every operation will finish with the following status:
/// - `succeeded`: the operation is successful, but might have more to take.
/// - `finished`: the whole operation is finished.
/// - `failed`: the operation returns an unexpected error.
/// - The default log level when an expected error happens is `Warn`.
/// - The default log level when an unexpected failure happens is `Error`.
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::LoggingLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # use opendal::Scheme;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
/// .layer(LoggingLayer::default())
/// .finish();
/// Ok(())
/// # }
/// ```
///
/// # Output
///
/// OpenDAL is using [`log`](https://docs.rs/log/latest/log/) for logging internally.
///
/// To enable logging output, please set `RUST_LOG`:
///
/// ```shell
/// RUST_LOG=debug ./app
/// ```
///
/// To config logging output, please refer to [Configure Logging](https://rust-lang-nursery.github.io/rust-cookbook/development_tools/debugging/config_log.html):
///
/// ```shell
/// RUST_LOG="info,opendal::services=debug" ./app
/// ```
///
/// # Logging Interceptor
///
/// You can implement your own logging interceptor to customize the logging behavior.
///
/// ```no_run
/// # use opendal::layers::LoggingInterceptor;
/// # use opendal::layers::LoggingLayer;
/// # use opendal::raw;
/// # use opendal::services;
/// # use opendal::Error;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # use opendal::Scheme;
///
/// #[derive(Debug, Clone)]
/// struct MyLoggingInterceptor;
///
/// impl LoggingInterceptor for MyLoggingInterceptor {
/// fn log(
/// &self,
/// info: &raw::AccessorInfo,
/// operation: raw::Operation,
/// context: &[(&str, &str)],
/// message: &str,
/// err: Option<&Error>,
/// ) {
/// // log something
/// }
/// }
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
/// .layer(LoggingLayer::new(MyLoggingInterceptor))
/// .finish();
/// Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
/// Interceptor that receives the log events. A clone of it is handed to
/// every accessor built by `Layer::layer`.
logger: I,
}
impl Default for LoggingLayer {
    /// Build a layer whose events are handled by [`DefaultLoggingInterceptor`].
    fn default() -> Self {
        let logger = DefaultLoggingInterceptor;
        Self { logger }
    }
}
impl LoggingLayer {
    /// Create the layer with a specific logging interceptor.
    ///
    /// The interceptor type `I` becomes the type parameter of the returned
    /// layer, so `LoggingLayer::new(my_interceptor)` yields
    /// `LoggingLayer<MyInterceptor>`.
    pub fn new<I>(logger: I) -> LoggingLayer<I>
    where
        I: LoggingInterceptor,
    {
        LoggingLayer { logger }
    }
}
impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
    type LayeredAccess = LoggingAccessor<A, I>;

    /// Wrap `inner` in a [`LoggingAccessor`] that reports to a clone of this
    /// layer's interceptor.
    fn layer(&self, inner: A) -> Self::LayeredAccess {
        LoggingAccessor {
            // The accessor info must be read before `inner` is moved below.
            info: inner.info(),
            logger: self.logger.clone(),
            inner,
        }
    }
}
/// LoggingInterceptor is used to intercept the log.
///
/// Every log event produced by the logging layer is delivered to the
/// interceptor through [`LoggingInterceptor::log`]. The layer clones the
/// interceptor into each accessor, reader, writer, lister and deleter it
/// creates, which is why `Clone` is part of the bounds.
pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
/// Every time there is a log, this function will be called.
///
/// # Inputs
///
/// - info: The service's access info.
/// - operation: The operation to log.
/// - context: Additional context of the log like path, etc., as
///   `(key, value)` pairs.
/// - message: The log message, e.g. `started`, `finished` or `failed`.
/// - err: The error to log; `None` when the event carries no error.
///
/// # Note
///
/// Users should avoid calling resource-intensive operations such as I/O or network
/// functions here, especially anything that takes longer than 10ms. Otherwise, OpenDAL
/// could perform unexpectedly slowly.
fn log(
&self,
info: &AccessorInfo,
operation: Operation,
context: &[(&str, &str)],
message: &str,
err: Option<&Error>,
);
}
/// The DefaultLoggingInterceptor will log the message by the standard logging macro.
///
/// Events that carry an error are written at `Error` level when the error
/// kind is `ErrorKind::Unexpected` and at `Warn` level for any other kind;
/// events without an error are written at `Debug` level. All records use the
/// `LOGGING_TARGET` log target.
#[derive(Debug, Copy, Clone, Default)]
pub struct DefaultLoggingInterceptor;
impl LoggingInterceptor for DefaultLoggingInterceptor {
    /// Write exactly one `log` record for the event, using `LOGGING_TARGET`
    /// as the log target.
    ///
    /// - `err == None`: the record is written at `Debug` level.
    /// - `err == Some(_)` with kind `ErrorKind::Unexpected`: the record is
    ///   written at `Error` level and the error is rendered with its `Debug`
    ///   output.
    /// - `err == Some(_)` with any other kind: the record is written at
    ///   `Warn` level and the error is rendered with its `Display` output.
    ///
    /// The error and non-error paths are mutually exclusive, so a failed
    /// operation is not additionally reported as a `Debug` record that lacks
    /// the error details.
    #[inline]
    fn log(
        &self,
        info: &AccessorInfo,
        operation: Operation,
        context: &[(&str, &str)],
        message: &str,
        err: Option<&Error>,
    ) {
        match err {
            Some(err) => {
                // Print error if it's unexpected, otherwise in warn.
                let unexpected = err.kind() == ErrorKind::Unexpected;
                let lvl = if unexpected {
                    Level::Error
                } else {
                    Level::Warn
                };

                log!(
                    target: LOGGING_TARGET,
                    lvl,
                    "service={} name={}{}: {operation} {message} {}",
                    info.scheme(),
                    info.name(),
                    LoggingContext(context),
                    // Print error message with debug output while unexpected happened.
                    //
                    // It's super sad that we can't bind `format_args!()` here.
                    // See: https://github.com/rust-lang/rust/issues/92698
                    //
                    // Keeping the `format!` inside the macro call means the
                    // string is only built when the record is enabled.
                    if unexpected {
                        format!("{err:?}")
                    } else {
                        format!("{err}")
                    }
                );
            }
            None => {
                log!(
                    target: LOGGING_TARGET,
                    Level::Debug,
                    "service={} name={}{}: {operation} {message}",
                    info.scheme(),
                    info.name(),
                    LoggingContext(context),
                );
            }
        }
    }
}
/// Borrowed list of `(key, value)` pairs that renders as ` key=value` for
/// every pair, each one preceded by a single space.
///
/// An empty list renders as the empty string.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Stops at, and returns, the first formatting error.
        self.0
            .iter()
            .try_for_each(|(key, value)| write!(f, " {key}={value}"))
    }
}
/// Accessor wrapper built by `LoggingLayer`: it forwards every operation to
/// `inner` and reports the start and the outcome of each one to `logger`.
#[derive(Clone, Debug)]
pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
/// The wrapped accessor that performs the actual operations.
inner: A,
/// Accessor info obtained from `inner.info()` when the layer was applied;
/// passed to the interceptor with every event.
info: Arc<AccessorInfo>,
/// Interceptor that receives the log events.
logger: I,
}
/// Log target used by `DefaultLoggingInterceptor` for every record it writes.
static LOGGING_TARGET: &str = "opendal::services";
/// Logging implementation of `LayeredAccess`.
///
/// Every operation reports a `started` event before it is forwarded to the
/// inner accessor, followed by exactly one outcome event: `failed` (with the
/// error attached) or a success message. Operations that hand back a reader,
/// writer, lister or deleter wrap it in the matching `Logging*` type so that
/// later activity on it is reported as well.
impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
    type Inner = A;
    type Reader = LoggingReader<A::Reader, I>;
    type Writer = LoggingWriter<A::Writer, I>;
    type Lister = LoggingLister<A::Lister, I>;
    type Deleter = LoggingDeleter<A::Deleter, I>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    fn info(&self) -> Arc<AccessorInfo> {
        Arc::clone(&self.info)
    }

    /// Forward `create_dir`, logging `started` and then `finished` or `failed`.
    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        let ctx = [("path", path)];
        self.logger
            .log(&self.info, Operation::CreateDir, &ctx, "started", None);

        let outcome = self.inner.create_dir(path, args).await;
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::CreateDir, &ctx, "finished", None),
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::CreateDir, &ctx, "failed", Some(err))
            }
        }
        outcome
    }

    /// Forward `read`, logging `started` and then `created reader` or
    /// `failed`. The returned reader is wrapped in a [`LoggingReader`].
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let ctx = [("path", path)];
        self.logger
            .log(&self.info, Operation::Read, &ctx, "started", None);

        match self.inner.read(path, args).await {
            Ok((rp, reader)) => {
                self.logger
                    .log(&self.info, Operation::Read, &ctx, "created reader", None);
                let reader =
                    LoggingReader::new(self.info.clone(), self.logger.clone(), path, reader);
                Ok((rp, reader))
            }
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::Read, &ctx, "failed", Some(&err));
                Err(err)
            }
        }
    }

    /// Forward `write`, logging `started` and then `created writer` or
    /// `failed`. The returned writer is wrapped in a [`LoggingWriter`].
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let ctx = [("path", path)];
        self.logger
            .log(&self.info, Operation::Write, &ctx, "started", None);

        match self.inner.write(path, args).await {
            Ok((rp, writer)) => {
                self.logger
                    .log(&self.info, Operation::Write, &ctx, "created writer", None);
                let writer =
                    LoggingWriter::new(self.info.clone(), self.logger.clone(), path, writer);
                Ok((rp, writer))
            }
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::Write, &ctx, "failed", Some(&err));
                Err(err)
            }
        }
    }

    /// Forward `copy`, logging `started` and then `finished` or `failed`
    /// with the `from` and `to` paths as context.
    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        let ctx = [("from", from), ("to", to)];
        self.logger
            .log(&self.info, Operation::Copy, &ctx, "started", None);

        let outcome = self.inner.copy(from, to, args).await;
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::Copy, &ctx, "finished", None),
            Err(err) => self
                .logger
                .log(&self.info, Operation::Copy, &ctx, "failed", Some(err)),
        }
        outcome
    }

    /// Forward `rename`, logging `started` and then `finished` or `failed`
    /// with the `from` and `to` paths as context.
    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        let ctx = [("from", from), ("to", to)];
        self.logger
            .log(&self.info, Operation::Rename, &ctx, "started", None);

        let outcome = self.inner.rename(from, to, args).await;
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::Rename, &ctx, "finished", None),
            Err(err) => self
                .logger
                .log(&self.info, Operation::Rename, &ctx, "failed", Some(err)),
        }
        outcome
    }

    /// Forward `stat`, logging `started` and then `finished` or `failed`.
    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        let ctx = [("path", path)];
        self.logger
            .log(&self.info, Operation::Stat, &ctx, "started", None);

        let outcome = self.inner.stat(path, args).await;
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::Stat, &ctx, "finished", None),
            Err(err) => self
                .logger
                .log(&self.info, Operation::Stat, &ctx, "failed", Some(err)),
        }
        outcome
    }

    /// Forward `delete`, logging `started` and then `finished` or `failed`
    /// with an empty context. The returned deleter is wrapped in a
    /// [`LoggingDeleter`].
    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        self.logger
            .log(&self.info, Operation::Delete, &[], "started", None);

        match self.inner.delete().await {
            Ok((rp, deleter)) => {
                // NOTE(review): the success message here is "finished" while
                // `read`/`write`/`list` use "created ..." for the same step;
                // kept as-is to preserve the emitted log text.
                self.logger
                    .log(&self.info, Operation::Delete, &[], "finished", None);
                let deleter = LoggingDeleter::new(self.info.clone(), self.logger.clone(), deleter);
                Ok((rp, deleter))
            }
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::Delete, &[], "failed", Some(&err));
                Err(err)
            }
        }
    }

    /// Forward `list`, logging `started` and then `created lister` or
    /// `failed`. The returned lister is wrapped in a [`LoggingLister`].
    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        let ctx = [("path", path)];
        self.logger
            .log(&self.info, Operation::List, &ctx, "started", None);

        match self.inner.list(path, args).await {
            Ok((rp, lister)) => {
                self.logger
                    .log(&self.info, Operation::List, &ctx, "created lister", None);
                let lister =
                    LoggingLister::new(self.info.clone(), self.logger.clone(), path, lister);
                Ok((rp, lister))
            }
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::List, &ctx, "failed", Some(&err));
                Err(err)
            }
        }
    }

    /// Forward `presign`, logging `started` and then `finished` or `failed`.
    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
        let ctx = [("path", path)];
        self.logger
            .log(&self.info, Operation::Presign, &ctx, "started", None);

        let outcome = self.inner.presign(path, args).await;
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::Presign, &ctx, "finished", None),
            Err(err) => self
                .logger
                .log(&self.info, Operation::Presign, &ctx, "failed", Some(err)),
        }
        outcome
    }
}
/// Reader wrapper that reports the end of a read stream, or a read failure,
/// to the logging interceptor.
pub struct LoggingReader<R, I: LoggingInterceptor> {
/// Accessor info passed to the interceptor with every event.
info: Arc<AccessorInfo>,
/// Interceptor that receives the log events.
logger: I,
/// Path being read, copied when the reader was created.
path: String,
/// Running total of bytes returned by successful, non-empty reads.
read: u64,
/// The wrapped reader.
inner: R,
}
impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
    /// Wrap `reader`, remembering an owned copy of `path` and starting the
    /// byte counter at zero.
    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
        let path = String::from(path);
        Self {
            info,
            logger,
            path,
            read: 0,
            inner: reader,
        }
    }
}
impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
    /// Read the next buffer from the inner reader.
    ///
    /// - An empty buffer is reported as `finished`, together with the total
    ///   number of bytes read so far and the size of this (empty) buffer.
    /// - A non-empty buffer only advances the byte counter; nothing is logged.
    /// - An error is reported as `failed` with the bytes read so far.
    ///
    /// The inner result is returned unchanged in every case.
    async fn read(&mut self) -> Result<Buffer> {
        let outcome = self.inner.read().await;

        match &outcome {
            Ok(bs) if bs.is_empty() => {
                let read = self.read.to_string();
                let size = bs.len().to_string();
                self.logger.log(
                    &self.info,
                    Operation::Read,
                    &[
                        ("path", self.path.as_str()),
                        ("read", read.as_str()),
                        ("size", size.as_str()),
                    ],
                    "finished",
                    None,
                );
            }
            Ok(bs) => self.read += bs.len() as u64,
            Err(err) => {
                let read = self.read.to_string();
                self.logger.log(
                    &self.info,
                    Operation::Read,
                    &[("path", self.path.as_str()), ("read", read.as_str())],
                    "failed",
                    Some(err),
                );
            }
        }

        outcome
    }
}
/// Writer wrapper that reports write failures and the outcome of `abort` and
/// `close` to the logging interceptor.
pub struct LoggingWriter<W, I> {
/// Accessor info passed to the interceptor with every event.
info: Arc<AccessorInfo>,
/// Interceptor that receives the log events.
logger: I,
/// Path being written, copied when the writer was created.
path: String,
/// Running total of bytes accepted by successful `write` calls.
written: u64,
/// The wrapped writer.
inner: W,
}
impl<W, I> LoggingWriter<W, I> {
    /// Wrap `writer`, remembering an owned copy of `path` and starting the
    /// byte counter at zero.
    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
        let path = String::from(path);
        Self {
            info,
            logger,
            path,
            written: 0,
            inner: writer,
        }
    }
}
impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
    /// Write `bs` through the inner writer.
    ///
    /// A successful write only advances the `written` counter by the size of
    /// `bs`; a failed write is reported as `failed` together with the bytes
    /// written so far and the size of the rejected buffer.
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        // `bs` is moved into the inner writer, so take its size up front.
        let size = bs.len();
        let outcome = self.inner.write(bs).await;

        match &outcome {
            Ok(_) => self.written += size as u64,
            Err(err) => {
                let written = self.written.to_string();
                let size = size.to_string();
                self.logger.log(
                    &self.info,
                    Operation::Write,
                    &[
                        ("path", self.path.as_str()),
                        ("written", written.as_str()),
                        ("size", size.as_str()),
                    ],
                    "failed",
                    Some(err),
                );
            }
        }

        outcome
    }

    /// Abort the inner writer and report `abort succeeded` or `abort failed`.
    async fn abort(&mut self) -> Result<()> {
        let outcome = self.inner.abort().await;

        let written = self.written.to_string();
        let ctx = [("path", self.path.as_str()), ("written", written.as_str())];
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::Write, &ctx, "abort succeeded", None),
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::Write, &ctx, "abort failed", Some(err))
            }
        }

        outcome
    }

    /// Close the inner writer and report `close succeeded` or `close failed`.
    async fn close(&mut self) -> Result<Metadata> {
        let outcome = self.inner.close().await;

        let written = self.written.to_string();
        let ctx = [("path", self.path.as_str()), ("written", written.as_str())];
        match &outcome {
            Ok(_) => self
                .logger
                .log(&self.info, Operation::Write, &ctx, "close succeeded", None),
            Err(err) => {
                self.logger
                    .log(&self.info, Operation::Write, &ctx, "close failed", Some(err))
            }
        }

        outcome
    }
}
/// Lister wrapper that reports the end of a listing, or a listing failure,
/// to the logging interceptor.
pub struct LoggingLister<P, I: LoggingInterceptor> {
/// Accessor info passed to the interceptor with every event.
info: Arc<AccessorInfo>,
/// Interceptor that receives the log events.
logger: I,
/// Path being listed, copied when the lister was created.
path: String,
/// Number of entries returned so far.
listed: usize,
/// The wrapped lister.
inner: P,
}
impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
    /// Wrap `inner`, remembering an owned copy of `path` and starting the
    /// entry counter at zero.
    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
        let path = String::from(path);
        Self {
            info,
            logger,
            path,
            listed: 0,
            inner,
        }
    }
}
impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
    /// Fetch the next entry from the inner lister.
    ///
    /// - An entry only advances the `listed` counter; nothing is logged.
    /// - The end of the listing (`Ok(None)`) is reported as `finished`.
    /// - An error is reported as `failed`.
    ///
    /// Both log events carry the path and the number of entries listed so far.
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        match self.inner.next().await {
            Ok(Some(entry)) => {
                self.listed += 1;
                Ok(Some(entry))
            }
            Ok(None) => {
                let listed = self.listed.to_string();
                self.logger.log(
                    &self.info,
                    Operation::List,
                    &[("path", self.path.as_str()), ("listed", listed.as_str())],
                    "finished",
                    None,
                );
                Ok(None)
            }
            Err(err) => {
                let listed = self.listed.to_string();
                self.logger.log(
                    &self.info,
                    Operation::List,
                    &[("path", self.path.as_str()), ("listed", listed.as_str())],
                    "failed",
                    Some(&err),
                );
                Err(err)
            }
        }
    }
}
/// Deleter wrapper that reports rejected delete requests and the outcome of
/// every flush to the logging interceptor.
pub struct LoggingDeleter<D, I: LoggingInterceptor> {
/// Accessor info passed to the interceptor with every event.
info: Arc<AccessorInfo>,
/// Interceptor that receives the log events.
logger: I,
/// Delete requests accepted by the inner deleter and not yet reported as
/// flushed: incremented on every successful `delete`, decreased by the
/// count returned from every successful `flush`.
queued: usize,
/// Sum of the counts returned by successful `flush` calls.
deleted: usize,
/// The wrapped deleter.
inner: D,
}
impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
    /// Wrap `inner` with both the `queued` and `deleted` counters at zero.
    fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
        let (queued, deleted) = (0, 0);
        Self {
            info,
            logger,
            queued,
            deleted,
            inner,
        }
    }
}
impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
    /// Hand a delete request to the inner deleter.
    ///
    /// An accepted request only increments the `queued` counter; a rejected
    /// one is reported as `failed` together with the path, the requested
    /// version (`<latest>` when none was given) and the current counters.
    /// The inner result is returned unchanged.
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        // `args` is moved into the inner deleter below, so the version has to
        // be captured first in case the failure needs to be logged.
        let version = args
            .version()
            .map(|v| v.to_string())
            .unwrap_or_else(|| "<latest>".to_string());

        let res = self.inner.delete(path, args);

        match &res {
            Ok(_) => {
                self.queued += 1;
            }
            Err(err) => {
                self.logger.log(
                    &self.info,
                    Operation::Delete,
                    &[
                        ("path", path),
                        ("version", &version),
                        ("queued", &self.queued.to_string()),
                        ("deleted", &self.deleted.to_string()),
                    ],
                    "failed",
                    Some(err),
                );
            }
        };

        res
    }

    /// Flush the inner deleter and report the outcome.
    ///
    /// On success the returned count is moved from `queued` to `deleted` and
    /// a `succeeded` event is logged with the updated counters; on failure a
    /// `failed` event is logged with the unchanged counters. The inner result
    /// is returned unchanged.
    async fn flush(&mut self) -> Result<usize> {
        let res = self.inner.flush().await;

        match &res {
            Ok(flushed) => {
                // The counters exist only for logging, so they must never be
                // able to panic (debug builds) or wrap around (release
                // builds) if the inner deleter reports more flushed entries
                // than this wrapper has counted as queued.
                self.queued = self.queued.saturating_sub(*flushed);
                self.deleted = self.deleted.saturating_add(*flushed);
                self.logger.log(
                    &self.info,
                    Operation::Delete,
                    &[
                        ("queued", &self.queued.to_string()),
                        ("deleted", &self.deleted.to_string()),
                    ],
                    "succeeded",
                    None,
                );
            }
            Err(err) => {
                self.logger.log(
                    &self.info,
                    Operation::Delete,
                    &[
                        ("queued", &self.queued.to_string()),
                        ("deleted", &self.deleted.to_string()),
                    ],
                    "failed",
                    Some(err),
                );
            }
        };

        res
    }
}