# Methods to call into SparkRBackend.
# Returns TRUE if object is an instance of given class.
#
# jobj      - a "jobj" handle to an object living in the SparkR backend.
# className - Java class name, resolved on the backend via Class.forName.
isInstanceOf <- function(jobj, className) {
  # inherits() rather than class(jobj) == "jobj": the equality form returns
  # one logical per class, so stopifnot() rejects objects with several classes.
  stopifnot(inherits(jobj, "jobj"))
  cls <- callJStatic("java.lang.Class", "forName", className)
  callJMethod(cls, "isInstance", jobj)
}
# Call a Java method named methodName on the object
# specified by objId. objId should be a "jobj" returned
# from the SparkRBackend.
#
# objId      - "jobj" handle; its $id is sent to the backend.
# methodName - name of the instance method to invoke.
# ...        - arguments forwarded to invokeJava().
#
# Errors if objId is not a jobj or if isValidJobj() rejects it; otherwise
# returns whatever invokeJava() returns.
callJMethod <- function(objId, methodName, ...) {
  stopifnot(inherits(objId, "jobj"))
  if (!isValidJobj(objId)) {
    stop("Invalid jobj ", objId$id,
         ". If SparkR was restarted, Spark operations need to be re-executed.")
  }
  # The dispatch must sit outside the validity check, or it is unreachable.
  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
}
# Call a static method on a specified className.
#
# className  - Java class whose static method is invoked; for static calls
#              invokeJava() takes the class name in its objId slot.
# methodName - name of the static method.
# ...        - arguments forwarded unchanged to invokeJava().
callJStatic <- function(className, methodName, ...) {
  invokeJava(isStatic = TRUE,
             className,
             methodName,
             ...)
}
# Create a new object of the specified class name.
#
# Construction is issued as a static call of the pseudo-method "<init>" on
# className; any extra arguments are forwarded to invokeJava() with it.
newJObject <- function(className, ...) {
  constructorName <- "<init>"
  invokeJava(isStatic = TRUE, className, methodName = constructorName, ...)
}
# Remove an object from the SparkR backend. This is done
# automatically when a jobj is garbage collected.
#
# Sends the static "rm" call on "SparkRHandler" with objId as its only
# argument; isRemoveMethod() recognizes exactly this call shape.
removeJObject <- function(objId) {
  handler <- "SparkRHandler"
  invokeJava(isStatic = TRUE, handler, "rm", objId)
}
# TRUE when (isStatic, objId, methodName) describes the removal request that
# removeJObject() sends: the static "rm" method on "SparkRHandler".
# invokeJava() consults this to skip its pending-removal pass for that call.
isRemoveMethod <- function(isStatic, objId, methodName) {
  TRUE == isStatic &&
    "SparkRHandler" == objId &&
    "rm" == methodName
}
# Invoke a Java method on the SparkR backend. Users
# should typically use one of the higher level methods like
# callJMethod, callJStatic etc. instead of using this.
#
# isStatic   - TRUE if the method to be called is static
# objId      - String that refers to the object on which method is invoked.
#              Should be a jobj id for non-static methods and the classname
#              for static methods
# methodName - name of method to be invoked
# ...        - arguments for the Java method, serialized with writeArgs()
#
# Stops if no backend connection (.sparkRCon in .sparkREnv) exists. Otherwise
# sends one length-prefixed request, waits through keep-alive statuses, and
# returns the value read back from the connection.
invokeJava <- function(isStatic, objId, methodName, ...) {
  if (!exists(".sparkRCon", .sparkREnv)) {
    stop("No connection to backend found. Please re-run sparkR.session()")
  }

  # Flush jobj ids queued in .toRemoveJobjs before any other call.
  # removeJObject() itself goes through invokeJava, so skip the flush for that
  # call to avoid recursing.
  if (!isRemoveMethod(isStatic, objId, methodName)) {
    objsToRemove <- ls(.toRemoveJobjs)
    if (length(objsToRemove) > 0) {
      for (pendingId in objsToRemove) {
        removeJObject(pendingId)
      }
      rm(list = objsToRemove, envir = .toRemoveJobjs)
    }
  }

  # Serialize the call body: static flag, target, method name, then arguments.
  # The in-memory connection is closed even if a writer fails.
  payloadCon <- rawConnection(raw(0), "r+")
  bytesToSend <- tryCatch({
    writeBoolean(payloadCon, isStatic)
    writeString(payloadCon, objId)
    writeString(payloadCon, methodName)
    args <- list(...)
    writeInt(payloadCon, length(args))
    writeArgs(payloadCon, args)
    rawConnectionValue(payloadCon)
  }, finally = close(payloadCon))

  # Construct the whole request message (length prefix + body) to send it
  # once, avoiding the write-write-read pattern in case of Nagle's algorithm.
  # Refer to https://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
  framedCon <- rawConnection(raw(0), "r+")
  requestMessage <- tryCatch({
    writeInt(framedCon, length(bytesToSend))
    writeBin(bytesToSend, framedCon)
    rawConnectionValue(framedCon)
  }, finally = close(framedCon))

  conn <- get(".sparkRCon", .sparkREnv)
  writeBin(requestMessage, conn)

  returnStatus <- readInt(conn)
  handleErrors(returnStatus, conn)

  # Backend will send +1 as keep alive value to prevent various connection timeouts
  # on very long running jobs. See spark.r.heartBeatInterval
  while (returnStatus == 1) {
    returnStatus <- readInt(conn)
    handleErrors(returnStatus, conn)
  }

  # Read back and return the call's result.
  # NOTE(review): readObject() is SparkR's deserializer (defined outside this
  # file); confirm it is in scope wherever this file is loaded.
  readObject(conn)
}
# Helper function to check for returned errors and print appropriate error message to user
handleErrors <- function(returnStatus, conn) {
if (length(returnStatus) == 0) {
stop("No status is returned. Java SparkR backend might have failed.")
# 0 is success and +1 is reserved for heartbeats. Other negative values indicate errors.
if (returnStatus < 0) {