| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An object store implementation for Google Cloud Storage |
| //! |
| //! ## Multipart uploads |
| //! |
| //! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads) |
| //! can be initiated with the [`ObjectStore::put_multipart`] method. If neither |
| //! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, parts may remain |
| //! uploaded to GCS but unused, and you will be charged for storing them. It is recommended |
| //! that you configure a [lifecycle rule] to abort incomplete multipart uploads after a certain |
| //! period of time to avoid being charged for storing partial uploads. |
| //! |
| //! ## Using HTTP/2 |
| //! |
| //! Google Cloud Storage supports both HTTP/2 and HTTP/1. HTTP/1 is used by default |
| //! because it allows much higher throughput in our benchmarks (see |
| //! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be |
| //! enabled by setting [`crate::ClientConfigKey::Http1Only`] to false. |
| //! |
| //! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| use crate::client::CredentialProvider; |
| use crate::gcp::credential::GCSAuthorizer; |
| use crate::signer::Signer; |
| use crate::{ |
| multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, |
| ObjectMeta, ObjectStore, PutOptions, PutPayload, PutResult, Result, UploadPart, |
| }; |
| use async_trait::async_trait; |
| use client::GoogleCloudStorageClient; |
| use futures::stream::BoxStream; |
| use hyper::Method; |
| use url::Url; |
| |
| use crate::client::get::GetClientExt; |
| use crate::client::list::ListClientExt; |
| use crate::client::parts::Parts; |
| use crate::multipart::MultipartStore; |
| pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; |
| pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey}; |
| |
| mod builder; |
| mod client; |
| mod credential; |
| |
/// Store name placed in the `store` field of the `crate::Error::Generic` errors
/// raised by this module.
const STORE: &str = "GCS";
| |
/// [`CredentialProvider`] for [`GoogleCloudStorage`]
///
/// A shared, dynamically dispatched provider whose credential type is
/// [`GcpCredential`]; this is what [`GoogleCloudStorage::credentials`] returns.
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
| |
/// [`CredentialProvider`] yielding [`GcpSigningCredential`] for [`GoogleCloudStorage`]
///
/// A shared, dynamically dispatched provider; this is what
/// [`GoogleCloudStorage::signing_credentials`] returns and what URL signing
/// fetches its credential from.
pub type GcpSigningCredentialProvider =
    Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
| |
/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
///
/// Every operation implemented in this module is forwarded to the shared
/// `GoogleCloudStorageClient` held by this handle.
#[derive(Debug)]
pub struct GoogleCloudStorage {
    // Reference-counted: a clone of this handle is stored in the state of each
    // multipart upload started through `put_multipart`.
    client: Arc<GoogleCloudStorageClient>,
}
| |
| impl std::fmt::Display for GoogleCloudStorage { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!( |
| f, |
| "GoogleCloudStorage({})", |
| self.client.config().bucket_name |
| ) |
| } |
| } |
| |
| impl GoogleCloudStorage { |
| /// Returns the [`GcpCredentialProvider`] used by [`GoogleCloudStorage`] |
| pub fn credentials(&self) -> &GcpCredentialProvider { |
| &self.client.config().credentials |
| } |
| |
| /// Returns the [`GcpSigningCredentialProvider`] used by [`GoogleCloudStorage`] |
| pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider { |
| &self.client.config().signing_credentials |
| } |
| } |
| |
/// Handle for one in-progress multipart upload; implements [`MultipartUpload`].
#[derive(Debug)]
struct GCSMultipartUpload {
    // Shared with every part-upload future created by `put_part`.
    state: Arc<UploadState>,
    // Index handed to the next part; also the part count passed to
    // `Parts::finish` when the upload is completed.
    part_idx: usize,
}
| |
/// State shared between a [`GCSMultipartUpload`] and its part-upload futures.
#[derive(Debug)]
struct UploadState {
    // Client used for the part, complete and cleanup requests.
    client: Arc<GoogleCloudStorageClient>,
    // Destination object of the upload.
    path: Path,
    // Identifier returned when the multipart upload was initiated.
    multipart_id: MultipartId,
    // Records each uploaded part under its index until the upload is completed.
    parts: Parts,
}
| |
| #[async_trait] |
| impl MultipartUpload for GCSMultipartUpload { |
| fn put_part(&mut self, payload: PutPayload) -> UploadPart { |
| let idx = self.part_idx; |
| self.part_idx += 1; |
| let state = Arc::clone(&self.state); |
| Box::pin(async move { |
| let part = state |
| .client |
| .put_part(&state.path, &state.multipart_id, idx, payload) |
| .await?; |
| state.parts.put(idx, part); |
| Ok(()) |
| }) |
| } |
| |
| async fn complete(&mut self) -> Result<PutResult> { |
| let parts = self.state.parts.finish(self.part_idx)?; |
| |
| self.state |
| .client |
| .multipart_complete(&self.state.path, &self.state.multipart_id, parts) |
| .await |
| } |
| |
| async fn abort(&mut self) -> Result<()> { |
| self.state |
| .client |
| .multipart_cleanup(&self.state.path, &self.state.multipart_id) |
| .await |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for GoogleCloudStorage { |
| async fn put_opts( |
| &self, |
| location: &Path, |
| payload: PutPayload, |
| opts: PutOptions, |
| ) -> Result<PutResult> { |
| self.client.put(location, payload, opts).await |
| } |
| |
| async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> { |
| let upload_id = self.client.multipart_initiate(location).await?; |
| |
| Ok(Box::new(GCSMultipartUpload { |
| part_idx: 0, |
| state: Arc::new(UploadState { |
| client: Arc::clone(&self.client), |
| path: location.clone(), |
| multipart_id: upload_id.clone(), |
| parts: Default::default(), |
| }), |
| })) |
| } |
| |
| async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { |
| self.client.get_opts(location, options).await |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| self.client.delete_request(location).await |
| } |
| |
| fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> { |
| self.client.list(prefix) |
| } |
| |
| fn list_with_offset( |
| &self, |
| prefix: Option<&Path>, |
| offset: &Path, |
| ) -> BoxStream<'_, Result<ObjectMeta>> { |
| self.client.list_with_offset(prefix, offset) |
| } |
| |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| self.client.list_with_delimiter(prefix).await |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| self.client.copy_request(from, to, false).await |
| } |
| |
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { |
| self.client.copy_request(from, to, true).await |
| } |
| } |
| |
| #[async_trait] |
| impl MultipartStore for GoogleCloudStorage { |
| async fn create_multipart(&self, path: &Path) -> Result<MultipartId> { |
| self.client.multipart_initiate(path).await |
| } |
| |
| async fn put_part( |
| &self, |
| path: &Path, |
| id: &MultipartId, |
| part_idx: usize, |
| payload: PutPayload, |
| ) -> Result<PartId> { |
| self.client.put_part(path, id, part_idx, payload).await |
| } |
| |
| async fn complete_multipart( |
| &self, |
| path: &Path, |
| id: &MultipartId, |
| parts: Vec<PartId>, |
| ) -> Result<PutResult> { |
| self.client.multipart_complete(path, id, parts).await |
| } |
| |
| async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { |
| self.client.multipart_cleanup(path, id).await |
| } |
| } |
| |
| #[async_trait] |
| impl Signer for GoogleCloudStorage { |
| async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> { |
| if expires_in.as_secs() > 604800 { |
| return Err(crate::Error::Generic { |
| store: STORE, |
| source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(), |
| }); |
| } |
| |
| let config = self.client.config(); |
| let path_url = config.path_url(path); |
| let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic { |
| store: STORE, |
| source: format!("Unable to parse url {path_url}: {e}").into(), |
| })?; |
| |
| let signing_credentials = self.signing_credentials().get_credential().await?; |
| let authorizer = GCSAuthorizer::new(signing_credentials); |
| |
| authorizer |
| .sign(method, &mut url, expires_in, &self.client) |
| .await?; |
| |
| Ok(url) |
| } |
| } |
| |
#[cfg(test)]
mod test {

    use credential::DEFAULT_GCS_BASE_URL;

    use crate::tests::*;

    use super::*;

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Builds a store from the environment, panicking if the build fails.
    fn store_from_env() -> GoogleCloudStorage {
        GoogleCloudStorageBuilder::from_env().build().unwrap()
    }

    /// Builds a store from the environment with the bucket name overridden to
    /// [`NON_EXISTENT_NAME`].
    fn store_for_missing_bucket() -> GoogleCloudStorage {
        GoogleCloudStorageBuilder::from_env()
            .with_bucket_name(NON_EXISTENT_NAME)
            .build()
            .unwrap()
    }

    /// Object path consisting of the single segment [`NON_EXISTENT_NAME`].
    fn missing_location() -> Path {
        Path::from_iter([NON_EXISTENT_NAME])
    }

    /// Panics unless `err` is the `NotFound` variant.
    #[track_caller]
    fn assert_not_found(err: crate::Error) {
        assert!(
            matches!(err, crate::Error::NotFound { .. }),
            "unexpected error type: {err}"
        );
    }

    /// Runs the shared integration suite against the configured store.
    #[tokio::test]
    async fn gcs_test() {
        crate::test_util::maybe_skip_integration!();
        let store = store_from_env();

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;

        // The checks below only run against the default GCS endpoint.
        let against_default_endpoint = store.client.config().base_url == DEFAULT_GCS_BASE_URL;
        if against_default_endpoint {
            // Fake GCS server doesn't currently honor ifGenerationMatch
            // https://github.com/fsouza/fake-gcs-server/issues/994
            copy_if_not_exists(&store).await;
            // Fake GCS server does not yet implement XML Multipart uploads
            // https://github.com/fsouza/fake-gcs-server/issues/852
            stream_get(&store).await;
            multipart(&store, &store).await;
            // Fake GCS server doesn't currently honor preconditions
            get_opts(&store).await;
            put_opts(&store, true).await;
        }
    }

    /// Round-trips an object through signed PUT and GET URLs.
    #[tokio::test]
    #[ignore]
    async fn gcs_test_sign() {
        crate::test_util::maybe_skip_integration!();
        let store = store_from_env();

        let http = reqwest::Client::new();
        let object = Path::from("test_sign");
        let validity = Duration::from_secs(3600);

        let put_url = store
            .signed_url(Method::PUT, &object, validity)
            .await
            .unwrap();
        println!("PUT {put_url}");

        let put_response = http.put(put_url).body("data").send().await.unwrap();
        put_response.error_for_status().unwrap();

        let get_url = store
            .signed_url(Method::GET, &object, validity)
            .await
            .unwrap();
        println!("GET {get_url}");

        let get_response = http.get(get_url).send().await.unwrap();
        let get_response = get_response.error_for_status().unwrap();
        let body = get_response.bytes().await.unwrap();
        assert_eq!(body.as_ref(), b"data");
    }

    /// Getting a missing object must report `NotFound`.
    #[tokio::test]
    async fn gcs_test_get_nonexistent_location() {
        crate::test_util::maybe_skip_integration!();
        let store = store_from_env();
        let location = missing_location();

        let err = store.get(&location).await.unwrap_err();

        assert_not_found(err);
    }

    /// Getting an object from a missing bucket must report `NotFound`.
    #[tokio::test]
    async fn gcs_test_get_nonexistent_bucket() {
        crate::test_util::maybe_skip_integration!();
        let store = store_for_missing_bucket();
        let location = missing_location();

        let err = get_nonexistent_object(&store, Some(location))
            .await
            .unwrap_err();

        assert_not_found(err);
    }

    /// Deleting a missing object must report `NotFound`.
    #[tokio::test]
    async fn gcs_test_delete_nonexistent_location() {
        crate::test_util::maybe_skip_integration!();
        let store = store_from_env();
        let location = missing_location();

        let err = store.delete(&location).await.unwrap_err();

        assert_not_found(err);
    }

    /// Deleting from a missing bucket must report `NotFound`.
    #[tokio::test]
    async fn gcs_test_delete_nonexistent_bucket() {
        crate::test_util::maybe_skip_integration!();
        let store = store_for_missing_bucket();
        let location = missing_location();

        let err = store.delete(&location).await.unwrap_err();

        assert_not_found(err);
    }

    /// Writing into a missing bucket must surface the 404 client error text.
    #[tokio::test]
    async fn gcs_test_put_nonexistent_bucket() {
        crate::test_util::maybe_skip_integration!();
        let store = store_for_missing_bucket();
        let location = missing_location();
        let payload = PutPayload::from("arbitrary data");

        let message = store
            .put(&location, payload)
            .await
            .unwrap_err()
            .to_string();

        assert!(
            message.contains("Client error with status 404 Not Found"),
            "{}",
            message
        )
    }
}