blob: c4b63b417fe42c4ff941d591c97f86dfd853dd31 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{
cmp, fmt,
ops::AddAssign,
str::FromStr,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
};
use arrow::array::{ArrayRef, RecordBatch, StringArray};
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use chrono::Utc;
use datafusion::{
common::{instant::Instant, HashMap},
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use futures::stream::BoxStream;
use object_store::{
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
};
use parking_lot::{Mutex, RwLock};
use url::Url;
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
///
/// The discriminants are spelled out (and the representation pinned to `u8`) because the mode
/// is stored as `mode as u8` and decoded again through `From<u8>`; the two must stay in sync.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum InstrumentedObjectStoreMode {
    /// Disable collection of profiling data
    #[default]
    Disabled = 0,
    /// Enable collection of profiling data and output a summary
    Summary = 1,
    /// Enable collection of profiling data and output a summary and all details
    Trace = 2,
}
impl fmt::Display for InstrumentedObjectStoreMode {
    /// Writes the variant name (`Disabled`, `Summary` or `Trace`)
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Disabled => "Disabled",
            Self::Summary => "Summary",
            Self::Trace => "Trace",
        };
        f.write_str(name)
    }
}
impl FromStr for InstrumentedObjectStoreMode {
type Err = DataFusionError;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"disabled" => Ok(Self::Disabled),
"summary" => Ok(Self::Summary),
"trace" => Ok(Self::Trace),
_ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
}
}
}
impl From<u8> for InstrumentedObjectStoreMode {
    /// Decodes a mode previously stored with `mode as u8`; any value that does not correspond
    /// to `Summary` or `Trace` decodes to `Disabled`
    fn from(value: u8) -> Self {
        const SUMMARY: u8 = InstrumentedObjectStoreMode::Summary as u8;
        const TRACE: u8 = InstrumentedObjectStoreMode::Trace as u8;
        match value {
            SUMMARY => Self::Summary,
            TRACE => Self::Trace,
            _ => Self::Disabled,
        }
    }
}
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
/// inner [`ObjectStore`]
#[derive(Debug)]
pub struct InstrumentedObjectStore {
/// The wrapped store that every operation is forwarded to
inner: Arc<dyn ObjectStore>,
/// Current [`InstrumentedObjectStoreMode`], held as its `u8` value so it can be changed
/// through a shared reference
instrument_mode: AtomicU8,
/// Details of the requests recorded so far; emptied by `take_requests`
requests: Mutex<Vec<RequestDetails>>,
}
impl InstrumentedObjectStore {
/// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
Self {
inner: object_store,
instrument_mode,
requests: Mutex::new(Vec::new()),
}
}
fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
self.instrument_mode.store(mode as u8, Ordering::Relaxed)
}
/// Returns all [`RequestDetails`] accumulated in this [`InstrumentedObjectStore`] and clears
/// the stored requests
pub fn take_requests(&self) -> Vec<RequestDetails> {
let mut req = self.requests.lock();
req.drain(..).collect()
}
fn enabled(&self) -> bool {
self.instrument_mode.load(Ordering::Relaxed)
!= InstrumentedObjectStoreMode::Disabled as u8
}
async fn instrumented_put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let timestamp = Utc::now();
let start = Instant::now();
let size = payload.content_length();
let ret = self.inner.put_opts(location, payload, opts).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Put,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: Some(size),
range: None,
extra_display: None,
});
Ok(ret)
}
async fn instrumented_put_multipart(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
let timestamp = Utc::now();
let start = Instant::now();
let ret = self.inner.put_multipart_opts(location, opts).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Put,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});
Ok(ret)
}
async fn instrumented_get_opts(
&self,
location: &Path,
options: GetOptions,
) -> Result<GetResult> {
let timestamp = Utc::now();
let range = options.range.clone();
let start = Instant::now();
let ret = self.inner.get_opts(location, options).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Get,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: Some((ret.range.end - ret.range.start) as usize),
range,
extra_display: None,
});
Ok(ret)
}
async fn instrumented_delete(&self, location: &Path) -> Result<()> {
let timestamp = Utc::now();
let start = Instant::now();
self.inner.delete(location).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Delete,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});
Ok(())
}
fn instrumented_list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
let timestamp = Utc::now();
let ret = self.inner.list(prefix);
self.requests.lock().push(RequestDetails {
op: Operation::List,
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
timestamp,
duration: None, // list returns a stream, so the duration isn't meaningful
size: None,
range: None,
extra_display: None,
});
ret
}
async fn instrumented_list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> Result<ListResult> {
let timestamp = Utc::now();
let start = Instant::now();
let ret = self.inner.list_with_delimiter(prefix).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::List,
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});
Ok(ret)
}
async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
let timestamp = Utc::now();
let start = Instant::now();
self.inner.copy(from, to).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Copy,
path: from.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: Some(format!("copy_to: {to}")),
});
Ok(())
}
async fn instrumented_copy_if_not_exists(
&self,
from: &Path,
to: &Path,
) -> Result<()> {
let timestamp = Utc::now();
let start = Instant::now();
self.inner.copy_if_not_exists(from, to).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Copy,
path: from.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: Some(format!("copy_to: {to}")),
});
Ok(())
}
async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
let timestamp = Utc::now();
let start = Instant::now();
let ret = self.inner.head(location).await?;
let elapsed = start.elapsed();
self.requests.lock().push(RequestDetails {
op: Operation::Head,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});
Ok(ret)
}
}
impl fmt::Display for InstrumentedObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mode: InstrumentedObjectStoreMode =
self.instrument_mode.load(Ordering::Relaxed).into();
write!(
f,
"Instrumented Object Store: instrument_mode: {mode}, inner: {}",
self.inner
)
}
}
// Every method checks `enabled()` once: when instrumentation is on it goes through the matching
// `instrumented_*` method, otherwise it forwards straight to the inner store.
#[async_trait]
impl ObjectStore for InstrumentedObjectStore {
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        if self.enabled() {
            self.instrumented_put_opts(location, payload, opts).await
        } else {
            self.inner.put_opts(location, payload, opts).await
        }
    }

    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> Result<Box<dyn MultipartUpload>> {
        if self.enabled() {
            self.instrumented_put_multipart(location, opts).await
        } else {
            self.inner.put_multipart_opts(location, opts).await
        }
    }

    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        if self.enabled() {
            self.instrumented_get_opts(location, options).await
        } else {
            self.inner.get_opts(location, options).await
        }
    }

    async fn delete(&self, location: &Path) -> Result<()> {
        if self.enabled() {
            self.instrumented_delete(location).await
        } else {
            self.inner.delete(location).await
        }
    }

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        if self.enabled() {
            self.instrumented_list(prefix)
        } else {
            self.inner.list(prefix)
        }
    }

    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        if self.enabled() {
            self.instrumented_list_with_delimiter(prefix).await
        } else {
            self.inner.list_with_delimiter(prefix).await
        }
    }

    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        if self.enabled() {
            self.instrumented_copy(from, to).await
        } else {
            self.inner.copy(from, to).await
        }
    }

    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        if self.enabled() {
            self.instrumented_copy_if_not_exists(from, to).await
        } else {
            self.inner.copy_if_not_exists(from, to).await
        }
    }

    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        if self.enabled() {
            self.instrumented_head(location).await
        } else {
            self.inner.head(location).await
        }
    }
}
/// Object store operation types tracked by [`InstrumentedObjectStore`]
///
/// The derived ordering follows the declaration order below; `RequestSummaries::new` sorts its
/// summaries by this ordering.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
/// Recorded for both `copy` and `copy_if_not_exists`
Copy,
/// Recorded for `delete`
Delete,
/// Recorded for `get_opts`
Get,
/// Recorded for `head`
Head,
/// Recorded for both `list` and `list_with_delimiter`
List,
/// Recorded for both `put_opts` and `put_multipart_opts`
Put,
}
impl fmt::Display for Operation {
    /// Writes the variant name (e.g. `Get`)
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Copy => "Copy",
            Self::Delete => "Delete",
            Self::Get => "Get",
            Self::Head => "Head",
            Self::List => "List",
            Self::Put => "Put",
        };
        f.write_str(name)
    }
}
/// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
#[derive(Debug)]
pub struct RequestDetails {
/// Kind of operation that was performed
op: Operation,
/// Path the request targeted (the source path for copies, the prefix for lists)
path: Path,
/// Wall-clock time captured just before the request was issued
timestamp: chrono::DateTime<Utc>,
/// Time the inner store call took; `None` for the streaming `list`
duration: Option<Duration>,
/// Payload length for `put_opts`, length of the returned range for `get_opts`; `None` for
/// the other operations
size: Option<usize>,
/// Range requested through the get options, if any
range: Option<GetRange>,
/// Extra text appended to the `Display` output (the copy destination for copies)
extra_display: Option<String>,
}
impl fmt::Display for RequestDetails {
    /// Renders the request as one line of space-separated fields: timestamp and operation
    /// first, then whichever of duration, size and range are present, the path, and finally
    /// any extra display text
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let header = format!(
            "{} operation={:?}",
            self.timestamp.to_rfc3339(),
            self.op
        );
        // Absent fields are `None` here and simply drop out when flattened.
        let fields = [
            Some(header),
            self.duration
                .map(|d| format!("duration={:.6}s", d.as_secs_f32())),
            self.size.map(|s| format!("size={s}")),
            self.range.as_ref().map(|r| format!("range: {r}")),
            Some(format!("path={}", self.path)),
            self.extra_display.clone(),
        ];
        let line = fields.into_iter().flatten().collect::<Vec<_>>().join(" ");
        write!(f, "{line}")
    }
}
/// Summary statistics for all requests recorded in an [`InstrumentedObjectStore`]
#[derive(Default)]
pub struct RequestSummaries {
/// One summary per [`Operation`] seen; `RequestSummaries::new` sorts them by operation
summaries: Vec<RequestSummary>,
}
/// Display the summary as a table
impl fmt::Display for RequestSummaries {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let batch = self.to_batch();
        // A formatting failure is not expected; report it in the output instead of panicking.
        match pretty_format_batches(&[batch]) {
            Ok(table) => write!(f, "{table}"),
            Err(e) => write!(f, "Error formatting summary: {e}"),
        }
    }
}
impl RequestSummaries {
    /// Summarizes input [`RequestDetails`]
    ///
    /// Requests are grouped by [`Operation`]; the resulting summaries are sorted by operation
    /// so the output order is deterministic.
    pub fn new(requests: &[RequestDetails]) -> Self {
        let mut by_operation: HashMap<Operation, RequestSummary> = HashMap::new();
        for rd in requests {
            by_operation
                .entry(rd.op)
                .or_insert_with(|| RequestSummary::new(rd.op))
                .push(rd);
        }
        // Convert to a Vec with consistent ordering
        let mut summaries: Vec<RequestSummary> = by_operation.into_values().collect();
        summaries.sort_by_key(|s| s.operation);
        Self { summaries }
    }

    /// Convert the summaries into a `RecordBatch` for display
    ///
    /// Each operation produces two rows, one for `duration` and one for `size`. A metric that
    /// no request of that operation carried leaves `min`/`max`/`avg`/`sum` null. `count` is
    /// the total number of requests for the operation, while `avg` is taken over only the
    /// requests that carried the metric.
    ///
    /// Results in a table like:
    /// ```text
    /// +-----------+----------+-----------+-----------+-----------+-----------+-------+
    /// | Operation | Metric   | min       | max       | avg       | sum       | count |
    /// +-----------+----------+-----------+-----------+-----------+-----------+-------+
    /// | Get       | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1     |
    /// | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1     |
    /// +-----------+----------+-----------+-----------+-----------+-----------+-------+
    /// ```
    ///
    /// # Panics
    /// Panics if the columns cannot be assembled into a `RecordBatch`; every column is built
    /// with two entries per summary, so this is not expected.
    pub fn to_batch(&self) -> RecordBatch {
        let operations: StringArray = self
            .iter()
            .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
            .collect();
        let metrics: StringArray = self
            .iter()
            .flat_map(|_s| [Some("duration"), Some("size")])
            .collect();
        let mins: StringArray = self
            .stats_iter()
            .flat_map(|(duration_stats, size_stats)| {
                let dur_min =
                    duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
                let size_min = size_stats.map(|s| format!("{} B", s.min));
                [dur_min, size_min]
            })
            .collect();
        let maxs: StringArray = self
            .stats_iter()
            .flat_map(|(duration_stats, size_stats)| {
                let dur_max =
                    duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
                let size_max = size_stats.map(|s| format!("{} B", s.max));
                [dur_max, size_max]
            })
            .collect();
        let avgs: StringArray = self
            .stats_iter()
            .flat_map(|(duration_stats, size_stats)| {
                // Divide by the number of samples of this metric, not by the number of
                // requests: some requests of an operation carry no duration or no size
                // (e.g. `list` vs `list_with_delimiter`), and counting them would pull the
                // average below the reported minimum.
                let dur_avg = duration_stats.map(|d| {
                    let avg = d.sum.as_secs_f32() / d.count as f32;
                    format!("{avg:.6}s")
                });
                let size_avg = size_stats.map(|s| {
                    let avg = s.sum as f32 / s.count as f32;
                    format!("{avg} B")
                });
                [dur_avg, size_avg]
            })
            .collect();
        let sums: StringArray = self
            .stats_iter()
            .flat_map(|(duration_stats, size_stats)| {
                // The duration sum adds up individual requests, which may have run
                // concurrently, so it can exceed the wall-clock time of the query.
                let dur_sum =
                    duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
                let size_sum = size_stats.map(|s| format!("{} B", s.sum));
                [dur_sum, size_sum]
            })
            .collect();
        let counts: StringArray = self
            .iter()
            .flat_map(|s| {
                let count = s.count.to_string();
                [Some(count.clone()), Some(count)]
            })
            .collect();
        RecordBatch::try_from_iter(vec![
            ("Operation", Arc::new(operations) as ArrayRef),
            ("Metric", Arc::new(metrics) as ArrayRef),
            ("min", Arc::new(mins) as ArrayRef),
            ("max", Arc::new(maxs) as ArrayRef),
            ("avg", Arc::new(avgs) as ArrayRef),
            ("sum", Arc::new(sums) as ArrayRef),
            ("count", Arc::new(counts) as ArrayRef),
        ])
        .expect("Created the batch correctly")
    }

    /// Return an iterator over the summaries
    fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
        self.summaries.iter()
    }

    /// Return an iterator over (duration_stats, size_stats) tuples
    /// for each summary
    fn stats_iter(
        &self,
    ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
        self.summaries
            .iter()
            .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
    }
}

/// Summary statistics for a particular type of [`Operation`] (e.g. `GET` or `PUT`)
/// in an [`InstrumentedObjectStore`]'s [`RequestDetails`]
pub struct RequestSummary {
    /// Operation that all summarized requests share
    operation: Operation,
    /// Total number of requests summarized, whether or not they carried a duration or size
    count: usize,
    /// Duration statistics; `None` until a request with a duration is pushed
    duration_stats: Option<Stats<Duration>>,
    /// Size statistics; `None` until a request with a size is pushed
    size_stats: Option<Stats<usize>>,
}

impl RequestSummary {
    /// Creates an empty summary for `operation`
    fn new(operation: Operation) -> Self {
        Self {
            operation,
            count: 0,
            duration_stats: None,
            size_stats: None,
        }
    }

    /// Folds one request into the summary; the statistics are created lazily on the first
    /// request that carries the corresponding metric
    fn push(&mut self, request: &RequestDetails) {
        self.count += 1;
        if let Some(dur) = request.duration {
            self.duration_stats.get_or_insert_default().push(dur)
        }
        if let Some(size) = request.size {
            self.size_stats.get_or_insert_default().push(size)
        }
    }
}

/// Running minimum, maximum, sum and sample count of a series of values
struct Stats<T: Copy + Ord + AddAssign<T>> {
    min: T,
    max: T,
    sum: T,
    /// Number of values pushed so far; the divisor for averages
    count: usize,
}

impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
    /// Folds `val` into the running statistics
    fn push(&mut self, val: T) {
        self.min = cmp::min(val, self.min);
        self.max = cmp::max(val, self.max);
        self.sum += val;
        self.count += 1;
    }
}

impl Default for Stats<Duration> {
    /// Starts with `min`/`max` at opposite extremes so the first pushed value replaces both
    fn default() -> Self {
        Self {
            min: Duration::MAX,
            max: Duration::ZERO,
            sum: Duration::ZERO,
            count: 0,
        }
    }
}

impl Default for Stats<usize> {
    /// Starts with `min`/`max` at opposite extremes so the first pushed value replaces both
    fn default() -> Self {
        Self {
            min: usize::MAX,
            max: usize::MIN,
            sum: 0,
            count: 0,
        }
    }
}
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
/// Registry that actually stores the URL-to-store mapping
inner: Arc<dyn ObjectStoreRegistry>,
/// Current [`InstrumentedObjectStoreMode`] as its `u8` value; used as the initial mode of
/// newly registered stores
instrument_mode: AtomicU8,
/// Every instrumented wrapper created by `register_store`, in registration order
stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}
impl Default for InstrumentedObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
impl InstrumentedObjectStoreRegistry {
    /// Returns a new [`InstrumentedObjectStoreRegistry`] backed by a
    /// [`DefaultObjectStoreRegistry`], with the default [`InstrumentedObjectStoreMode`] and
    /// no instrumented stores
    pub fn new() -> Self {
        Self {
            inner: Arc::new(DefaultObjectStoreRegistry::new()),
            instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
            stores: RwLock::new(Vec::new()),
        }
    }

    /// Builder-style variant of [`Self::set_instrument_mode`]: sets `mode` and returns the
    /// registry.
    ///
    /// Stores that were already registered are switched to `mode` as well, so the registry
    /// and its stores never report different modes.
    pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
        self.set_instrument_mode(mode);
        self
    }

    /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this
    /// [`InstrumentedObjectStoreRegistry`]
    pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
        self.stores.read().clone()
    }

    /// Returns the current [`InstrumentedObjectStoreMode`] for this
    /// [`InstrumentedObjectStoreRegistry`]
    pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
        self.instrument_mode.load(Ordering::Relaxed).into()
    }

    /// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`]
    /// and for every store registered so far
    pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
        for s in self.stores.read().iter() {
            s.set_instrument_mode(mode)
        }
    }
}
impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
    /// Wraps `store` in an [`InstrumentedObjectStore`] that starts in the registry's current
    /// mode, remembers the wrapper, and registers it with the inner registry under `url`
    fn register_store(
        &self,
        url: &Url,
        store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        let current_mode = self.instrument_mode.load(Ordering::Relaxed);
        let wrapped = InstrumentedObjectStore::new(store, AtomicU8::new(current_mode));
        let wrapped = Arc::new(wrapped);
        self.stores.write().push(Arc::clone(&wrapped));
        self.inner.register_store(url, wrapped)
    }

    /// Forwards to the inner registry.
    ///
    /// NOTE(review): the wrapper is not removed from `stores`, so it keeps being returned by
    /// `stores()` after deregistration; confirm this is intended.
    fn deregister_store(
        &self,
        url: &Url,
    ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
        self.inner.deregister_store(url)
    }

    /// Forwards to the inner registry
    fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
        self.inner.get_store(url)
    }
}
#[cfg(test)]
mod tests {
use object_store::WriteMultipart;
use super::*;
use insta::assert_snapshot;
// Checks the default mode, case-insensitive parsing, rejection of unknown names, and decoding
// from `u8` (including the fallback to `Disabled` for an unknown value).
#[test]
fn instrumented_mode() {
assert!(matches!(
InstrumentedObjectStoreMode::default(),
InstrumentedObjectStoreMode::Disabled
));
assert!(matches!(
"dIsABleD".parse().unwrap(),
InstrumentedObjectStoreMode::Disabled
));
assert!(matches!(
"SUmMaRy".parse().unwrap(),
InstrumentedObjectStoreMode::Summary
));
assert!(matches!(
"TRaCe".parse().unwrap(),
InstrumentedObjectStoreMode::Trace
));
assert!("does_not_exist"
.parse::<InstrumentedObjectStoreMode>()
.is_err());
assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
// Values outside the known range fall back to `Disabled`
assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
}
// Checks that a new registry is empty and in the default mode, that `with_profile_mode`
// changes the mode, and that registering a store makes it fetchable and tracked.
#[test]
fn instrumented_registry() {
let mut reg = InstrumentedObjectStoreRegistry::new();
assert!(reg.stores().is_empty());
assert_eq!(
reg.instrument_mode(),
InstrumentedObjectStoreMode::default()
);
reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);
let store = object_store::memory::InMemory::new();
let url = "mem://test".parse().unwrap();
let registered = reg.register_store(&url, Arc::new(store));
// Nothing was registered under this URL before
assert!(registered.is_none());
let fetched = reg.get_store(&url);
assert!(fetched.is_ok());
assert_eq!(reg.stores().len(), 1);
}
// Returns an `InstrumentedObjectStore` with some data loaded for testing and the path to
// access the data
//
// The store wraps an in-memory store and starts in the default mode, so the initial `put`
// below is not recorded.
async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
let store = Arc::new(object_store::memory::InMemory::new());
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
let instrumented = InstrumentedObjectStore::new(store, mode);
// Load the test store with some data we can read
let path = Path::from("test/data");
let payload = PutPayload::from_static(b"test_data");
instrumented.put(&path, payload).await.unwrap();
(instrumented, path)
}
// Checks that `get` is recorded only once instrumentation is enabled, and that the recorded
// request carries the path, a duration and the size of the data read.
#[tokio::test]
async fn instrumented_store_get() {
let (instrumented, path) = setup_test_store().await;
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.get(&path).await.unwrap();
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.get(&path).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
// Taking the requests hands them over and leaves the store's buffer empty
let mut requests = instrumented.take_requests();
assert_eq!(requests.len(), 1);
assert!(instrumented.requests.lock().is_empty());
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Get);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
// 9 is the length of the `test_data` payload loaded by `setup_test_store`
assert_eq!(request.size, Some(9));
assert_eq!(request.range, None);
assert!(request.extra_display.is_none());
}
// Checks that `delete` is recorded only once instrumentation is enabled, with a duration and
// no size, range or extra display text.
#[tokio::test]
async fn instrumented_store_delete() {
let (instrumented, path) = setup_test_store().await;
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
instrumented.delete(&path).await.unwrap();
assert!(instrumented.requests.lock().is_empty());
// We need a new store so we have data to delete again
let (instrumented, path) = setup_test_store().await;
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
instrumented.delete(&path).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
let mut requests = instrumented.take_requests();
assert_eq!(requests.len(), 1);
assert!(instrumented.requests.lock().is_empty());
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Delete);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
// Checks that `list` is recorded only once instrumentation is enabled, and that the recorded
// request has no duration. The returned stream is dropped without being polled.
#[tokio::test]
async fn instrumented_store_list() {
let (instrumented, path) = setup_test_store().await;
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.list(Some(&path));
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.list(Some(&path));
assert_eq!(instrumented.requests.lock().len(), 1);
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::List);
assert_eq!(request.path, path);
assert!(request.duration.is_none());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
// Checks that `list_with_delimiter` is recorded only once instrumentation is enabled, and
// that, unlike `list`, the recorded request has a duration.
#[tokio::test]
async fn instrumented_store_list_with_delimiter() {
let (instrumented, path) = setup_test_store().await;
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::List);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
// Checks that `put` is recorded only once instrumentation is enabled, and that the recorded
// size equals the payload's content length.
#[tokio::test]
async fn instrumented_store_put_opts() {
// The `setup_test_store()` method comes with data already `put` into it, so we'll setup
// manually for this test
let store = Arc::new(object_store::memory::InMemory::new());
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
let instrumented = InstrumentedObjectStore::new(store, mode);
let path = Path::from("test/data");
let payload = PutPayload::from_static(b"test_data");
let size = payload.content_length();
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
instrumented.put(&path, payload.clone()).await.unwrap();
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
instrumented.put(&path, payload).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::Put);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert_eq!(request.size.unwrap(), size);
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
// Checks that `put_multipart` is recorded only once instrumentation is enabled, that a whole
// multipart write yields exactly one recorded request, and that it carries no size.
#[tokio::test]
async fn instrumented_store_put_multipart() {
// The `setup_test_store()` method comes with data already `put` into it, so we'll setup
// manually for this test
let store = Arc::new(object_store::memory::InMemory::new());
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
let instrumented = InstrumentedObjectStore::new(store, mode);
let path = Path::from("test/data");
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let mp = instrumented.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(mp);
write.write(b"test_data");
write.finish().await.unwrap();
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let mp = instrumented.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(mp);
write.write(b"test_data");
write.finish().await.unwrap();
// Only the creation of the upload is recorded, not the writes made through it
assert_eq!(instrumented.requests.lock().len(), 1);
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::Put);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
// Checks that `copy` is recorded only once instrumentation is enabled, keyed on the source
// path with the destination reported through `extra_display`.
#[tokio::test]
async fn instrumented_store_copy() {
let (instrumented, path) = setup_test_store().await;
let copy_to = Path::from("test/copied");
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
instrumented.copy(&path, &copy_to).await.unwrap();
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
instrumented.copy(&path, &copy_to).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
let mut requests = instrumented.take_requests();
assert_eq!(requests.len(), 1);
assert!(instrumented.requests.lock().is_empty());
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Copy);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert_eq!(
request.extra_display.unwrap(),
format!("copy_to: {copy_to}")
);
}
// Checks that `copy_if_not_exists` is recorded only once instrumentation is enabled, keyed on
// the source path with the destination reported through `extra_display`.
#[tokio::test]
async fn instrumented_store_copy_if_not_exists() {
let (instrumented, path) = setup_test_store().await;
let mut copy_to = Path::from("test/copied");
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
instrumented
.copy_if_not_exists(&path, &copy_to)
.await
.unwrap();
assert!(instrumented.requests.lock().is_empty());
// Use a new destination since the previous one already exists
copy_to = Path::from("test/copied_again");
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
instrumented
.copy_if_not_exists(&path, &copy_to)
.await
.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
let mut requests = instrumented.take_requests();
assert_eq!(requests.len(), 1);
assert!(instrumented.requests.lock().is_empty());
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Copy);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert_eq!(
request.extra_display.unwrap(),
format!("copy_to: {copy_to}")
);
}
// Checks that `head` is recorded only once instrumentation is enabled, with a duration and no
// size, range or extra display text.
#[tokio::test]
async fn instrumented_store_head() {
let (instrumented, path) = setup_test_store().await;
// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.head(&path).await.unwrap();
assert!(instrumented.requests.lock().is_empty());
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.head(&path).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);
let mut requests = instrumented.take_requests();
assert_eq!(requests.len(), 1);
assert!(instrumented.requests.lock().is_empty());
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Head);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
// Pins the `Display` output of a `RequestDetails` that has every optional field populated.
#[test]
fn request_details() {
let rd = RequestDetails {
op: Operation::Get,
path: Path::from("test"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
duration: Some(Duration::new(5, 0)),
size: Some(10),
range: Some((..10).into()),
extra_display: Some(String::from("extra info")),
};
assert_eq!(
format!("{rd}"),
"1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
);
}
// Snapshot-tests the summary table as requests are added step by step: no requests, a single
// `Get`, several `Get`s (aggregation), and finally a `Put` (grouping by operation).
#[test]
fn request_summary() {
// Test empty request list
let mut requests = Vec::new();
assert_snapshot!(RequestSummaries::new(&requests), @r"
+-----------+--------+-----+-----+-----+-----+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+--------+-----+-----+-----+-----+-------+
+-----------+--------+-----+-----+-----+-----+-------+
");
requests.push(RequestDetails {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
duration: Some(Duration::from_secs(5)),
size: Some(100),
range: None,
extra_display: None,
});
assert_snapshot!(RequestSummaries::new(&requests), @r"
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Get | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1 |
| Get | size | 100 B | 100 B | 100 B | 100 B | 1 |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
");
// Add more Get requests to test aggregation
requests.push(RequestDetails {
op: Operation::Get,
path: Path::from("test2"),
timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
duration: Some(Duration::from_secs(8)),
size: Some(150),
range: None,
extra_display: None,
});
requests.push(RequestDetails {
op: Operation::Get,
path: Path::from("test3"),
timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
duration: Some(Duration::from_secs(2)),
size: Some(50),
range: None,
extra_display: None,
});
assert_snapshot!(RequestSummaries::new(&requests), @r"
+-----------+----------+-----------+-----------+-----------+------------+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+----------+-----------+-----------+-----------+------------+-------+
| Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 |
| Get | size | 50 B | 150 B | 100 B | 300 B | 3 |
+-----------+----------+-----------+-----------+-----------+------------+-------+
");
// Add Put requests to test grouping
requests.push(RequestDetails {
op: Operation::Put,
path: Path::from("test4"),
timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
duration: Some(Duration::from_millis(200)),
size: Some(75),
range: None,
extra_display: None,
});
assert_snapshot!(RequestSummaries::new(&requests), @r"
+-----------+----------+-----------+-----------+-----------+------------+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+----------+-----------+-----------+-----------+------------+-------+
| Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 |
| Get | size | 50 B | 150 B | 100 B | 300 B | 3 |
| Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 |
| Put | size | 75 B | 75 B | 75 B | 75 B | 1 |
+-----------+----------+-----------+-----------+-----------+------------+-------+
");
}
// Snapshot-tests that a request without a size leaves the size row's statistics empty while
// still counting the request.
#[test]
fn request_summary_only_duration() {
// Test request with only duration (no size)
let only_duration = vec![RequestDetails {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
duration: Some(Duration::from_secs(3)),
size: None,
range: None,
extra_display: None,
}];
assert_snapshot!(RequestSummaries::new(&only_duration), @r"
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Get | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1 |
| Get | size | | | | | 1 |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
");
}
// Snapshot-tests that a request without a duration leaves the duration row's statistics empty
// while still counting the request.
#[test]
fn request_summary_only_size() {
// Test request with only size (no duration)
let only_size = vec![RequestDetails {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
duration: None,
size: Some(200),
range: None,
extra_display: None,
}];
assert_snapshot!(RequestSummaries::new(&only_size), @r"
+-----------+----------+-------+-------+-------+-------+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+----------+-------+-------+-------+-------+-------+
| Get | duration | | | | | 1 |
| Get | size | 200 B | 200 B | 200 B | 200 B | 1 |
+-----------+----------+-------+-------+-------+-------+-------+
");
}
// Snapshot-tests that a request with neither duration nor size still produces both rows, with
// only the count filled in.
#[test]
fn request_summary_neither_duration_or_size() {
// Test request with neither duration nor size
let no_stats = vec![RequestDetails {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
duration: None,
size: None,
range: None,
extra_display: None,
}];
assert_snapshot!(RequestSummaries::new(&no_stats), @r"
+-----------+----------+-----+-----+-----+-----+-------+
| Operation | Metric | min | max | avg | sum | count |
+-----------+----------+-----+-----+-----+-----+-------+
| Get | duration | | | | | 1 |
| Get | size | | | | | 1 |
+-----------+----------+-----+-----+-----+-----+-------+
");
}
}