Add BigObjectManager$create() for R UDFs and back the R API with the Python manager via reticulate
diff --git a/amber/src/main/python/core/models/big_object_manager.R b/amber/src/main/python/core/models/big_object_manager.R
index 9e33f23..5ce785c 100644
--- a/amber/src/main/python/core/models/big_object_manager.R
+++ b/amber/src/main/python/core/models/big_object_manager.R
@@ -20,51 +20,89 @@
 # This file defines the BigObjectManager API and BigObjectStream class.
 # Users call BigObjectManager$open() in their R UDF code to read big objects.
 
-# Load required packages
-if (!require("aws.s3", quietly = TRUE)) {
-    warning("Package 'aws.s3' not installed. Install with: install.packages('aws.s3')")
+.py_cache <- new.env(parent = emptyenv())
+
+get_py_object <- function(module, attr = NULL) {
+    if (is.null(getOption("reticulate.configure_signal_handlers"))) {
+        options(reticulate.configure_signal_handlers = FALSE)
+    }
+    if (!requireNamespace("reticulate", quietly = TRUE)) {
+        stop(
+            "Package 'reticulate' is required for BigObjectManager. ",
+            "Install with: install.packages('reticulate')",
+            call. = FALSE
+        )
+    }
+    key <- if (is.null(attr)) module else paste(module, attr, sep = "::")
+    if (!exists(key, envir = .py_cache, inherits = FALSE)) {
+        module_obj <- reticulate::import(module, convert = FALSE)
+        .py_cache[[key]] <- if (is.null(attr)) module_obj else module_obj[[attr]]
+    }
+    .py_cache[[key]]
 }
 
-# BigObjectStream Reference Class
-# Provides stream-like access to big object content
-# 
-# Uses S3 streaming connection (downloads on-demand, not upfront!)
-# Memory usage: O(1) - only current chunk in memory
-BigObjectStream <- setRefClass("BigObjectStream",
-    fields = list(conn = "ANY", uri = "character", is_closed = "logical"),
+py_manager <- function() get_py_object("pytexera.storage.big_object_manager", "BigObjectManager")
+py_pointer_class <- function() get_py_object("core.models.schema.big_object_pointer", "BigObjectPointer")
+# Convert an R value into the Python object passed to BigObjectManager.create().
+# Returns list(obj, cleanup); cleanup is NULL or a function the caller must run.
+as_python_payload <- function(value) {
+    cleanup <- NULL
+    if (is.raw(value)) {
+        obj <- reticulate::r_to_py(value, convert = TRUE)
+    } else if (inherits(value, "connection")) {
+        if (!isOpen(value)) {
+            open(value, "rb")
+            cleanup <- function() close(value)
+        }
+        # readBin() rejects a negative n, so drain the connection in 1 MB chunks.
+        chunks <- list()
+        while (length(chunk <- readBin(value, what = "raw", n = 1048576L)) > 0L) {
+            chunks[[length(chunks) + 1L]] <- chunk
+        }
+        obj <- reticulate::r_to_py(as.raw(unlist(chunks)), convert = TRUE)
+    } else if (inherits(value, "BigObjectStream")) {
+        obj <- value$py_stream
+    } else if (inherits(value, "BigObjectPointer")) {
+        obj <- py_manager()$open(py_pointer_class()(value$uri))
+        cleanup <- function() try(obj$close(), silent = TRUE)
+    } else if (is.character(value) && length(value) == 1L) {
+        if (file.exists(value)) {
+            obj <- get_py_object("builtins")$open(value, "rb")
+            cleanup <- function() obj$close()
+        } else {
+            obj <- reticulate::r_to_py(charToRaw(value), convert = TRUE)
+        }
+    } else {
+        stop("Unsupported data type for BigObjectManager$create(). ",
+            "Provide raw vector, binary connection, file path, or character scalar.",
+            call. = FALSE)
+    }
+    list(obj = obj, cleanup = cleanup)
+}
+
+# BigObjectStream wraps the Python stream for seamless R access.
+BigObjectStream <- setRefClass(
+    "BigObjectStream",
+    fields = list(py_stream = "ANY", uri = "character", is_closed = "logical"),
     methods = list(
-        initialize = function(s3_conn, uri_val) {
-            # Store the S3 connection (NOT raw bytes!)
-            # This enables true streaming (downloads on-demand)
-            conn <<- s3_conn
+        initialize = function(py_stream_obj, uri_val) {
+            py_stream <<- py_stream_obj
             uri <<- uri_val
             is_closed <<- FALSE
         },
         read = function(n = -1L) {
             if (is_closed) stop("Stream is closed")
-            
-            if (n == -1L) {
-                # Read all remaining data in chunks to avoid memory spike
-                # Downloads incrementally from S3 (not all at once!)
-                result <- raw(0)
-                chunk_size <- 10 * 1024 * 1024  # 10MB chunks
-                repeat {
-                    chunk <- tryCatch(
-                        readBin(conn, "raw", chunk_size),
-                        error = function(e) raw(0)
-                    )
-                    if (length(chunk) == 0) break
-                    result <- c(result, chunk)
-                }
-                result
+            bytes <- if (n == -1L) {
+                py_stream$read()
             } else {
-                # Read exactly n bytes (downloads only n bytes from S3!)
-                readBin(conn, "raw", n)
+                py_stream$read(as.integer(n))
             }
+            r_obj <- reticulate::py_to_r(bytes)
+            if (is.null(r_obj)) raw(0) else as.raw(r_obj)
         },
         close = function() {
             if (!is_closed) {
-                base::close(conn)
+                try(py_stream$close(), silent = TRUE)
                 is_closed <<- TRUE
             }
         },
@@ -72,130 +110,49 @@
     )
 )
 
-# BigObjectManager API
-# Main interface for accessing big objects from S3
+# Public API exposed to R UDFs.
 BigObjectManager <- list(
-    open = function(pointer_or_uri) {
-        # Extract from list if needed (for backward compatibility)
-        if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) {
-            pointer_or_uri <- pointer_or_uri[[1]]
+    create = function(data) {
+        payload <- as_python_payload(data)
+        if (!is.null(payload$cleanup)) {
+            on.exit(payload$cleanup(), add = TRUE)
         }
-        
-        # Get URI string
-        uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) {
-            pointer_or_uri$uri
-        } else if (is.character(pointer_or_uri)) {
-            pointer_or_uri
-        } else {
-            stop("Expected BigObjectPointer or character URI")
-        }
-        
-        if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri))
-        
-        # Parse s3://bucket/key
-        parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
-        if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
-        
-        bucket <- parts[1]
-        key <- paste(parts[-1], collapse = "/")
-        
-        # Configure S3 credentials from environment variables
-        Sys.setenv(
-            AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
-            AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"),
-            AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"),
-            AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
-        )
-        
-        # Create TRUE STREAMING connection to S3
-        # This does NOT download the file - downloads happen on-demand as you read()!
-        message("BigObjectManager: Opening streaming connection for: ", uri)
-        start_time <- Sys.time()
-        
-        s3_conn <- tryCatch(
-            aws.s3::s3connection(
-                object = key,
-                bucket = bucket,
-                region = Sys.getenv("AWS_DEFAULT_REGION"),
-                base_url = Sys.getenv("AWS_S3_ENDPOINT"),
-                use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
-            ),
-            error = function(e) stop(paste("Failed to open streaming connection for", uri, ":", conditionMessage(e)))
-        )
-        
-        open_time <- Sys.time()
-        message(sprintf("BigObjectManager: Streaming connection established in %.2f seconds", 
-                       as.numeric(difftime(open_time, start_time, units = "secs"))))
-        message("BigObjectManager: Data will be downloaded on-demand as you read() - O(1) memory!")
-        
-        BigObjectStream$new(s3_conn, uri)
+        pointer <- py_manager()$create(payload$obj)
+        BigObjectPointer$new(uri_val = reticulate::py_to_r(pointer$uri))
     },
-    
-    # Fast path for reading RDS files - downloads directly to disk, skips memory
-    readRDS = function(pointer_or_uri) {
-        # Extract from list if needed (for backward compatibility)
-        if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) {
-            pointer_or_uri <- pointer_or_uri[[1]]
+
+    open = function(pointer) {
+        ptr <- pointer
+        if (is.list(ptr) && length(ptr) == 1) {
+            ptr <- ptr[[1]]
         }
-        
-        # Get URI string
-        uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) {
-            pointer_or_uri$uri
-        } else if (is.character(pointer_or_uri)) {
-            pointer_or_uri
-        } else {
-            stop("Expected BigObjectPointer or character URI")
+        if (!inherits(ptr, "BigObjectPointer")) {
+            stop("Expected BigObjectPointer")
         }
-        
-        if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri))
-        
-        # Parse s3://bucket/key
-        parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
-        if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
-        
-        # Configure S3 credentials from environment variables
-        Sys.setenv(
-            AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
-            AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"),
-            AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"),
-            AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
-        )
-        
-        message("BigObjectManager: Fast readRDS for: ", uri)
-        start_time <- Sys.time()
-        
-        # Download directly to temp file (skips loading into memory!)
+        if (!startsWith(ptr$uri, "s3://")) {
+            stop(paste("Invalid S3 URI:", ptr$uri))
+        }
+        py_stream <- py_manager()$open(py_pointer_class()(ptr$uri))
+        BigObjectStream$new(py_stream, ptr$uri)
+    },
+
+    readRDS = function(pointer) {
+        stream <- BigObjectManager$open(pointer)
+        on.exit(stream$close(), add = TRUE)
+
         temp_file <- tempfile(fileext = ".rds")
-        tryCatch(
-            aws.s3::save_object(
-                object = paste(parts[-1], collapse = "/"),
-                bucket = parts[1],
-                file = temp_file,
-                region = Sys.getenv("AWS_DEFAULT_REGION"),
-                base_url = Sys.getenv("AWS_S3_ENDPOINT"),
-                use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
-            ),
-            error = function(e) stop(paste("Failed to download", uri, ":", conditionMessage(e)))
-        )
-        
-        download_time <- Sys.time()
-        file_size_gb <- file.info(temp_file)$size / 1e9
-        message(sprintf("BigObjectManager: Downloaded %.2f GB to disk in %.2f seconds", 
-                       file_size_gb,
-                       as.numeric(difftime(download_time, start_time, units = "secs"))))
-        
-        # Read RDS from temp file
+        on.exit(unlink(temp_file), add = TRUE)
+        con <- file(temp_file, open = "wb")
+        # Spool the object to disk chunk by chunk. The connection must be
+        # closed (flushing buffered writes) before base::readRDS() reads it.
+        tryCatch(repeat {
+            chunk <- stream$read(16L * 1024L * 1024L)  # 16 MB
+            if (length(chunk) == 0) break
+            writeBin(chunk, con, useBytes = TRUE)
+        }, finally = close(con))
+
         result <- base::readRDS(temp_file)
-        read_time <- Sys.time()
-        message(sprintf("BigObjectManager: readRDS() took %.2f seconds", 
-                       as.numeric(difftime(read_time, download_time, units = "secs"))))
-        
-        unlink(temp_file)  # Clean up
-        
-        total_time <- Sys.time()
-        message(sprintf("BigObjectManager: Total time: %.2f seconds", 
-                       as.numeric(difftime(total_time, start_time, units = "secs"))))
-        
+
         result
     }
 )
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py
index eb640eb..889f798 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -190,12 +190,32 @@
         uri = f"s3://{cls.DEFAULT_BUCKET}/{object_key}"
         s3 = cls._get_s3_client()
 
+        # Normalize payload for S3 upload
+        fileobj: Optional[BinaryIO] = None
+        payload: Optional[bytes] = None
+        if isinstance(data, bytes):
+            payload = data
+        elif isinstance(data, bytearray):
+            payload = bytes(data)
+        elif isinstance(data, memoryview):
+            payload = data.tobytes()
+        elif hasattr(data, "read"):
+            fileobj = data  # type: ignore[assignment]
+            payload = None
+        else:
+            try:
+                payload = bytes(data)  # type: ignore[arg-type]
+            except TypeError as exc:
+                raise RuntimeError(
+                    "Unsupported data type for BigObjectManager.create; expected bytes-like object or file-like with read()."
+                ) from exc
+
         # Upload to S3
         try:
-            if isinstance(data, bytes):
-                s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=data)
+            if payload is not None:
+                s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=payload)
             else:
-                s3.upload_fileobj(data, cls.DEFAULT_BUCKET, object_key)
+                s3.upload_fileobj(fileobj, cls.DEFAULT_BUCKET, object_key)  # type: ignore[arg-type]
         except Exception as e:
             raise RuntimeError(f"Failed to upload big object to S3: {e}")
 
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index c7b8ede..550f6d2 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -95,7 +95,7 @@
     val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
 
     // Upload to S3
-    S3StorageClient.uploadObject(DEFAULT_BUCKET, objectKey, stream)
+      S3StorageClient.uploadObject(DEFAULT_BUCKET, objectKey, stream)
 
     // Register in database
     try {
@@ -159,8 +159,8 @@
     ExecutionContext.getBigObjectEventCallback.foreach { callback =>
       try {
         callback(operatorId, uri, eventType)
-      } catch {
-        case e: Exception =>
+    } catch {
+      case e: Exception =>
           logger.warn(s"Failed to send BigObjectEvent: ${e.getMessage}")
       }
     }
@@ -197,7 +197,7 @@
       } catch {
         case e: Exception =>
           logger.error(s"Failed to delete big object: $uri", e)
-      }
+}
     }
 
     // Delete database records