blob: 04a3388080e6d3bdb897572e2361a01a7be5b098 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use opendal_core::*;
use serde::Deserialize;
use serde::Serialize;
use super::LAKEFS_SCHEME;
use super::backend::LakefsBuilder;
/// Configuration for Lakefs service support.
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
#[non_exhaustive]
pub struct LakefsConfig {
/// Base url.
///
/// This is required.
pub endpoint: Option<String>,
/// Username for Lakefs basic authentication.
///
/// This is required.
pub username: Option<String>,
/// Password for Lakefs basic authentication.
///
/// This is required.
pub password: Option<String>,
/// Root of this backend. Can be "/path/to/dir".
///
/// Default is "/".
pub root: Option<String>,
/// The repository name
///
/// This is required.
pub repository: Option<String>,
/// Name of the branch or a commit ID. Default is main.
///
/// This is optional.
pub branch: Option<String>,
}
impl Debug for LakefsConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LakefsConfig")
.field("endpoint", &self.endpoint)
.field("root", &self.root)
.field("repository", &self.repository)
.field("branch", &self.branch)
.finish_non_exhaustive()
}
}
impl Configurator for LakefsConfig {
type Builder = LakefsBuilder;
fn from_uri(uri: &OperatorUri) -> Result<Self> {
let raw_path = uri.root().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "uri path must contain repository")
.with_context("service", LAKEFS_SCHEME)
})?;
let (repository, remainder) = match raw_path.split_once('/') {
Some((repo, rest)) => (repo, Some(rest)),
None => (raw_path, None),
};
let repository = if repository.is_empty() {
None
} else {
Some(repository)
}
.ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"repository is required in uri path",
)
.with_context("service", LAKEFS_SCHEME)
})?;
let mut map = uri.options().clone();
if let Some(authority) = uri.authority() {
map.insert("endpoint".to_string(), format!("https://{authority}"));
}
map.insert("repository".to_string(), repository.to_string());
if let Some(rest) = remainder {
if map.contains_key("branch") {
if !rest.is_empty() {
map.insert("root".to_string(), rest.to_string());
}
} else {
let (branch, maybe_root) = match rest.split_once('/') {
Some((branch_part, root_part)) => (branch_part, Some(root_part)),
None => (rest, None),
};
if !branch.is_empty() {
map.insert("branch".to_string(), branch.to_string());
}
if let Some(root_part) = maybe_root {
if !root_part.is_empty() {
map.insert("root".to_string(), root_part.to_string());
}
}
}
}
Self::from_iter(map)
}
fn into_builder(self) -> Self::Builder {
LakefsBuilder { config: self }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn from_uri_sets_endpoint_repository_branch_and_root() -> Result<()> {
let uri = OperatorUri::new(
"lakefs://api.example.com/sample/main/data/dir",
Vec::<(String, String)>::new(),
)?;
let cfg = LakefsConfig::from_uri(&uri)?;
assert_eq!(cfg.endpoint.as_deref(), Some("https://api.example.com"));
assert_eq!(cfg.repository.as_deref(), Some("sample"));
assert_eq!(cfg.branch.as_deref(), Some("main"));
assert_eq!(cfg.root.as_deref(), Some("data/dir"));
Ok(())
}
#[test]
fn from_uri_requires_repository() -> Result<()> {
let uri = OperatorUri::new("lakefs://api.example.com", Vec::<(String, String)>::new())?;
assert!(LakefsConfig::from_uri(&uri).is_err());
Ok(())
}
#[test]
fn from_uri_respects_branch_override_and_sets_root() -> Result<()> {
let uri = OperatorUri::new(
"lakefs://api.example.com/sample/content",
vec![("branch".to_string(), "develop".to_string())],
)?;
let cfg = LakefsConfig::from_uri(&uri)?;
assert_eq!(cfg.branch.as_deref(), Some("develop"));
assert_eq!(cfg.root.as_deref(), Some("content"));
Ok(())
}
}