// blob: 84fb572bd77ba872b33fc9470def8c85bfad47a5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! An object store implementation for Google Cloud Storage
use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufReader;
use std::ops::Range;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
use reqwest::header::RANGE;
use reqwest::{header, Client, Method, Response, StatusCode};
use snafu::{ResultExt, Snafu};
use crate::util::format_http_range;
use crate::{
oauth::OAuthProvider,
path::{Path, DELIMITER},
token::TokenCache,
util::format_prefix,
GetResult, ListResult, ObjectMeta, ObjectStore, Result,
};
/// Errors produced by the Google Cloud Storage client in this module.
///
/// Each variant wraps the underlying failure as `source`; the request
/// variants that target a single object also carry the object `path` so it
/// can be reported to the caller.
#[derive(Debug, Snafu)]
enum Error {
/// The service account file could not be opened.
#[snafu(display("Unable to open service account file: {}", source))]
OpenCredentials { source: std::io::Error },
/// The service account file was opened but is not valid credentials JSON.
#[snafu(display("Unable to decode service account file: {}", source))]
DecodeCredentials { source: serde_json::Error },
/// A list request failed (transport error, error status, or undecodable body).
#[snafu(display("Error performing list request: {}", source))]
ListRequest { source: reqwest::Error },
/// A get request for `path` failed.
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
source: reqwest::Error,
path: String,
},
/// A delete request for `path` failed.
#[snafu(display("Error performing delete request {}: {}", path, source))]
DeleteRequest {
source: reqwest::Error,
path: String,
},
/// A copy request failed; `path` is the copy source.
#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
source: reqwest::Error,
path: String,
},
/// A put (upload) request failed.
#[snafu(display("Error performing put request: {}", source))]
PutRequest { source: reqwest::Error },
/// The `size` reported for an object could not be parsed as an integer.
#[snafu(display("Error decoding object size: {}", source))]
InvalidSize { source: std::num::ParseIntError },
}
impl From<Error> for super::Error {
    /// Translate a GCS client error into the crate-level error.
    ///
    /// Get, delete and copy failures whose HTTP status is `404 Not Found`
    /// become `NotFound` (keeping the object path); every other error is
    /// wrapped as `Generic` with the store name `"GCS"`.
    fn from(err: Error) -> Self {
        match err {
            Error::GetRequest { source, path }
            | Error::DeleteRequest { source, path }
            | Error::CopyRequest { source, path }
                if source.status() == Some(StatusCode::NOT_FOUND) =>
            {
                Self::NotFound {
                    path,
                    source: Box::new(source),
                }
            }
            // Anything else, including a non-404 get/delete/copy failure,
            // is passed through unchanged inside the generic variant.
            other => Self::Generic {
                store: "GCS",
                source: Box::new(other),
            },
        }
    }
}
/// A deserialized `service-account-********.json`-file.
///
/// `private_key` and `client_email` are required; `gcs_base_url` and
/// `disable_oauth` are optional keys that fall back to the defaults below.
#[derive(serde::Deserialize, Debug)]
struct ServiceAccountCredentials {
/// The private key in RSA format.
pub private_key: String,
/// The email address associated with the service account.
pub client_email: String,
/// Base URL for GCS. Defaults to `https://storage.googleapis.com` when absent.
#[serde(default = "default_gcs_base_url")]
pub gcs_base_url: String,
/// Disable oauth and use empty tokens. Defaults to `false` when absent.
#[serde(default = "default_disable_oauth")]
pub disable_oauth: bool,
}
/// Serde default for the `gcs_base_url` key of the service account file.
fn default_gcs_base_url() -> String {
    String::from("https://storage.googleapis.com")
}
/// Serde default for the `disable_oauth` key of the service account file.
fn default_disable_oauth() -> bool {
    // OAuth stays enabled unless the credentials file explicitly opts out.
    const OAUTH_DISABLED_BY_DEFAULT: bool = false;
    OAUTH_DISABLED_BY_DEFAULT
}
/// One page of the JSON body returned by a list request.
///
/// JSON keys are camelCase (`nextPageToken`, ...) and mapped to the
/// snake_case field names here.
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ListResponse {
/// Token to send as `pageToken` to fetch the following page; `None` when
/// this was the last page.
next_page_token: Option<String>,
/// Prefixes reported by the listing; empty when the key is absent.
#[serde(default)]
prefixes: Vec<String>,
/// Objects contained in this page; empty when the key is absent.
#[serde(default)]
items: Vec<Object>,
}
/// The subset of an object's JSON metadata that this module reads.
///
/// Converted into an `ObjectMeta` by `convert_object_meta`.
#[derive(serde::Deserialize, Debug)]
struct Object {
/// Object name; parsed into a `Path` when converted.
name: String,
/// Object size, received as a string and parsed into a number when converted.
size: String,
/// Timestamp used as the object's `last_modified` value.
updated: DateTime<Utc>,
}
/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
///
/// Construct it with [`new_gcs`].
#[derive(Debug)]
pub struct GoogleCloudStorage {
/// HTTP client used for every request.
client: Client,
/// Base URL all request URLs are built from (taken from the credentials file).
base_url: String,
/// Source of OAuth tokens; `None` when OAuth is disabled in the credentials
/// file, in which case an empty bearer token is sent.
oauth_provider: Option<OAuthProvider>,
/// Cache consulted before asking `oauth_provider` for a token.
token_cache: TokenCache<String>,
/// Bucket name as supplied by the caller (used for display).
bucket_name: String,
/// Percent-encoded bucket name, ready to be embedded in request URLs.
bucket_name_encoded: String,
/// When set, sent as the `maxResults` query parameter of list requests.
// TODO: Hook this up in tests
max_list_results: Option<String>,
}
impl std::fmt::Display for GoogleCloudStorage {
    /// Render the store as `GoogleCloudStorage(<bucket name>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bucket = &self.bucket_name;
        f.write_fmt(format_args!("GoogleCloudStorage({})", bucket))
    }
}
impl GoogleCloudStorage {
/// Return the bearer token to attach to the next request.
///
/// With an OAuth provider configured the token comes from the token cache,
/// which falls back to `fetch_token` on the provider. Without a provider an
/// empty string is returned.
async fn get_token(&self) -> Result<String> {
    match &self.oauth_provider {
        Some(provider) => {
            let token = self
                .token_cache
                .get_or_insert_with(|| provider.fetch_token(&self.client))
                .await?;
            Ok(token)
        }
        None => Ok(String::new()),
    }
}
fn object_url(&self, path: &Path) -> String {
let encoded =
percent_encoding::utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
format!(
"{}/storage/v1/b/{}/o/{}",
self.base_url, self.bucket_name_encoded, encoded
)
}
/// Perform a get request <https://cloud.google.com/storage/docs/json_api/v1/objects/get>
async fn get_request(
&self,
path: &Path,
range: Option<Range<usize>>,
head: bool,
) -> Result<Response> {
let token = self.get_token().await?;
let url = self.object_url(path);
let mut builder = self.client.request(Method::GET, url);
if let Some(range) = range {
builder = builder.header(RANGE, format_http_range(range));
}
let alt = match head {
true => "json",
false => "media",
};
let response = builder
.bearer_auth(token)
.query(&[("alt", alt)])
.send()
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
/// Perform a put request <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>
async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
let token = self.get_token().await?;
let url = format!(
"{}/upload/storage/v1/b/{}/o",
self.base_url, self.bucket_name_encoded
);
self.client
.request(Method::POST, url)
.bearer_auth(token)
.header(header::CONTENT_TYPE, "application/octet-stream")
.header(header::CONTENT_LENGTH, payload.len())
.query(&[("uploadType", "media"), ("name", path.as_ref())])
.body(payload)
.send()
.await
.context(PutRequestSnafu)?
.error_for_status()
.context(PutRequestSnafu)?;
Ok(())
}
/// Perform a delete request <https://cloud.google.com/storage/docs/json_api/v1/objects/delete>
async fn delete_request(&self, path: &Path) -> Result<()> {
let token = self.get_token().await?;
let url = self.object_url(path);
let builder = self.client.request(Method::DELETE, url);
builder
.bearer_auth(token)
.send()
.await
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?;
Ok(())
}
/// Perform a copy request <https://cloud.google.com/storage/docs/json_api/v1/objects/copy>
async fn copy_request(
&self,
from: &Path,
to: &Path,
if_not_exists: bool,
) -> Result<()> {
let token = self.get_token().await?;
let source =
percent_encoding::utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
let destination =
percent_encoding::utf8_percent_encode(to.as_ref(), NON_ALPHANUMERIC);
let url = format!(
"{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
self.base_url,
self.bucket_name_encoded,
source,
self.bucket_name_encoded,
destination
);
let mut builder = self.client.request(Method::POST, url);
if if_not_exists {
builder = builder.query(&[("ifGenerationMatch", "0")]);
}
builder
.bearer_auth(token)
.send()
.await
.context(CopyRequestSnafu {
path: from.as_ref(),
})?
.error_for_status()
.context(CopyRequestSnafu {
path: from.as_ref(),
})?;
Ok(())
}
/// Perform a list request <https://cloud.google.com/storage/docs/json_api/v1/objects/list>
async fn list_request(
&self,
prefix: Option<&str>,
delimiter: bool,
page_token: Option<&str>,
) -> Result<ListResponse> {
let token = self.get_token().await?;
let url = format!(
"{}/storage/v1/b/{}/o",
self.base_url, self.bucket_name_encoded
);
let mut query = Vec::with_capacity(4);
if delimiter {
query.push(("delimiter", DELIMITER))
}
if let Some(prefix) = &prefix {
query.push(("prefix", prefix))
}
if let Some(page_token) = page_token {
query.push(("pageToken", page_token))
}
if let Some(max_results) = &self.max_list_results {
query.push(("maxResults", max_results))
}
let response: ListResponse = self
.client
.request(Method::GET, url)
.query(&query)
.bearer_auth(token)
.send()
.await
.context(ListRequestSnafu)?
.error_for_status()
.context(ListRequestSnafu)?
.json()
.await
.context(ListRequestSnafu)?;
Ok(response)
}
/// Perform a list operation automatically handling pagination
///
/// Returns a stream that yields one [`ListResponse`] per page, requesting
/// the next page only when the stream is polled again. The stream ends
/// after the first page without a `next_page_token`.
///
/// If a page request fails, the error is yielded and the stream then
/// terminates. Previously the unchanged state was handed back, so a
/// consumer that kept polling after an error re-issued the same failing
/// request indefinitely.
fn list_paginated(
    &self,
    prefix: Option<&Path>,
    delimiter: bool,
) -> Result<BoxStream<'_, Result<ListResponse>>> {
    /// Progress of the pagination loop between two polls of the stream.
    enum ListState {
        /// No request has been made yet.
        Start,
        /// More pages exist; holds the token identifying the next one.
        HasMore(String),
        /// The last page was delivered, or a request failed.
        Done,
    }

    let prefix = format_prefix(prefix);

    Ok(futures::stream::unfold(ListState::Start, move |state| {
        // Every step builds a fresh future, which needs its own copy of the prefix.
        let prefix = prefix.clone();
        async move {
            let page_token = match &state {
                ListState::Start => None,
                ListState::HasMore(page_token) => Some(page_token.as_str()),
                ListState::Done => return None,
            };

            let resp = match self
                .list_request(prefix.as_deref(), delimiter, page_token)
                .await
            {
                Ok(resp) => resp,
                // Surface the failure once, then end the stream instead of
                // retrying the same page on every subsequent poll.
                Err(e) => return Some((Err(e), ListState::Done)),
            };

            let next_state = match &resp.next_page_token {
                Some(token) => ListState::HasMore(token.clone()),
                None => ListState::Done,
            };
            Some((Ok(resp), next_state))
        }
    })
    .boxed())
}
}
#[async_trait]
impl ObjectStore for GoogleCloudStorage {
/// Store `bytes` as the object at `location` by delegating to `put_request`.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.put_request(location, bytes).await
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let response = self.get_request(location, None, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
store: "GCS",
source: Box::new(source),
})
.boxed();
Ok(GetResult::Stream(stream))
}
/// Fetch the bytes of the object at `location` selected by `range`.
///
/// The range is forwarded to `get_request`; a failure while reading the
/// body is reported as a `GetRequest` error for `location`.
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
    let response = self.get_request(location, Some(range), false).await?;
    let body = response.bytes().await.context(GetRequestSnafu {
        path: location.as_ref(),
    })?;
    Ok(body)
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let response = self.get_request(location, None, true).await?;
let object = response.json().await.context(GetRequestSnafu {
path: location.as_ref(),
})?;
convert_object_meta(&object)
}
/// Delete the object at `location` by delegating to `delete_request`.
async fn delete(&self, location: &Path) -> Result<()> {
self.delete_request(location).await
}
/// List the objects under `prefix` as a stream of `ObjectMeta`.
///
/// Pages are requested without a delimiter and flattened into a single
/// stream; each listed item is converted with `convert_object_meta`.
async fn list(
    &self,
    prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
    let pages = self.list_paginated(prefix, false)?;
    let objects = pages
        .map_ok(|page| {
            let converted = page
                .items
                .into_iter()
                .map(|item| convert_object_meta(&item));
            futures::stream::iter(converted)
        })
        .try_flatten()
        .boxed();
    Ok(objects)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let mut stream = self.list_paginated(prefix, true)?;
let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
while let Some(result) = stream.next().await {
let response = result?;
for p in response.prefixes {
common_prefixes.insert(Path::parse(p)?);
}
objects.reserve(response.items.len());
for object in &response.items {
objects.push(convert_object_meta(object)?);
}
}
Ok(ListResult {
common_prefixes: common_prefixes.into_iter().collect(),
objects,
})
}
/// Copy the object at `from` to `to` via `copy_request`, without the
/// "only if the destination does not exist" condition.
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.copy_request(from, to, false).await
}
/// Copy the object at `from` to `to` via `copy_request` with its
/// `if_not_exists` flag set.
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.copy_request(from, to, true).await
}
}
/// Read and decode the service account JSON file at `service_account_path`.
///
/// Fails with `OpenCredentials` when the file cannot be opened and with
/// `DecodeCredentials` when its content cannot be deserialized.
fn reader_credentials_file(
    service_account_path: impl AsRef<std::path::Path>,
) -> Result<ServiceAccountCredentials> {
    let handle = File::open(service_account_path).context(OpenCredentialsSnafu)?;
    let credentials =
        serde_json::from_reader(BufReader::new(handle)).context(DecodeCredentialsSnafu)?;
    Ok(credentials)
}
/// Configure a connection to Google Cloud Storage.
pub fn new_gcs(
service_account_path: impl AsRef<std::path::Path>,
bucket_name: impl Into<String>,
) -> Result<GoogleCloudStorage> {
let credentials = reader_credentials_file(service_account_path)?;
let client = Client::new();
// TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
let scope = "https://www.googleapis.com/auth/devstorage.full_control";
let audience = "https://www.googleapis.com/oauth2/v4/token".to_string();
let oauth_provider = (!credentials.disable_oauth)
.then(|| {
OAuthProvider::new(
credentials.client_email,
credentials.private_key,
scope.to_string(),
audience,
)
})
.transpose()?;
let bucket_name = bucket_name.into();
let encoded_bucket_name =
percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
// The cloud storage crate currently only supports authentication via
// environment variables. Set the environment variable explicitly so
// that we can optionally accept command line arguments instead.
Ok(GoogleCloudStorage {
client,
base_url: credentials.gcs_base_url,
oauth_provider,
token_cache: Default::default(),
bucket_name,
bucket_name_encoded: encoded_bucket_name,
max_list_results: None,
})
}
fn convert_object_meta(object: &Object) -> Result<ObjectMeta> {
let location = Path::parse(&object.name)?;
let last_modified = object.updated;
let size = object.size.parse().context(InvalidSizeSnafu)?;
Ok(ObjectMeta {
location,
last_modified,
size,
})
}
#[cfg(test)]
mod test {
use std::env;
use bytes::Bytes;
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy,
},
Error as ObjectStoreError, ObjectStore,
};
use super::*;
/// Name used by the tests below for a bucket or object expected not to exist.
const NON_EXISTENT_NAME: &str = "nonexistentname";
/// Settings for the integration tests, filled from the environment by
/// `maybe_skip_integration!`.
#[derive(Debug)]
struct GoogleCloudConfig {
/// Value of the `OBJECT_STORE_BUCKET` environment variable.
bucket: String,
/// Value of the `GOOGLE_SERVICE_ACCOUNT` environment variable; the tests
/// pass it to `new_gcs` as the service account path.
service_account: String,
}
// Helper macro to skip tests if TEST_INTEGRATION and the GCP environment variables are not set.
//
// Behavior, after loading a `.env` file if one exists:
// * TEST_INTEGRATION set, but a required variable missing -> panic.
// * TEST_INTEGRATION not set -> print a notice and `return` from the
//   calling test function, so the test passes without running.
// * otherwise -> evaluate to a `GoogleCloudConfig` built from the variables.
macro_rules! maybe_skip_integration {
() => {{
// A missing `.env` file is not an error.
dotenv::dotenv().ok();
let required_vars = ["OBJECT_STORE_BUCKET", "GOOGLE_SERVICE_ACCOUNT"];
// Names of the required variables that are not available.
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = std::env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() {
eprintln!(
"skipping Google Cloud integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
// Returns from the function that invoked the macro, not just from this block.
return;
} else {
GoogleCloudConfig {
bucket: env::var("OBJECT_STORE_BUCKET")
.expect("already checked OBJECT_STORE_BUCKET"),
service_account: env::var("GOOGLE_SERVICE_ACCOUNT")
.expect("already checked GOOGLE_SERVICE_ACCOUNT"),
}
}
}};
}
#[tokio::test]
async fn gcs_test() {
let config = maybe_skip_integration!();
let integration = new_gcs(config.service_account, config.bucket).unwrap();
put_get_delete_list(&integration).await.unwrap();
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
rename_and_copy(&integration).await.unwrap();
}
#[tokio::test]
async fn gcs_test_get_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.get(&location).await.unwrap_err();
assert!(
matches!(err, ObjectStoreError::NotFound { .. }),
"unexpected error type: {}",
err
);
}
#[tokio::test]
async fn gcs_test_get_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
assert!(
matches!(err, ObjectStoreError::NotFound { .. }),
"unexpected error type: {}",
err
);
}
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err();
assert!(
matches!(err, ObjectStoreError::NotFound { .. }),
"unexpected error type: {}",
err
);
}
#[tokio::test]
async fn gcs_test_delete_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err();
assert!(
matches!(err, ObjectStoreError::NotFound { .. }),
"unexpected error type: {}",
err
);
}
#[tokio::test]
async fn gcs_test_put_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let data = Bytes::from("arbitrary data");
let err = integration
.put(&location, data)
.await
.unwrap_err()
.to_string();
assert!(
err.contains(
"Error performing put request: HTTP status client error (404 Not Found)"
),
"{}",
err
)
}
}