blob: 985d721d1b1de932b4247bea69ef129c199e32bb [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Apply [`ObjectStoreRegistration`] entries from the session-options proto to a
//! [`SessionContext`]'s [`RuntimeEnv`].
//!
//! Each backend (`s3` / `gcs` / `http`) is gated behind its own Cargo feature;
//! the corresponding arm of the proto `oneof` returns a clear error if the
//! feature was not enabled at build time. The default build of this crate
//! enables all three so Java callers don't have to think about features.
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use url::Url;
use crate::proto_gen::object_store_registration::Backend;
use crate::proto_gen::ObjectStoreRegistration;
use datafusion_jni_common::errors::JniResult;
#[cfg(feature = "object-store-gcp")]
use crate::proto_gen::GcsOptions;
#[cfg(feature = "object-store-http")]
use crate::proto_gen::HttpOptions;
#[cfg(feature = "object-store-aws")]
use crate::proto_gen::S3Options;
/// Apply every registration in `regs`, in order, to the context's `RuntimeEnv`.
/// If two registrations resolve to the same URL, the later one wins (matching
/// upstream `RuntimeEnv::register_object_store`).
pub(crate) fn apply_registrations(
ctx: &SessionContext,
regs: &[ObjectStoreRegistration],
) -> JniResult<()> {
for reg in regs {
let backend = reg
.backend
.as_ref()
.ok_or("ObjectStoreRegistration.backend is required")?;
let (url, store) = build_store(reg.url.as_deref(), backend)?;
ctx.runtime_env().register_object_store(&url, store);
}
Ok(())
}
/// Dispatch one registration's `backend` to the matching per-backend builder.
///
/// Each `Backend` variant has two cfg-gated arms:
/// - With its Cargo feature enabled, the arm delegates to `build_s3`,
///   `build_gcs` or `build_http`.
/// - Without the feature, the arm returns an error naming the missing feature.
///
/// This keeps the `match` exhaustive under every feature combination.
///
/// `url_override` is the registration's optional `url` field. It is passed
/// through unchanged to whichever builder is selected.
///
/// Returns the registry-key URL and the built store, or the builder's error.
#[allow(unused_variables)] // `url_override` is unused on builds with no features
fn build_store(
url_override: Option<&str>,
backend: &Backend,
) -> JniResult<(Url, Arc<dyn object_store::ObjectStore>)> {
match backend {
#[cfg(feature = "object-store-aws")]
Backend::S3(opts) => build_s3(url_override, opts),
#[cfg(not(feature = "object-store-aws"))]
Backend::S3(_) => Err(
"object-store-aws Cargo feature is not enabled in this build of datafusion-jni".into(),
),
#[cfg(feature = "object-store-gcp")]
Backend::Gcs(opts) => build_gcs(url_override, opts),
#[cfg(not(feature = "object-store-gcp"))]
Backend::Gcs(_) => Err(
"object-store-gcp Cargo feature is not enabled in this build of datafusion-jni".into(),
),
#[cfg(feature = "object-store-http")]
Backend::Http(opts) => build_http(url_override, opts),
#[cfg(not(feature = "object-store-http"))]
Backend::Http(_) => Err(
"object-store-http Cargo feature is not enabled in this build of datafusion-jni".into(),
),
}
}
/// Build an S3 object store for one registration.
///
/// The builder is first seeded from `AWS_*` environment variables, so the
/// default credential chain still works when Java supplies no credentials.
/// Every field set on `opts` is then applied on top. Env keys that would
/// conflict with Java-supplied values (endpoint, credentials, bucket) are not
/// seeded at all; the reasons are in the comments below.
///
/// Returns the registry-key URL (`url_override` if given, otherwise
/// `s3://<bucket>`) together with the built store.
///
/// # Errors
/// - `opts.bucket` is empty.
/// - `AmazonS3Builder::build` rejects the resulting configuration.
/// - The registry-key URL does not parse.
#[cfg(feature = "object-store-aws")]
fn build_s3(
    url_override: Option<&str>,
    opts: &S3Options,
) -> JniResult<(Url, Arc<dyn object_store::ObjectStore>)> {
    use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
    if opts.bucket.is_empty() {
        return Err("S3Options.bucket is required".into());
    }
    // Seed from the standard AWS env (AWS_ACCESS_KEY_ID, AWS_DEFAULT_REGION,
    // AWS_WEB_IDENTITY_TOKEN_FILE, ECS task creds, etc.) before applying
    // explicit Java overrides. Without this, callers running on EC2/ECS/EKS
    // who omit Java-side credentials and expect the SDK default-credential
    // chain would get an empty builder and fail at request time.
    //
    // Walk env explicitly (instead of AmazonS3Builder::from_env()) so that
    // env keys conflicting with explicit Java settings can be skipped:
    //
    // * Endpoint: AmazonS3Builder::build() picks `s3_endpoint` over
    //   `endpoint` (object_store 0.13 aws/builder.rs), so a stray
    //   AWS_ENDPOINT_URL_S3 in the JVM env would otherwise silently override
    //   the explicit Java endpoint. That breaks MinIO/R2/etc. registrations
    //   run out of a process with global AWS env vars.
    // * Static credentials: object_store combines whatever access key,
    //   secret and session token are configured into one credential. If Java
    //   supplies a key pair but the env carries AWS_SESSION_TOKEN (Lambda,
    //   assumed-role shells, ...), that unrelated token would be attached to
    //   the Java keys and every request would be rejected. This mirrors
    //   build_gcs, which drops env credentials when Java provides any.
    let java_has_endpoint = opts.endpoint.is_some();
    let java_has_credential = opts.access_key_id.is_some()
        || opts.secret_access_key.is_some()
        || opts.session_token.is_some();
    let mut b = AmazonS3Builder::default();
    for (os_key, os_value) in std::env::vars_os() {
        let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) else {
            continue;
        };
        if !key.starts_with("AWS_") {
            continue;
        }
        if java_has_endpoint
            && matches!(key, "AWS_ENDPOINT" | "AWS_ENDPOINT_URL" | "AWS_ENDPOINT_URL_S3")
        {
            continue;
        }
        if java_has_credential
            && matches!(
                key,
                "AWS_ACCESS_KEY_ID" | "AWS_SECRET_ACCESS_KEY" | "AWS_SESSION_TOKEN" | "AWS_TOKEN"
            )
        {
            continue;
        }
        // Skip AWS_BUCKET / AWS_BUCKET_NAME: the Java caller's bucket is the
        // identity of this registration (it's also the host portion of the
        // registry-key URL we return). An env-supplied bucket would overwrite
        // it here while the URL still says `s3://<opts.bucket>`, so a query
        // for the Java bucket would be served by the env bucket. Apply the
        // Java bucket explicitly *after* the loop so it can't be clobbered.
        if matches!(key, "AWS_BUCKET" | "AWS_BUCKET_NAME") {
            continue;
        }
        // Env vars that are not recognised config keys are ignored, as
        // AmazonS3Builder::from_env() does.
        if let Ok(config_key) = key.to_ascii_lowercase().parse() {
            b = b.with_config(config_key, value);
        }
    }
    b = b.with_bucket_name(&opts.bucket);
    if let Some(ref v) = opts.region {
        b = b.with_region(v);
    }
    if let Some(ref v) = opts.endpoint {
        b = b.with_endpoint(v);
    }
    if let Some(ref v) = opts.access_key_id {
        b = b.with_access_key_id(v);
    }
    if let Some(ref v) = opts.secret_access_key {
        b = b.with_secret_access_key(v);
    }
    if let Some(ref v) = opts.session_token {
        b = b.with_token(v);
    }
    if let Some(v) = opts.allow_http {
        b = b.with_allow_http(v);
    }
    if let Some(v) = opts.skip_signature {
        b = b.with_skip_signature(v);
    }
    if let Some(v) = opts.imdsv1_fallback {
        // AmazonS3Builder::with_imdsv1_fallback() only sets to `true` (no
        // boolean arg). To honor explicit `imdsv1Fallback(false)` and
        // override an env-seeded `AWS_IMDSV1_FALLBACK=true`, write through
        // with_config(...) which always sets the field, both directions.
        b = b.with_config(AmazonS3ConfigKey::ImdsV1Fallback, v.to_string());
    }
    let store = b.build()?;
    let url = parse_url(url_override, format!("s3://{}", opts.bucket))?;
    Ok((url, Arc::new(store)))
}
/// Build a Google Cloud Storage object store for one registration.
///
/// `GOOGLE_*` environment variables seed the builder first. Any bucket or
/// credential supplied through `opts` then takes precedence.
///
/// Returns the registry-key URL (`url_override`, or `gs://<bucket>` when
/// absent) alongside the built store.
///
/// # Errors
/// - `opts.bucket` is empty.
/// - Both `service_account_key` and `service_account_path` are set.
/// - `GoogleCloudStorageBuilder::build` fails, or the registry URL does not
///   parse.
#[cfg(feature = "object-store-gcp")]
fn build_gcs(
    url_override: Option<&str>,
    opts: &GcsOptions,
) -> JniResult<(Url, Arc<dyn object_store::ObjectStore>)> {
    use object_store::gcp::GoogleCloudStorageBuilder;

    if opts.bucket.is_empty() {
        return Err("GcsOptions.bucket is required".into());
    }
    let key_and_path_both_set =
        opts.service_account_key.is_some() && opts.service_account_path.is_some();
    if key_and_path_both_set {
        return Err(
            "GcsOptions: service_account_key and service_account_path are mutually exclusive"
                .into(),
        );
    }

    // Why not GoogleCloudStorageBuilder::from_env(): build() rejects a
    // builder holding both a service-account path and a service-account key
    // (ServiceAccountPathAndKeyProvided, object_store 0.13 gcp/builder.rs).
    // An env GOOGLE_SERVICE_ACCOUNT_KEY plus a Java serviceAccountPath(...)
    // would therefore fail to build. When Java names any credential, every
    // env credential key is left out.
    let java_supplies_credentials = [
        opts.service_account_key.is_some(),
        opts.service_account_path.is_some(),
        opts.application_credentials.is_some(),
    ]
    .iter()
    .any(|&set| set);

    let is_env_credential = |name: &str| {
        matches!(
            name,
            "GOOGLE_SERVICE_ACCOUNT"
                | "GOOGLE_SERVICE_ACCOUNT_PATH"
                | "GOOGLE_SERVICE_ACCOUNT_KEY"
                | "GOOGLE_APPLICATION_CREDENTIALS"
        )
    };

    // The env bucket is always dropped. The Java bucket is the identity of
    // this registration (it matches the registry-key URL) and is applied
    // after seeding, so an env bucket can never reroute the registered URL
    // to a different backend bucket. Non-UTF-8 entries and unrecognised keys
    // are ignored.
    let seeded = std::env::vars_os()
        .filter_map(|(k, v)| Some((k.into_string().ok()?, v.into_string().ok()?)))
        .filter(|(name, _)| name.starts_with("GOOGLE_"))
        .filter(|(name, _)| !(java_supplies_credentials && is_env_credential(name.as_str())))
        .filter(|(name, _)| !matches!(name.as_str(), "GOOGLE_BUCKET" | "GOOGLE_BUCKET_NAME"))
        .fold(
            GoogleCloudStorageBuilder::default(),
            |builder, (name, value)| match name.to_ascii_lowercase().parse() {
                Ok(config_key) => builder.with_config(config_key, value),
                Err(_) => builder,
            },
        );

    let mut builder = seeded.with_bucket_name(&opts.bucket);
    if let Some(key) = &opts.service_account_key {
        builder = builder.with_service_account_key(key);
    }
    if let Some(path) = &opts.service_account_path {
        builder = builder.with_service_account_path(path);
    }
    if let Some(creds) = &opts.application_credentials {
        builder = builder.with_application_credentials(creds);
    }

    let store = builder.build()?;
    let registry_url = parse_url(url_override, format!("gs://{}", opts.bucket))?;
    Ok((registry_url, Arc::new(store)))
}
/// Build an HTTP(S) object store for one registration.
///
/// Unlike S3/GCS there is no scheme default. `url_override`
/// (`ObjectStoreRegistration.url`) is required, and the parsed URL is used as
/// both the store's base URL and the registry key.
///
/// # Errors
/// - `url_override` is `None`.
/// - The URL does not parse, does not use the `http`/`https` scheme, or has
///   a path component.
/// - `HttpBuilder::build` rejects the configuration.
#[cfg(feature = "object-store-http")]
fn build_http(
    url_override: Option<&str>,
    opts: &HttpOptions,
) -> JniResult<(Url, Arc<dyn object_store::ObjectStore>)> {
    use object_store::http::HttpBuilder;
    let listing = url_override.ok_or(
        "HttpOptions: ObjectStoreRegistration.url is required for the HTTP backend (no scheme-default)",
    )?;
    let listing_url =
        Url::parse(listing).map_err(|e| format!("invalid HTTP URL {listing:?}: {e}"))?;
    // The HTTP store talks to its base through an HTTP client, so only
    // http/https bases can ever serve a read. Reject anything else here
    // rather than registering a store that fails on first access.
    if !matches!(listing_url.scheme(), "http" | "https") {
        return Err(format!(
            "HttpOptions: listing URL must use the http or https scheme; got {listing:?}"
        )
        .into());
    }
    // DataFusion's DefaultObjectStoreRegistry::get_url_key strips paths from
    // the registry key (it keeps only scheme + host:port). A registration
    // base like `https://example.com/data/` therefore ends up under key
    // `https://example.com`. If HttpBuilder were given that pathful URL,
    // HttpStore would prepend `/data/` to every object path on top of the
    // path DataFusion already supplies. A request for
    // `https://example.com/data/file.parquet` would become
    // `https://example.com/data/data/file.parquet`.
    //
    // So pathful bases are *rejected* (not rewritten). The caller registers a
    // host root, and the SQL-side URL carries the full path, which HttpStore
    // then appends exactly once.
    if !listing_url.path().is_empty() && listing_url.path() != "/" {
        return Err(format!(
            "HttpOptions: listing URL must be a host root (no path component); \
got {listing:?}. DataFusion's object-store registry keys ignore the \
path portion of a URL, so a pathful base would cause every object \
read to double-prepend the path. Register one HTTP store per \
scheme+host and let SQL paths carry the rest of the URL."
        )
        .into());
    }
    let mut b = HttpBuilder::new().with_url(listing);
    if let Some(v) = opts.allow_http {
        // allow_http lives on ClientOptions in object_store 0.13, not directly
        // on HttpBuilder; route through with_client_options.
        b = b.with_client_options(object_store::ClientOptions::new().with_allow_http(v));
    }
    let store = b.build()?;
    Ok((listing_url, Arc::new(store)))
}
/// Parse the registry-key URL for a registration.
///
/// Uses `url_override` when the caller supplied one. Otherwise it uses
/// `default`, the backend's scheme-default URL built by the caller (for
/// example `s3://<bucket>`).
///
/// # Errors
/// Returns an error quoting the offending string if it is not a valid URL.
fn parse_url(url_override: Option<&str>, default: String) -> JniResult<Url> {
    let candidate = match url_override {
        Some(explicit) => explicit,
        None => default.as_str(),
    };
    match Url::parse(candidate) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(format!("invalid object store URL {candidate:?}: {e}").into()),
    }
}