| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An object store implementation for Google Cloud Storage |
| use std::collections::BTreeSet; |
| use std::fs::File; |
| use std::io::BufReader; |
| use std::ops::Range; |
| |
| use async_trait::async_trait; |
| use bytes::Bytes; |
| use chrono::{DateTime, Utc}; |
| use futures::{stream::BoxStream, StreamExt, TryStreamExt}; |
| use percent_encoding::{percent_encode, NON_ALPHANUMERIC}; |
| use reqwest::header::RANGE; |
| use reqwest::{header, Client, Method, Response, StatusCode}; |
| use snafu::{ResultExt, Snafu}; |
| |
| use crate::util::format_http_range; |
| use crate::{ |
| oauth::OAuthProvider, |
| path::{Path, DELIMITER}, |
| token::TokenCache, |
| util::format_prefix, |
| GetResult, ListResult, ObjectMeta, ObjectStore, Result, |
| }; |
| |
| #[derive(Debug, Snafu)] |
| enum Error { |
| #[snafu(display("Unable to open service account file: {}", source))] |
| OpenCredentials { source: std::io::Error }, |
| |
| #[snafu(display("Unable to decode service account file: {}", source))] |
| DecodeCredentials { source: serde_json::Error }, |
| |
| #[snafu(display("Error performing list request: {}", source))] |
| ListRequest { source: reqwest::Error }, |
| |
| #[snafu(display("Error performing get request {}: {}", path, source))] |
| GetRequest { |
| source: reqwest::Error, |
| path: String, |
| }, |
| |
| #[snafu(display("Error performing delete request {}: {}", path, source))] |
| DeleteRequest { |
| source: reqwest::Error, |
| path: String, |
| }, |
| |
| #[snafu(display("Error performing copy request {}: {}", path, source))] |
| CopyRequest { |
| source: reqwest::Error, |
| path: String, |
| }, |
| |
| #[snafu(display("Error performing put request: {}", source))] |
| PutRequest { source: reqwest::Error }, |
| |
| #[snafu(display("Error decoding object size: {}", source))] |
| InvalidSize { source: std::num::ParseIntError }, |
| } |
| |
| impl From<Error> for super::Error { |
| fn from(err: Error) -> Self { |
| match err { |
| Error::GetRequest { source, path } |
| | Error::DeleteRequest { source, path } |
| | Error::CopyRequest { source, path } |
| if matches!(source.status(), Some(StatusCode::NOT_FOUND)) => |
| { |
| Self::NotFound { |
| path, |
| source: Box::new(source), |
| } |
| } |
| _ => Self::Generic { |
| store: "GCS", |
| source: Box::new(err), |
| }, |
| } |
| } |
| } |
| |
| /// A deserialized `service-account-********.json`-file. |
| #[derive(serde::Deserialize, Debug)] |
| struct ServiceAccountCredentials { |
| /// The private key in RSA format. |
| pub private_key: String, |
| |
| /// The email address associated with the service account. |
| pub client_email: String, |
| |
| /// Base URL for GCS |
| #[serde(default = "default_gcs_base_url")] |
| pub gcs_base_url: String, |
| |
| /// Disable oauth and use empty tokens. |
| #[serde(default = "default_disable_oauth")] |
| pub disable_oauth: bool, |
| } |
| |
| fn default_gcs_base_url() -> String { |
| "https://storage.googleapis.com".to_owned() |
| } |
| |
| fn default_disable_oauth() -> bool { |
| false |
| } |
| |
| #[derive(serde::Deserialize, Debug)] |
| #[serde(rename_all = "camelCase")] |
| struct ListResponse { |
| next_page_token: Option<String>, |
| #[serde(default)] |
| prefixes: Vec<String>, |
| #[serde(default)] |
| items: Vec<Object>, |
| } |
| |
| #[derive(serde::Deserialize, Debug)] |
| struct Object { |
| name: String, |
| size: String, |
| updated: DateTime<Utc>, |
| } |
| |
| /// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/). |
| #[derive(Debug)] |
| pub struct GoogleCloudStorage { |
| client: Client, |
| base_url: String, |
| |
| oauth_provider: Option<OAuthProvider>, |
| token_cache: TokenCache<String>, |
| |
| bucket_name: String, |
| bucket_name_encoded: String, |
| |
| // TODO: Hook this up in tests |
| max_list_results: Option<String>, |
| } |
| |
| impl std::fmt::Display for GoogleCloudStorage { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "GoogleCloudStorage({})", self.bucket_name) |
| } |
| } |
| |
| impl GoogleCloudStorage { |
| async fn get_token(&self) -> Result<String> { |
| if let Some(oauth_provider) = &self.oauth_provider { |
| Ok(self |
| .token_cache |
| .get_or_insert_with(|| oauth_provider.fetch_token(&self.client)) |
| .await?) |
| } else { |
| Ok("".to_owned()) |
| } |
| } |
| |
| fn object_url(&self, path: &Path) -> String { |
| let encoded = |
| percent_encoding::utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); |
| format!( |
| "{}/storage/v1/b/{}/o/{}", |
| self.base_url, self.bucket_name_encoded, encoded |
| ) |
| } |
| |
| /// Perform a get request <https://cloud.google.com/storage/docs/json_api/v1/objects/get> |
| async fn get_request( |
| &self, |
| path: &Path, |
| range: Option<Range<usize>>, |
| head: bool, |
| ) -> Result<Response> { |
| let token = self.get_token().await?; |
| let url = self.object_url(path); |
| |
| let mut builder = self.client.request(Method::GET, url); |
| |
| if let Some(range) = range { |
| builder = builder.header(RANGE, format_http_range(range)); |
| } |
| |
| let alt = match head { |
| true => "json", |
| false => "media", |
| }; |
| |
| let response = builder |
| .bearer_auth(token) |
| .query(&[("alt", alt)]) |
| .send() |
| .await |
| .context(GetRequestSnafu { |
| path: path.as_ref(), |
| })? |
| .error_for_status() |
| .context(GetRequestSnafu { |
| path: path.as_ref(), |
| })?; |
| |
| Ok(response) |
| } |
| |
| /// Perform a put request <https://cloud.google.com/storage/docs/json_api/v1/objects/insert> |
| async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> { |
| let token = self.get_token().await?; |
| let url = format!( |
| "{}/upload/storage/v1/b/{}/o", |
| self.base_url, self.bucket_name_encoded |
| ); |
| |
| self.client |
| .request(Method::POST, url) |
| .bearer_auth(token) |
| .header(header::CONTENT_TYPE, "application/octet-stream") |
| .header(header::CONTENT_LENGTH, payload.len()) |
| .query(&[("uploadType", "media"), ("name", path.as_ref())]) |
| .body(payload) |
| .send() |
| .await |
| .context(PutRequestSnafu)? |
| .error_for_status() |
| .context(PutRequestSnafu)?; |
| |
| Ok(()) |
| } |
| |
| /// Perform a delete request <https://cloud.google.com/storage/docs/json_api/v1/objects/delete> |
| async fn delete_request(&self, path: &Path) -> Result<()> { |
| let token = self.get_token().await?; |
| let url = self.object_url(path); |
| |
| let builder = self.client.request(Method::DELETE, url); |
| builder |
| .bearer_auth(token) |
| .send() |
| .await |
| .context(DeleteRequestSnafu { |
| path: path.as_ref(), |
| })? |
| .error_for_status() |
| .context(DeleteRequestSnafu { |
| path: path.as_ref(), |
| })?; |
| |
| Ok(()) |
| } |
| |
| /// Perform a copy request <https://cloud.google.com/storage/docs/json_api/v1/objects/copy> |
| async fn copy_request( |
| &self, |
| from: &Path, |
| to: &Path, |
| if_not_exists: bool, |
| ) -> Result<()> { |
| let token = self.get_token().await?; |
| |
| let source = |
| percent_encoding::utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC); |
| let destination = |
| percent_encoding::utf8_percent_encode(to.as_ref(), NON_ALPHANUMERIC); |
| let url = format!( |
| "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}", |
| self.base_url, |
| self.bucket_name_encoded, |
| source, |
| self.bucket_name_encoded, |
| destination |
| ); |
| |
| let mut builder = self.client.request(Method::POST, url); |
| |
| if if_not_exists { |
| builder = builder.query(&[("ifGenerationMatch", "0")]); |
| } |
| |
| builder |
| .bearer_auth(token) |
| .send() |
| .await |
| .context(CopyRequestSnafu { |
| path: from.as_ref(), |
| })? |
| .error_for_status() |
| .context(CopyRequestSnafu { |
| path: from.as_ref(), |
| })?; |
| |
| Ok(()) |
| } |
| |
| /// Perform a list request <https://cloud.google.com/storage/docs/json_api/v1/objects/list> |
| async fn list_request( |
| &self, |
| prefix: Option<&str>, |
| delimiter: bool, |
| page_token: Option<&str>, |
| ) -> Result<ListResponse> { |
| let token = self.get_token().await?; |
| |
| let url = format!( |
| "{}/storage/v1/b/{}/o", |
| self.base_url, self.bucket_name_encoded |
| ); |
| |
| let mut query = Vec::with_capacity(4); |
| if delimiter { |
| query.push(("delimiter", DELIMITER)) |
| } |
| |
| if let Some(prefix) = &prefix { |
| query.push(("prefix", prefix)) |
| } |
| |
| if let Some(page_token) = page_token { |
| query.push(("pageToken", page_token)) |
| } |
| |
| if let Some(max_results) = &self.max_list_results { |
| query.push(("maxResults", max_results)) |
| } |
| |
| let response: ListResponse = self |
| .client |
| .request(Method::GET, url) |
| .query(&query) |
| .bearer_auth(token) |
| .send() |
| .await |
| .context(ListRequestSnafu)? |
| .error_for_status() |
| .context(ListRequestSnafu)? |
| .json() |
| .await |
| .context(ListRequestSnafu)?; |
| |
| Ok(response) |
| } |
| |
| /// Perform a list operation automatically handling pagination |
| fn list_paginated( |
| &self, |
| prefix: Option<&Path>, |
| delimiter: bool, |
| ) -> Result<BoxStream<'_, Result<ListResponse>>> { |
| let prefix = format_prefix(prefix); |
| |
| enum ListState { |
| Start, |
| HasMore(String), |
| Done, |
| } |
| |
| Ok(futures::stream::unfold(ListState::Start, move |state| { |
| let prefix = prefix.clone(); |
| |
| async move { |
| let page_token = match &state { |
| ListState::Start => None, |
| ListState::HasMore(page_token) => Some(page_token.as_str()), |
| ListState::Done => { |
| return None; |
| } |
| }; |
| |
| let resp = match self |
| .list_request(prefix.as_deref(), delimiter, page_token) |
| .await |
| { |
| Ok(resp) => resp, |
| Err(e) => return Some((Err(e), state)), |
| }; |
| |
| let next_state = match &resp.next_page_token { |
| Some(token) => ListState::HasMore(token.clone()), |
| None => ListState::Done, |
| }; |
| |
| Some((Ok(resp), next_state)) |
| } |
| }) |
| .boxed()) |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for GoogleCloudStorage { |
| async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { |
| self.put_request(location, bytes).await |
| } |
| |
| async fn get(&self, location: &Path) -> Result<GetResult> { |
| let response = self.get_request(location, None, false).await?; |
| let stream = response |
| .bytes_stream() |
| .map_err(|source| crate::Error::Generic { |
| store: "GCS", |
| source: Box::new(source), |
| }) |
| .boxed(); |
| |
| Ok(GetResult::Stream(stream)) |
| } |
| |
| async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> { |
| let response = self.get_request(location, Some(range), false).await?; |
| Ok(response.bytes().await.context(GetRequestSnafu { |
| path: location.as_ref(), |
| })?) |
| } |
| |
| async fn head(&self, location: &Path) -> Result<ObjectMeta> { |
| let response = self.get_request(location, None, true).await?; |
| let object = response.json().await.context(GetRequestSnafu { |
| path: location.as_ref(), |
| })?; |
| convert_object_meta(&object) |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| self.delete_request(location).await |
| } |
| |
| async fn list( |
| &self, |
| prefix: Option<&Path>, |
| ) -> Result<BoxStream<'_, Result<ObjectMeta>>> { |
| let stream = self |
| .list_paginated(prefix, false)? |
| .map_ok(|r| { |
| futures::stream::iter( |
| r.items.into_iter().map(|x| convert_object_meta(&x)), |
| ) |
| }) |
| .try_flatten() |
| .boxed(); |
| |
| Ok(stream) |
| } |
| |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| let mut stream = self.list_paginated(prefix, true)?; |
| |
| let mut common_prefixes = BTreeSet::new(); |
| let mut objects = Vec::new(); |
| |
| while let Some(result) = stream.next().await { |
| let response = result?; |
| |
| for p in response.prefixes { |
| common_prefixes.insert(Path::parse(p)?); |
| } |
| |
| objects.reserve(response.items.len()); |
| for object in &response.items { |
| objects.push(convert_object_meta(object)?); |
| } |
| } |
| |
| Ok(ListResult { |
| common_prefixes: common_prefixes.into_iter().collect(), |
| objects, |
| }) |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| self.copy_request(from, to, false).await |
| } |
| |
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { |
| self.copy_request(from, to, true).await |
| } |
| } |
| |
| fn reader_credentials_file( |
| service_account_path: impl AsRef<std::path::Path>, |
| ) -> Result<ServiceAccountCredentials> { |
| let file = File::open(service_account_path).context(OpenCredentialsSnafu)?; |
| let reader = BufReader::new(file); |
| Ok(serde_json::from_reader(reader).context(DecodeCredentialsSnafu)?) |
| } |
| |
| /// Configure a connection to Google Cloud Storage. |
| pub fn new_gcs( |
| service_account_path: impl AsRef<std::path::Path>, |
| bucket_name: impl Into<String>, |
| ) -> Result<GoogleCloudStorage> { |
| let credentials = reader_credentials_file(service_account_path)?; |
| let client = Client::new(); |
| |
| // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes |
| let scope = "https://www.googleapis.com/auth/devstorage.full_control"; |
| let audience = "https://www.googleapis.com/oauth2/v4/token".to_string(); |
| |
| let oauth_provider = (!credentials.disable_oauth) |
| .then(|| { |
| OAuthProvider::new( |
| credentials.client_email, |
| credentials.private_key, |
| scope.to_string(), |
| audience, |
| ) |
| }) |
| .transpose()?; |
| |
| let bucket_name = bucket_name.into(); |
| let encoded_bucket_name = |
| percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string(); |
| |
| // The cloud storage crate currently only supports authentication via |
| // environment variables. Set the environment variable explicitly so |
| // that we can optionally accept command line arguments instead. |
| Ok(GoogleCloudStorage { |
| client, |
| base_url: credentials.gcs_base_url, |
| oauth_provider, |
| token_cache: Default::default(), |
| bucket_name, |
| bucket_name_encoded: encoded_bucket_name, |
| max_list_results: None, |
| }) |
| } |
| |
| fn convert_object_meta(object: &Object) -> Result<ObjectMeta> { |
| let location = Path::parse(&object.name)?; |
| let last_modified = object.updated; |
| let size = object.size.parse().context(InvalidSizeSnafu)?; |
| |
| Ok(ObjectMeta { |
| location, |
| last_modified, |
| size, |
| }) |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use std::env; |
| |
| use bytes::Bytes; |
| |
| use crate::{ |
| tests::{ |
| get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, |
| put_get_delete_list, rename_and_copy, |
| }, |
| Error as ObjectStoreError, ObjectStore, |
| }; |
| |
| use super::*; |
| |
| const NON_EXISTENT_NAME: &str = "nonexistentname"; |
| |
| #[derive(Debug)] |
| struct GoogleCloudConfig { |
| bucket: String, |
| service_account: String, |
| } |
| |
| // Helper macro to skip tests if TEST_INTEGRATION and the GCP environment variables are not set. |
| macro_rules! maybe_skip_integration { |
| () => {{ |
| dotenv::dotenv().ok(); |
| |
| let required_vars = ["OBJECT_STORE_BUCKET", "GOOGLE_SERVICE_ACCOUNT"]; |
| let unset_vars: Vec<_> = required_vars |
| .iter() |
| .filter_map(|&name| match env::var(name) { |
| Ok(_) => None, |
| Err(_) => Some(name), |
| }) |
| .collect(); |
| let unset_var_names = unset_vars.join(", "); |
| |
| let force = std::env::var("TEST_INTEGRATION"); |
| |
| if force.is_ok() && !unset_var_names.is_empty() { |
| panic!( |
| "TEST_INTEGRATION is set, \ |
| but variable(s) {} need to be set", |
| unset_var_names |
| ) |
| } else if force.is_err() { |
| eprintln!( |
| "skipping Google Cloud integration test - set {}TEST_INTEGRATION to run", |
| if unset_var_names.is_empty() { |
| String::new() |
| } else { |
| format!("{} and ", unset_var_names) |
| } |
| ); |
| return; |
| } else { |
| GoogleCloudConfig { |
| bucket: env::var("OBJECT_STORE_BUCKET") |
| .expect("already checked OBJECT_STORE_BUCKET"), |
| service_account: env::var("GOOGLE_SERVICE_ACCOUNT") |
| .expect("already checked GOOGLE_SERVICE_ACCOUNT"), |
| } |
| } |
| }}; |
| } |
| |
| #[tokio::test] |
| async fn gcs_test() { |
| let config = maybe_skip_integration!(); |
| let integration = new_gcs(config.service_account, config.bucket).unwrap(); |
| |
| put_get_delete_list(&integration).await.unwrap(); |
| list_uses_directories_correctly(&integration).await.unwrap(); |
| list_with_delimiter(&integration).await.unwrap(); |
| rename_and_copy(&integration).await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn gcs_test_get_nonexistent_location() { |
| let config = maybe_skip_integration!(); |
| let integration = new_gcs(config.service_account, &config.bucket).unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.get(&location).await.unwrap_err(); |
| |
| assert!( |
| matches!(err, ObjectStoreError::NotFound { .. }), |
| "unexpected error type: {}", |
| err |
| ); |
| } |
| |
| #[tokio::test] |
| async fn gcs_test_get_nonexistent_bucket() { |
| let mut config = maybe_skip_integration!(); |
| config.bucket = NON_EXISTENT_NAME.into(); |
| let integration = new_gcs(config.service_account, &config.bucket).unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = get_nonexistent_object(&integration, Some(location)) |
| .await |
| .unwrap_err(); |
| |
| assert!( |
| matches!(err, ObjectStoreError::NotFound { .. }), |
| "unexpected error type: {}", |
| err |
| ); |
| } |
| |
| #[tokio::test] |
| async fn gcs_test_delete_nonexistent_location() { |
| let config = maybe_skip_integration!(); |
| let integration = new_gcs(config.service_account, &config.bucket).unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.delete(&location).await.unwrap_err(); |
| assert!( |
| matches!(err, ObjectStoreError::NotFound { .. }), |
| "unexpected error type: {}", |
| err |
| ); |
| } |
| |
| #[tokio::test] |
| async fn gcs_test_delete_nonexistent_bucket() { |
| let mut config = maybe_skip_integration!(); |
| config.bucket = NON_EXISTENT_NAME.into(); |
| let integration = new_gcs(config.service_account, &config.bucket).unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.delete(&location).await.unwrap_err(); |
| assert!( |
| matches!(err, ObjectStoreError::NotFound { .. }), |
| "unexpected error type: {}", |
| err |
| ); |
| } |
| |
| #[tokio::test] |
| async fn gcs_test_put_nonexistent_bucket() { |
| let mut config = maybe_skip_integration!(); |
| config.bucket = NON_EXISTENT_NAME.into(); |
| let integration = new_gcs(config.service_account, &config.bucket).unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| let data = Bytes::from("arbitrary data"); |
| |
| let err = integration |
| .put(&location, data) |
| .await |
| .unwrap_err() |
| .to_string(); |
| assert!( |
| err.contains( |
| "Error performing put request: HTTP status client error (404 Not Found)" |
| ), |
| "{}", |
| err |
| ) |
| } |
| } |