Add BigObjectManager$create() to the R UDF API
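
Adds BigObjectManager$create() to the R UDF API so R operators can write big
objects, mirroring the Python BigObjectManager.create(). The R helpers
(BigObjectStream, open(), readRDS()) now delegate to the Python
BigObjectManager through reticulate instead of talking to S3 directly via the
aws.s3 package, and open() now expects a BigObjectPointer rather than a plain
URI string. On the Python side, create() additionally accepts bytearray and
memoryview payloads (which is how R raw vectors typically arrive through
reticulate) alongside bytes and file-like objects.

A minimal usage sketch from an R UDF; `df` and "payload.rds" are illustrative
and not part of this change:

    # Write: upload a local RDS file and keep only the pointer in the tuple
    saveRDS(df, "payload.rds")
    ptr <- BigObjectManager$create("payload.rds")

    # Read the whole object back in one call
    df2 <- BigObjectManager$readRDS(ptr)

    # Or stream the raw bytes incrementally
    stream <- BigObjectManager$open(ptr)
    head_bytes <- stream$read(1024L)  # first 1 KB only
    stream$close()
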
diff --git a/amber/src/main/python/core/models/big_object_manager.R b/amber/src/main/python/core/models/big_object_manager.R
index 9e33f23..5ce785c 100644
--- a/amber/src/main/python/core/models/big_object_manager.R
+++ b/amber/src/main/python/core/models/big_object_manager.R
@@ -20,51 +20,89 @@
# This file defines the BigObjectManager API and BigObjectStream class.
-# Users call BigObjectManager$open() in their R UDF code to read big objects.
+# Users call BigObjectManager$create() and BigObjectManager$open() in their
+# R UDF code to write and read big objects.
-# Load required packages
-if (!require("aws.s3", quietly = TRUE)) {
- warning("Package 'aws.s3' not installed. Install with: install.packages('aws.s3')")
+.py_cache <- new.env(parent = emptyenv())
+
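+# Import a Python module (or one attribute of it) once and cache the handle
+# in .py_cache. reticulate's Python signal handlers are disabled so they do
+# not override the host process's handlers.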
+get_py_object <- function(module, attr = NULL) {
+ if (is.null(getOption("reticulate.configure_signal_handlers"))) {
+ options(reticulate.configure_signal_handlers = FALSE)
+ }
+ if (!requireNamespace("reticulate", quietly = TRUE)) {
+ stop(
+ "Package 'reticulate' is required for BigObjectManager. ",
+ "Install with: install.packages('reticulate')",
+ call. = FALSE
+ )
+ }
+ key <- if (is.null(attr)) module else paste(module, attr, sep = "::")
+ if (!exists(key, envir = .py_cache, inherits = FALSE)) {
+ module_obj <- reticulate::import(module, convert = FALSE)
+ .py_cache[[key]] <- if (is.null(attr)) module_obj else module_obj[[attr]]
+ }
+ .py_cache[[key]]
}
-# BigObjectStream Reference Class
-# Provides stream-like access to big object content
-#
-# Uses S3 streaming connection (downloads on-demand, not upfront!)
-# Memory usage: O(1) - only current chunk in memory
-BigObjectStream <- setRefClass("BigObjectStream",
- fields = list(conn = "ANY", uri = "character", is_closed = "logical"),
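+
+# Lazily resolved handles to the Python-side BigObjectManager and
+# BigObjectPointer classes, cached per process via get_py_object().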
+py_manager <- function() get_py_object("pytexera.storage.big_object_manager", "BigObjectManager")
+py_pointer_class <- function() get_py_object("core.models.schema.big_object_pointer", "BigObjectPointer")
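+
+# Convert an R value into something the Python BigObjectManager can upload:
+# raw vectors, character scalars, and binary connections become in-memory
+# byte payloads; existing file paths are opened as Python file objects; and
+# BigObjectStream / BigObjectPointer inputs are forwarded as Python streams.
+# Returns list(obj = <python object>, cleanup = <optional function>).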
+as_python_payload <- function(value) {
+ cleanup <- NULL
+ obj <- NULL
+
+ if (is.raw(value)) {
+ obj <- reticulate::r_to_py(value, convert = TRUE)
+ } else if (inherits(value, "connection")) {
+ if (!isOpen(value)) {
+ open(value, "rb")
+ cleanup <- function() close(value)
+ }
+ # readBin() has no "read everything" mode, so drain the connection in chunks
+ bytes <- raw(0)
+ repeat {
+ chunk <- readBin(value, what = "raw", n = 8L * 1024L * 1024L) # 8 MB
+ if (length(chunk) == 0L) break
+ bytes <- c(bytes, chunk)
+ }
+ obj <- reticulate::r_to_py(bytes, convert = TRUE)
+ } else if (inherits(value, "BigObjectStream")) {
+ obj <- value$py_stream
+ } else if (inherits(value, "BigObjectPointer")) {
+ py_stream <- py_manager()$open(py_pointer_class()(value$uri))
+ cleanup <- function() try(py_stream$close(), silent = TRUE)
+ obj <- py_stream
+ } else if (is.character(value) && length(value) == 1L) {
+ if (file.exists(value)) {
+ py_file <- get_py_object("builtins")$open(value, "rb")
+ cleanup <- function() py_file$close()
+ obj <- py_file
+ } else {
+ obj <- reticulate::r_to_py(charToRaw(value), convert = TRUE)
+ }
+ } else {
+ stop(
+ "Unsupported data type for BigObjectManager$create(). ",
+ "Provide raw vector, binary connection, file path, or character scalar.",
+ call. = FALSE
+ )
+ }
+
+ list(obj = obj, cleanup = cleanup)
+}
+
+# BigObjectStream wraps the Python stream for seamless R access.
+BigObjectStream <- setRefClass(
+ "BigObjectStream",
+ fields = list(py_stream = "ANY", uri = "character", is_closed = "logical"),
methods = list(
- initialize = function(s3_conn, uri_val) {
- # Store the S3 connection (NOT raw bytes!)
- # This enables true streaming (downloads on-demand)
- conn <<- s3_conn
+ initialize = function(py_stream_obj, uri_val) {
+ py_stream <<- py_stream_obj
uri <<- uri_val
is_closed <<- FALSE
},
read = function(n = -1L) {
if (is_closed) stop("Stream is closed")
-
- if (n == -1L) {
- # Read all remaining data in chunks to avoid memory spike
- # Downloads incrementally from S3 (not all at once!)
- result <- raw(0)
- chunk_size <- 10 * 1024 * 1024 # 10MB chunks
- repeat {
- chunk <- tryCatch(
- readBin(conn, "raw", chunk_size),
- error = function(e) raw(0)
- )
- if (length(chunk) == 0) break
- result <- c(result, chunk)
- }
- result
+ bytes <- if (n == -1L) {
+ py_stream$read()
} else {
- # Read exactly n bytes (downloads only n bytes from S3!)
- readBin(conn, "raw", n)
+ py_stream$read(as.integer(n))
}
+ r_obj <- reticulate::py_to_r(bytes)
+ if (is.null(r_obj)) raw(0) else as.raw(r_obj)
},
close = function() {
if (!is_closed) {
- base::close(conn)
+ try(py_stream$close(), silent = TRUE)
is_closed <<- TRUE
}
},
@@ -72,130 +110,49 @@
)
)
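+
+# Example (sketch): incremental reads from an opened stream; `ptr` is assumed
+# to be a BigObjectPointer arriving on a tuple.
+#   stream <- BigObjectManager$open(ptr)
+#   while (length(chunk <- stream$read(4L * 1024L * 1024L)) > 0) {
+#     # ... process a 4 MB chunk without materializing the whole object ...
+#   }
+#   stream$close()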
-# BigObjectManager API
-# Main interface for accessing big objects from S3
+# Public API exposed to R UDFs.
BigObjectManager <- list(
- open = function(pointer_or_uri) {
- # Extract from list if needed (for backward compatibility)
- if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) {
- pointer_or_uri <- pointer_or_uri[[1]]
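+ # create(data): upload data (raw vector, binary connection, existing file
+ # path, or character scalar) as a big object and return a BigObjectPointer.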
+ create = function(data) {
+ payload <- as_python_payload(data)
+ if (!is.null(payload$cleanup)) {
+ on.exit(payload$cleanup(), add = TRUE)
}
-
- # Get URI string
- uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) {
- pointer_or_uri$uri
- } else if (is.character(pointer_or_uri)) {
- pointer_or_uri
- } else {
- stop("Expected BigObjectPointer or character URI")
- }
-
- if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri))
-
- # Parse s3://bucket/key
- parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
- if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
-
- bucket <- parts[1]
- key <- paste(parts[-1], collapse = "/")
-
- # Configure S3 credentials from environment variables
- Sys.setenv(
- AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
- AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"),
- AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"),
- AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
- )
-
- # Create TRUE STREAMING connection to S3
- # This does NOT download the file - downloads happen on-demand as you read()!
- message("BigObjectManager: Opening streaming connection for: ", uri)
- start_time <- Sys.time()
-
- s3_conn <- tryCatch(
- aws.s3::s3connection(
- object = key,
- bucket = bucket,
- region = Sys.getenv("AWS_DEFAULT_REGION"),
- base_url = Sys.getenv("AWS_S3_ENDPOINT"),
- use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
- ),
- error = function(e) stop(paste("Failed to open streaming connection for", uri, ":", conditionMessage(e)))
- )
-
- open_time <- Sys.time()
- message(sprintf("BigObjectManager: Streaming connection established in %.2f seconds",
- as.numeric(difftime(open_time, start_time, units = "secs"))))
- message("BigObjectManager: Data will be downloaded on-demand as you read() - O(1) memory!")
-
- BigObjectStream$new(s3_conn, uri)
+ pointer <- py_manager()$create(payload$obj)
+ BigObjectPointer$new(uri_val = reticulate::py_to_r(pointer$uri))
},
-
- # Fast path for reading RDS files - downloads directly to disk, skips memory
- readRDS = function(pointer_or_uri) {
- # Extract from list if needed (for backward compatibility)
- if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) {
- pointer_or_uri <- pointer_or_uri[[1]]
+
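+ # open(pointer): validate the BigObjectPointer and return a BigObjectStream
+ # that reads the object on demand through the Python BigObjectManager.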
+ open = function(pointer) {
+ ptr <- pointer
+ if (is.list(ptr) && length(ptr) == 1) {
+ ptr <- ptr[[1]]
}
-
- # Get URI string
- uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) {
- pointer_or_uri$uri
- } else if (is.character(pointer_or_uri)) {
- pointer_or_uri
- } else {
- stop("Expected BigObjectPointer or character URI")
+ if (!inherits(ptr, "BigObjectPointer")) {
+ stop("Expected BigObjectPointer")
}
-
- if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri))
-
- # Parse s3://bucket/key
- parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
- if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
-
- # Configure S3 credentials from environment variables
- Sys.setenv(
- AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
- AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"),
- AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"),
- AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
- )
-
- message("BigObjectManager: Fast readRDS for: ", uri)
- start_time <- Sys.time()
-
- # Download directly to temp file (skips loading into memory!)
+ if (!startsWith(ptr$uri, "s3://")) {
+ stop(paste("Invalid S3 URI:", ptr$uri))
+ }
+ py_stream <- py_manager()$open(py_pointer_class()(ptr$uri))
+ BigObjectStream$new(py_stream, ptr$uri)
+ },
+
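+ # readRDS(pointer): stream the object into a temporary file in 16 MB chunks,
+ # deserialize it with base::readRDS(), and remove the temp file on exit.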
+ readRDS = function(pointer) {
+ stream <- BigObjectManager$open(pointer)
+ on.exit(stream$close(), add = TRUE)
+
temp_file <- tempfile(fileext = ".rds")
- tryCatch(
- aws.s3::save_object(
- object = paste(parts[-1], collapse = "/"),
- bucket = parts[1],
- file = temp_file,
- region = Sys.getenv("AWS_DEFAULT_REGION"),
- base_url = Sys.getenv("AWS_S3_ENDPOINT"),
- use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
- ),
- error = function(e) stop(paste("Failed to download", uri, ":", conditionMessage(e)))
- )
-
- download_time <- Sys.time()
- file_size_gb <- file.info(temp_file)$size / 1e9
- message(sprintf("BigObjectManager: Downloaded %.2f GB to disk in %.2f seconds",
- file_size_gb,
- as.numeric(difftime(download_time, start_time, units = "secs"))))
-
- # Read RDS from temp file
+ on.exit(unlink(temp_file), add = TRUE)
+ con <- file(temp_file, open = "wb")
+
+ # Close the file before readRDS() so buffered writes are flushed to disk
+ tryCatch({
+ repeat {
+ chunk <- stream$read(16L * 1024L * 1024L) # 16 MB
+ if (length(chunk) == 0L) break
+ writeBin(chunk, con, useBytes = TRUE)
+ }
+ }, finally = close(con))
+
result <- base::readRDS(temp_file)
- read_time <- Sys.time()
- message(sprintf("BigObjectManager: readRDS() took %.2f seconds",
- as.numeric(difftime(read_time, download_time, units = "secs"))))
-
- unlink(temp_file) # Clean up
-
- total_time <- Sys.time()
- message(sprintf("BigObjectManager: Total time: %.2f seconds",
- as.numeric(difftime(total_time, start_time, units = "secs"))))
-
+
result
}
)
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py
index eb640eb..889f798 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -190,12 +190,32 @@
uri = f"s3://{cls.DEFAULT_BUCKET}/{object_key}"
s3 = cls._get_s3_client()
+ # Normalize payload for S3 upload
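+ # R callers (via reticulate) may hand us bytearray or memoryview payloads
+ # rather than bytes, so normalize those as well.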
+ fileobj: Optional[BinaryIO] = None
+ payload: Optional[bytes] = None
+ if isinstance(data, bytes):
+ payload = data
+ elif isinstance(data, bytearray):
+ payload = bytes(data)
+ elif isinstance(data, memoryview):
+ payload = data.tobytes()
+ elif hasattr(data, "read"):
+ fileobj = data # type: ignore[assignment]
+ payload = None
+ else:
+ try:
+ payload = bytes(data) # type: ignore[arg-type]
+ except TypeError as exc:
+ raise RuntimeError(
+ "Unsupported data type for BigObjectManager.create; expected bytes-like object or file-like with read()."
+ ) from exc
+
# Upload to S3
try:
- if isinstance(data, bytes):
- s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=data)
+ if payload is not None:
+ s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=payload)
else:
- s3.upload_fileobj(data, cls.DEFAULT_BUCKET, object_key)
+ s3.upload_fileobj(fileobj, cls.DEFAULT_BUCKET, object_key) # type: ignore[arg-type]
except Exception as e:
raise RuntimeError(f"Failed to upload big object to S3: {e}")