// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! An object store implementation for Azure Blob Storage
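//!
//! A hedged usage sketch (the account, access key, and container names below
//! are placeholders, and it assumes this module is exposed as
//! `object_store::azure` with the `azure` feature enabled):
//!
//! ```ignore
//! use bytes::Bytes;
//! use object_store::{azure::new_azure, path::Path, ObjectStore};
//!
//! async fn example() -> object_store::Result<()> {
//!     // Connect to an existing container (credentials are placeholders).
//!     let store = new_azure("my_account", "my_access_key", "my_container", false)?;
//!
//!     // Write and read back an object through the generic `ObjectStore` API.
//!     let location = Path::parse("data/example.txt")?;
//!     store.put(&location, Bytes::from("hello world")).await?;
//!     let _result = store.get(&location).await?;
//!     Ok(())
//! }
//! ```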
use crate::{
path::{Path, DELIMITER},
util::format_prefix,
GetResult, ListResult, ObjectMeta, ObjectStore, Result,
};
use async_trait::async_trait;
use azure_core::{prelude::*, HttpClient};
use azure_storage::core::prelude::{AsStorageClient, StorageAccountClient};
use azure_storage_blobs::blob::responses::ListBlobsResponse;
use azure_storage_blobs::blob::Blob;
use azure_storage_blobs::{
prelude::{AsBlobClient, AsContainerClient, ContainerClient},
DeleteSnapshotsMethod,
};
use bytes::Bytes;
use futures::{
stream::{self, BoxStream},
StreamExt, TryStreamExt,
};
use snafu::{ResultExt, Snafu};
use std::collections::BTreeSet;
use std::{convert::TryInto, sync::Arc};
/// A specialized `Error` for Azure object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display(
"Unable to DELETE data. Container: {}, Location: {}, Error: {} ({:?})",
container,
path,
source,
source,
))]
UnableToDeleteData {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
path: String,
},
#[snafu(display(
"Unable to GET data. Container: {}, Location: {}, Error: {} ({:?})",
container,
path,
source,
source,
))]
UnableToGetData {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
path: String,
},
#[snafu(display(
"Unable to HEAD data. Container: {}, Location: {}, Error: {} ({:?})",
container,
path,
source,
source,
))]
UnableToHeadData {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
path: String,
},
#[snafu(display(
"Unable to GET part of the data. Container: {}, Location: {}, Error: {} ({:?})",
container,
path,
source,
source,
))]
UnableToGetPieceOfData {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
path: String,
},
#[snafu(display(
"Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})",
container,
path,
source,
source,
))]
UnableToPutData {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
path: String,
},
#[snafu(display(
"Unable to list data. Bucket: {}, Error: {} ({:?})",
container,
source,
source,
))]
UnableToListData {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
},
#[snafu(display(
"Unable to copy object. Container: {}, From: {}, To: {}, Error: {}",
container,
from,
to,
source
))]
UnableToCopyFile {
source: Box<dyn std::error::Error + Send + Sync>,
container: String,
from: String,
to: String,
},
#[snafu(display(
"Unable parse source url. Container: {}, Error: {}",
container,
source
))]
UnableToParseUrl {
source: url::ParseError,
container: String,
},
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
AlreadyExists {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[cfg(not(feature = "azure_test"))]
#[snafu(display(
"Azurite (azure emulator) support not compiled in, please add `azure_test` feature"
))]
NoEmulatorFeature,
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NotFound { path, source } => Self::NotFound { path, source },
Error::AlreadyExists { path, source } => Self::AlreadyExists { path, source },
_ => Self::Generic {
store: "Azure Blob Storage",
source: Box::new(source),
},
}
}
}
/// Configuration for connecting to [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
#[derive(Debug)]
pub struct MicrosoftAzure {
container_client: Arc<ContainerClient>,
container_name: String,
blob_base_url: String,
is_emulator: bool,
}
impl std::fmt::Display for MicrosoftAzure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.is_emulator {
true => write!(f, "MicrosoftAzureEmulator({})", self.container_name),
false => write!(f, "MicrosoftAzure({})", self.container_name),
}
}
}
#[allow(clippy::borrowed_box)]
fn check_err_not_found(err: &Box<dyn std::error::Error + Send + Sync>) -> bool {
if let Some(azure_core::HttpError::StatusCode { status, .. }) =
err.downcast_ref::<azure_core::HttpError>()
{
return status.as_u16() == 404;
};
false
}
#[async_trait]
impl ObjectStore for MicrosoftAzure {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let bytes = bytes::BytesMut::from(&*bytes);
self.container_client
.as_blob_client(location.as_ref())
.put_block_blob(bytes)
.execute()
.await
.context(UnableToPutDataSnafu {
container: &self.container_name,
path: location.to_owned(),
})?;
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let blob = self
.container_client
.as_blob_client(location.as_ref())
.get()
.execute()
.await
.map_err(|err| {
if check_err_not_found(&err) {
return Error::NotFound {
source: err,
path: location.to_string(),
};
};
Error::UnableToGetData {
source: err,
container: self.container_name.clone(),
path: location.to_string(),
}
})?;
Ok(GetResult::Stream(
futures::stream::once(async move { Ok(blob.data) }).boxed(),
))
}
async fn get_range(
&self,
location: &Path,
range: std::ops::Range<usize>,
) -> Result<Bytes> {
let blob = self
.container_client
.as_blob_client(location.as_ref())
.get()
.range(range)
.execute()
.await
.map_err(|err| {
if check_err_not_found(&err) {
return Error::NotFound {
source: err,
path: location.to_string(),
};
};
Error::UnableToGetPieceOfData {
source: err,
container: self.container_name.clone(),
path: location.to_string(),
}
})?;
Ok(blob.data)
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let res = self
.container_client
.as_blob_client(location.as_ref())
.get_properties()
.execute()
.await
.map_err(|err| {
if check_err_not_found(&err) {
return Error::NotFound {
source: err,
path: location.to_string(),
};
};
Error::UnableToHeadData {
source: err,
container: self.container_name.clone(),
path: location.to_string(),
}
})?;
convert_object_meta(res.blob)?.ok_or_else(|| super::Error::NotFound {
path: location.to_string(),
source: "is directory".to_string().into(),
})
}
async fn delete(&self, location: &Path) -> Result<()> {
self.container_client
.as_blob_client(location.as_ref())
.delete()
.delete_snapshots_method(DeleteSnapshotsMethod::Include)
.execute()
.await
.context(UnableToDeleteDataSnafu {
container: &self.container_name,
path: location.to_string(),
})?;
Ok(())
}
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.list_impl(prefix, false)
.await?
.map_ok(|resp| {
let names = resp
.blobs
.blobs
.into_iter()
.filter_map(|blob| convert_object_meta(blob).transpose());
futures::stream::iter(names)
})
.try_flatten()
.boxed();
Ok(stream)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let mut stream = self.list_impl(prefix, true).await?;
let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
while let Some(res) = stream.next().await {
let response = res?;
let prefixes = response.blobs.blob_prefix.unwrap_or_default();
for p in prefixes {
common_prefixes.insert(Path::parse(&p.name)?);
}
let blobs = response.blobs.blobs;
objects.reserve(blobs.len());
for blob in blobs {
if let Some(meta) = convert_object_meta(blob)? {
objects.push(meta);
}
}
}
Ok(ListResult {
common_prefixes: common_prefixes.into_iter().collect(),
objects,
})
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from_url = self.get_copy_from_url(from)?;
self.container_client
.as_blob_client(to.as_ref())
.copy(&from_url)
.execute()
.await
.context(UnableToCopyFileSnafu {
container: &self.container_name,
from: from.as_ref(),
to: to.as_ref(),
})?;
Ok(())
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let from_url = self.get_copy_from_url(from)?;
self.container_client
.as_blob_client(to.as_ref())
.copy(&from_url)
.if_match_condition(IfMatchCondition::NotMatch("*".to_string()))
.execute()
.await
.map_err(|err| {
if let Some(azure_core::HttpError::StatusCode { status, .. }) =
err.downcast_ref::<azure_core::HttpError>()
{
if status.as_u16() == 409 {
return Error::AlreadyExists {
source: err,
path: to.to_string(),
};
};
};
Error::UnableToCopyFile {
source: err,
container: self.container_name.clone(),
from: from.to_string(),
to: to.to_string(),
}
})?;
Ok(())
}
}
impl MicrosoftAzure {
/// Helper function to build the source URL used by the copy functions.
fn get_copy_from_url(&self, from: &Path) -> Result<reqwest::Url> {
Ok(reqwest::Url::parse(&format!(
"{}/{}/{}",
&self.blob_base_url, self.container_name, from
))
.context(UnableToParseUrlSnafu {
container: &self.container_name,
})?)
}
async fn list_impl(
&self,
prefix: Option<&Path>,
delimiter: bool,
) -> Result<BoxStream<'_, Result<ListBlobsResponse>>> {
enum ListState {
Start,
HasMore(String),
Done,
}
let prefix_raw = format_prefix(prefix);
Ok(stream::unfold(ListState::Start, move |state| {
let mut request = self.container_client.list_blobs();
if let Some(p) = prefix_raw.as_deref() {
request = request.prefix(p);
}
if delimiter {
request = request.delimiter(Delimiter::new(DELIMITER));
}
async move {
match state {
ListState::HasMore(ref marker) => {
request = request.next_marker(marker as &str);
}
ListState::Done => {
return None;
}
ListState::Start => {}
}
let resp = match request.execute().await.context(UnableToListDataSnafu {
container: &self.container_name,
}) {
Ok(resp) => resp,
Err(err) => return Some((Err(crate::Error::from(err)), state)),
};
let next_state = if let Some(marker) = &resp.next_marker {
ListState::HasMore(marker.as_str().to_string())
} else {
ListState::Done
};
Some((Ok(resp), next_state))
}
})
.boxed())
}
}
/// Returns `None` if the blob is a directory
fn convert_object_meta(blob: Blob) -> Result<Option<ObjectMeta>> {
let location = Path::parse(blob.name)?;
let last_modified = blob.properties.last_modified;
let size = blob
.properties
.content_length
.try_into()
.expect("unsupported size on this platform");
// This is needed to filter out gen2 directories
// https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-known-issues#blob-storage-apis
Ok((size > 0).then(|| ObjectMeta {
location,
last_modified,
size,
}))
}
#[cfg(feature = "azure_test")]
fn check_if_emulator_works() -> Result<()> {
Ok(())
}
#[cfg(not(feature = "azure_test"))]
fn check_if_emulator_works() -> Result<()> {
Err(Error::NoEmulatorFeature.into())
}
/// Configure a connection to the container with the given name on Microsoft
/// Azure Blob Storage.
///
/// The credentials `account` and `access_key` must provide access to the
/// store.
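///
/// A minimal construction sketch (the credentials and container name below are
/// placeholders, not real values):
///
/// ```ignore
/// // Against a real storage account:
/// let azure = new_azure("my_account", "my_access_key", "my_container", false)?;
///
/// // Against the Azurite emulator (requires the `azure_test` feature; the
/// // account and access key arguments are not used in this case):
/// let emulated = new_azure("", "", "my_container", true)?;
/// ```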
pub fn new_azure(
account: impl Into<String>,
access_key: impl Into<String>,
container_name: impl Into<String>,
use_emulator: bool,
) -> Result<MicrosoftAzure> {
let account = account.into();
let access_key = access_key.into();
let http_client: Arc<dyn HttpClient> = Arc::new(reqwest::Client::new());
let (is_emulator, storage_account_client) = if use_emulator {
check_if_emulator_works()?;
(true, StorageAccountClient::new_emulator_default())
} else {
(
false,
StorageAccountClient::new_access_key(
Arc::clone(&http_client),
&account,
&access_key,
),
)
};
let storage_client = storage_account_client.as_storage_client();
let blob_base_url = storage_account_client
.blob_storage_url()
.as_ref()
// make the URL ending consistent between the emulator and a remote storage account
.trim_end_matches('/')
.to_string();
let container_name = container_name.into();
let container_client = storage_client.as_container_client(&container_name);
Ok(MicrosoftAzure {
container_client,
container_name,
blob_base_url,
is_emulator,
})
}
#[cfg(test)]
mod tests {
use crate::azure::new_azure;
use crate::tests::{
copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy,
};
use std::env;
#[derive(Debug)]
struct AzureConfig {
storage_account: String,
access_key: String,
bucket: String,
use_emulator: bool,
}
// Helper macro to skip tests if TEST_INTEGRATION and the Azure environment
// variables are not set.
macro_rules! maybe_skip_integration {
() => {{
dotenv::dotenv().ok();
let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok();
let mut required_vars = vec!["OBJECT_STORE_BUCKET"];
if !use_emulator {
required_vars.push("AZURE_STORAGE_ACCOUNT");
required_vars.push("AZURE_STORAGE_ACCESS_KEY");
}
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = std::env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() {
eprintln!(
"skipping Azure integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
return;
} else {
AzureConfig {
storage_account: env::var("AZURE_STORAGE_ACCOUNT")
.unwrap_or_default(),
access_key: env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(),
bucket: env::var("OBJECT_STORE_BUCKET")
.expect("already checked OBJECT_STORE_BUCKET"),
use_emulator,
}
}
}};
}
#[tokio::test]
async fn azure_blob_test() {
let config = maybe_skip_integration!();
let integration = new_azure(
config.storage_account,
config.access_key,
config.bucket,
config.use_emulator,
)
.unwrap();
put_get_delete_list(&integration).await.unwrap();
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
rename_and_copy(&integration).await.unwrap();
copy_if_not_exists(&integration).await.unwrap();
}
}