blob: 01e6b28b0ab8cda2c327d2c9e9e79140c7cabc19 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#[cfg(feature = "tokio")]
use std::task::{Context, Poll};
#[cfg(feature = "tokio")]
use crate::PutPayloadMut;
use crate::{PutPayload, PutResult, Result};
use async_trait::async_trait;
use futures_util::future::BoxFuture;
#[cfg(feature = "tokio")]
use tokio::task::JoinSet;
/// An upload part request
///
/// A boxed, `'static` future that is returned by [`MultipartUpload::put_part`] and yields
/// `Ok(())` or an error when polled to completion.
pub type UploadPart = BoxFuture<'static, Result<()>>;
/// A trait allowing writing an object in fixed size chunks
///
/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
/// may be polled in parallel, allowing for concurrent uploads.
///
/// Once all part uploads have been polled to completion, the upload can be completed by
/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
/// as an atomic operation. It is implementation defined behaviour if [`MultipartUpload::complete`]
/// is called before all [`UploadPart`] have been polled to completion.
#[async_trait]
pub trait MultipartUpload: Send + std::fmt::Debug {
/// Upload the next part
///
/// Most stores require that all parts excluding the last are at least 5 MiB, and some
/// further require that all parts excluding the last be the same size, e.g. [R2].
/// Clients wanting to maximise compatibility should therefore perform writes in
/// fixed size blocks larger than 5 MiB.
///
/// Callers may invoke this method multiple times and then await the
/// returned futures in parallel
///
/// ```no_run
/// # use futures_util::StreamExt;
/// # use object_store::MultipartUpload;
/// #
/// # async fn test() {
/// #
/// let mut upload: Box<dyn MultipartUpload> = todo!();
/// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
/// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
/// futures_util::future::try_join(p1, p2).await.unwrap();
/// upload.complete().await.unwrap();
/// # }
/// ```
///
/// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
fn put_part(&mut self, data: PutPayload) -> UploadPart;
/// Complete the multipart upload
///
/// It is implementation defined behaviour if this method is called before polling
/// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
/// it is implementation defined behaviour to call [`MultipartUpload::complete`]
/// on an already completed or aborted [`MultipartUpload`].
async fn complete(&mut self) -> Result<PutResult>;
/// Abort the multipart upload
///
/// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
/// some object stores will automatically clean up any previously uploaded parts.
/// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
/// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
///
/// It will not be possible to call `abort` in all failure scenarios, for example
/// non-graceful shutdown of the calling application. It is therefore recommended
/// that object stores are configured with lifecycle rules to automatically clean up
/// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
/// for more information.
///
/// It is implementation defined behaviour to call [`MultipartUpload::abort`]
/// on an already completed or aborted [`MultipartUpload`]
async fn abort(&mut self) -> Result<()>;
}
/// Forwards every [`MultipartUpload`] operation to the boxed value, so that a
/// `Box<W>` (including `Box<dyn MultipartUpload>`) can be used wherever a
/// [`MultipartUpload`] is expected
#[async_trait]
impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        // Name the inner type explicitly so the call cannot resolve back to this impl
        W::put_part(&mut **self, data)
    }

    async fn complete(&mut self) -> Result<PutResult> {
        W::complete(&mut **self).await
    }

    async fn abort(&mut self) -> Result<()> {
        W::abort(&mut **self).await
    }
}
/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`]
/// allowing back pressure on producers, prior to buffering the next part. However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
///
/// [`Sink`]: futures_util::sink::Sink
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct WriteMultipart {
/// The underlying upload that receives each part and is finally completed or aborted
upload: Box<dyn MultipartUpload>,
/// Data accumulated so far for the part that has not yet been handed to `upload`
buffer: PutPayloadMut,
/// Number of buffered bytes at which the buffer is flushed as a part
chunk_size: usize,
/// The part uploads that have been spawned and not yet joined
tasks: JoinSet<Result<()>>,
}
#[cfg(feature = "tokio")]
impl WriteMultipart {
/// Create a new [`WriteMultipart`] that will upload using 5 MiB chunks
///
/// This is shorthand for [`Self::new_with_chunk_size`] with a chunk size of
/// `5 * 1024 * 1024` bytes.
pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
    const DEFAULT_CHUNK_SIZE: usize = 5 * 1024 * 1024;
    Self::new_with_chunk_size(upload, DEFAULT_CHUNK_SIZE)
}
/// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
Self {
upload,
chunk_size,
buffer: PutPayloadMut::new(),
tasks: Default::default(),
}
}
/// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
///
/// Finished uploads are joined one at a time until either no uploads remain or fewer
/// than `max_concurrency` are outstanding. Passing a `max_concurrency` of `0` therefore
/// waits for every outstanding upload.
///
/// Returns [`Poll::Pending`] while an upload that must be waited for has not finished,
/// and an error as soon as a joined upload reports one or could not be joined.
///
/// See [`Self::wait_for_capacity`] for an async version of this function
pub fn poll_for_capacity(
    &mut self,
    cx: &mut Context<'_>,
    max_concurrency: usize,
) -> Poll<Result<()>> {
    loop {
        let in_flight = self.tasks.len();
        if in_flight == 0 || in_flight < max_concurrency {
            return Poll::Ready(Ok(()));
        }
        match self.tasks.poll_join_next(cx) {
            Poll::Pending => return Poll::Pending,
            // The set is non-empty here, so a ready poll always carries a joined task.
            // The first `?` surfaces a join failure, the second the upload's own error.
            Poll::Ready(joined) => joined.unwrap()??,
        }
    }
}
/// Wait until there are less than `max_concurrency` [`UploadPart`] in progress
///
/// This is the `async` wrapper around [`Self::poll_for_capacity`], which is the
/// [`Poll`] version of this function, and resolves with the same result.
pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
    let capacity = futures_util::future::poll_fn(|cx| {
        self.poll_for_capacity(cx, max_concurrency)
    });
    capacity.await
}
/// Write data to this [`WriteMultipart`]
///
/// The bytes are copied into the internal buffer with
/// [`PutPayloadMut::extend_from_slice`]. Callers that already own their buffers
/// may prefer [`Self::put`], which avoids the copy.
///
/// This method is synchronous (not `async`): whenever the buffer reaches the
/// configured `chunk_size` a new part upload is started immediately, regardless of
/// how many uploads are already outstanding.
///
/// Back pressure can optionally be applied to producers by calling
/// [`Self::wait_for_capacity`] prior to calling this method
pub fn write(&mut self, mut buf: &[u8]) {
    while !buf.is_empty() {
        // Room left in the current part before it must be flushed
        let space = self.chunk_size - self.buffer.content_length();
        let (head, tail) = buf.split_at(buf.len().min(space));
        self.buffer.extend_from_slice(head);
        if head.len() == space {
            // The part is exactly full: hand it off and start a fresh buffer
            let full = std::mem::take(&mut self.buffer);
            self.put_part(full.into());
        }
        buf = tail;
    }
}
/// Put a chunk of data into this [`WriteMultipart`] without copying
///
/// The data is appended to the internal buffer with [`PutPayloadMut::push`], and is
/// split wherever it crosses a `chunk_size` boundary. Callers writing from
/// non-owned buffers should prefer [`Self::write`], as that allows multiple calls
/// to share the same underlying allocation.
///
/// See [`Self::write`] for information on backpressure
pub fn put(&mut self, mut bytes: bytes::Bytes) {
    while !bytes.is_empty() {
        // Room left in the current part before it must be flushed
        let space = self.chunk_size - self.buffer.content_length();
        if space > bytes.len() {
            // Everything that is left fits without filling the part
            self.buffer.push(bytes);
            break;
        }
        let head = bytes.split_to(space);
        self.buffer.push(head);
        let full = std::mem::take(&mut self.buffer);
        self.put_part(full.into());
    }
}
/// Start uploading `part` on the underlying upload and track it in `tasks`
///
/// The upload future is spawned onto the [`JoinSet`] straight away; its result is only
/// observed later through [`Self::poll_for_capacity`] / [`Self::wait_for_capacity`].
// NOTE(review): tokio documents `JoinSet::spawn` as panicking outside a runtime - confirm
pub(crate) fn put_part(&mut self, part: PutPayload) {
    let part_upload = self.upload.put_part(part);
    self.tasks.spawn(part_upload);
}
/// Abort this upload, attempting to clean up any successfully uploaded parts
pub async fn abort(mut self) -> Result<()> {
self.tasks.shutdown().await;
self.upload.abort().await
}
/// Flush final chunk, and await completion of all in-flight requests
///
/// Any buffered data is uploaded as the last part, every outstanding part upload is
/// awaited, and the underlying upload is then completed.
///
/// # Errors
///
/// Returns the first error reported by a part upload, or otherwise the error returned
/// when completing the upload. In both cases the remaining part uploads are shut down
/// and the underlying upload is aborted on a best-effort basis before returning: this
/// method consumes `self`, so the caller has no later opportunity to abort. A failure
/// of that abort is not reported, so that it cannot mask the original error.
pub async fn finish(mut self) -> Result<PutResult> {
    if !self.buffer.is_empty() {
        let part = std::mem::take(&mut self.buffer);
        self.put_part(part.into());
    }

    // Only attempt completion once every part has been uploaded successfully
    let outcome = match self.wait_for_capacity(0).await {
        Ok(()) => self.upload.complete().await,
        Err(e) => Err(e),
    };

    if outcome.is_err() {
        // A part failure can leave other uploads running; stop them before aborting
        self.tasks.shutdown().await;
        // Best-effort cleanup: the root-cause error in `outcome` takes precedence
        let _ = self.upload.abort().await;
    }
    outcome
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures_util::FutureExt;
use parking_lot::Mutex;
use rand::prelude::StdRng;
use rand::{RngExt, SeedableRng};
use crate::ObjectStoreExt;
use crate::memory::InMemory;
use crate::path::Path;
use crate::throttle::{ThrottleConfig, ThrottledStore};
use super::*;
/// Checks that [`WriteMultipart::wait_for_capacity`] does not resolve immediately while
/// too many part uploads are outstanding, and that it does resolve once awaited
#[tokio::test]
async fn test_concurrency() {
// NOTE(review): presumably `wait_put_per_call` delays each put so that part uploads
// cannot finish instantly - confirm against `ThrottleConfig`
let config = ThrottleConfig {
wait_put_per_call: Duration::from_millis(1),
..Default::default()
};
let path = Path::from("foo");
let store = ThrottledStore::new(InMemory::new(), config);
let upload = store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new_with_chunk_size(upload, 10);
// 20 writes of 5 bytes with a chunk size of 10: every second write fills the buffer,
// so 10 part uploads are spawned in total
for _ in 0..20 {
write.write(&[0; 5]);
}
// A single poll must report pending: 10 uploads are outstanding and fewer than 10 are required
assert!(write.wait_for_capacity(10).now_or_never().is_none());
// Awaiting it properly must eventually succeed
write.wait_for_capacity(10).await.unwrap()
}
/// A [`MultipartUpload`] test double that records every part it is given
#[derive(Debug, Default)]
struct InstrumentedUpload {
/// The payloads passed to `put_part`, in call order; shared so a test can inspect them
chunks: Arc<Mutex<Vec<PutPayload>>>,
}
#[async_trait]
impl MultipartUpload for InstrumentedUpload {
/// Records `data` and returns a future that is immediately ready with `Ok(())`
fn put_part(&mut self, data: PutPayload) -> UploadPart {
self.chunks.lock().push(data);
futures_util::future::ready(Ok(())).boxed()
}
/// Always succeeds, returning a [`PutResult`] with no `e_tag` and no `version`
async fn complete(&mut self) -> Result<PutResult> {
Ok(PutResult {
e_tag: None,
version: None,
})
}
/// Not implemented: panics if it is ever called
async fn abort(&mut self) -> Result<()> {
unimplemented!()
}
}
/// Feeds randomly sized inputs through [`WriteMultipart::put`] and [`WriteMultipart::write`]
/// and checks that the recorded parts reproduce the input and respect the chunk size
#[tokio::test]
async fn test_write_multipart() {
// Fixed seed so that the generated inputs are reproducible
let mut rng = StdRng::seed_from_u64(42);
// `method` is the probability of using `put` rather than `write` for each input:
// 0.0 exercises only `write`, 1.0 only `put`, and 0.5 a mixture of both
for method in [0.0, 0.5, 1.0] {
for _ in 0..10 {
for chunk_size in [1, 17, 23] {
let upload = Box::<InstrumentedUpload>::default();
// Keep a handle on the recorded parts before the upload is moved into the writer
let chunks = Arc::clone(&upload.chunks);
let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);
let mut expected = Vec::with_capacity(1024);
for _ in 0..50 {
// Shadows the outer `chunk_size` within this loop body only: this is the
// length (0 to 29 bytes) of the next input, not the upload chunk size
let chunk_size = rng.random_range(0..30);
let data: Vec<_> = (0..chunk_size).map(|_| rng.random()).collect();
expected.extend_from_slice(&data);
match rng.random_bool(method) {
true => write.put(data.into()),
false => write.write(&data),
}
}
write.finish().await.unwrap();
let chunks = chunks.lock();
// The concatenation of all recorded parts must equal everything that was written
let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
assert_eq!(expected, actual);
// Every part except the last must be exactly `chunk_size` bytes long
// (assumes at least one part was recorded, otherwise `len() - 1` would underflow)
for chunk in chunks.iter().take(chunks.len() - 1) {
assert_eq!(chunk.content_length(), chunk_size)
}
// The final part may be shorter but never longer than `chunk_size`
let last_chunk = chunks.last().unwrap().content_length();
assert!(last_chunk <= chunk_size, "{chunk_size}");
}
}
}
}
}