# blob: 0a789e6c379d61c763e75e5d1537dd077e903501 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Methods to call into SparkRBackend.
# Returns TRUE if object is an instance of given class
#
# jobj - a "jobj" reference to an object held by the SparkR backend
# className - name of the Java class, resolved with java.lang.Class.forName
#
# Returns the result of calling Class.isInstance on the backend for jobj.
# Stops if jobj is not a "jobj".
isInstanceOf <- function(jobj, className) {
  # inherits() gives a single TRUE/FALSE even when the object carries more
  # than one class, whereas class(jobj) == "jobj" would produce a vector and
  # make stopifnot() reject a perfectly valid jobj subclass.
  stopifnot(inherits(jobj, "jobj"))
  cls <- callJStatic("java.lang.Class", "forName", className)
  callJMethod(cls, "isInstance", jobj)
}
# Call a Java method named methodName on the object
# specified by objId. objId should be a "jobj" returned
# from the SparkRBackend.
#
# objId - a "jobj"; its id field is what gets sent to the backend
# methodName - name of the instance method to invoke
# ... - arguments forwarded to the method
#
# Returns whatever invokeJava returns for the call. Stops if objId is not a
# "jobj" or if isValidJobj() rejects it.
callJMethod <- function(objId, methodName, ...) {
  # inherits() stays a scalar test even for objects with several classes.
  stopifnot(inherits(objId, "jobj"))
  if (!isValidJobj(objId)) {
    stop("Invalid jobj ", objId$id,
         ". If SparkR was restarted, Spark operations need to be re-executed.")
  }
  # Name every fixed argument so that a named argument arriving through `...`
  # can never be partially matched to objId or methodName.
  invokeJava(isStatic = FALSE, objId = objId$id, methodName = methodName, ...)
}
# Call a static method on a specified className
#
# className - name of the Java class that owns the static method
# methodName - name of the static method to invoke
# ... - arguments forwarded to the method
#
# Returns whatever invokeJava returns for the call.
callJStatic <- function(className, methodName, ...) {
  # All fixed arguments are passed by name: with positional passing, a named
  # argument in `...` (for example obj = x) would be partially matched to
  # invokeJava's objId and shift className into the wrong slot.
  invokeJava(isStatic = TRUE, objId = className, methodName = methodName, ...)
}
# Create a new object of the specified class name
#
# className - name of the Java class to instantiate
# ... - arguments forwarded to the constructor
#
# Returns whatever invokeJava returns for the "<init>" call.
newJObject <- function(className, ...) {
  # objId is named explicitly so that a named argument in `...` cannot be
  # partially matched to it and displace className.
  invokeJava(isStatic = TRUE, objId = className, methodName = "<init>", ...)
}
# Drop an object from the SparkR backend by invoking the static "rm" method
# of SparkRHandler with the object's id as its only argument. This is done
# automatically when a jobj is garbage collected.
#
# objId - id of the backend object to remove
removeJObject <- function(objId) {
  handlerClass <- "SparkRHandler"
  # The trailing unnamed objId is the method argument, not invokeJava's objId.
  invokeJava(isStatic = TRUE, objId = handlerClass, methodName = "rm", objId)
}
# Tell whether a call description is the one issued by removeJObject, i.e. a
# static call to the "rm" method of "SparkRHandler".
#
# isStatic - TRUE if the call is static
# objId - object id or class name the call targets
# methodName - name of the method being called
isRemoveMethod <- function(isStatic, objId, methodName) {
  handlerClass <- "SparkRHandler"
  removeMethod <- "rm"
  (isStatic == TRUE) &&
    (objId == handlerClass) &&
    (methodName == removeMethod)
}
# Invoke a Java method on the SparkR backend. Users
# should typically use one of the higher level methods like
# callJMethod, callJStatic etc. instead of using this.
#
# isStatic - TRUE if the method to be called is static
# objId - String that refers to the object on which method is invoked
#         Should be a jobj id for non-static methods and the classname
#         for static methods
# methodName - name of method to be invoked
# ... - arguments for the method; serialized with writeArgs
#
# Returns the value read back from the backend connection with readObject.
# Stops if no ".sparkRCon" connection is registered in .sparkREnv, if no
# status comes back, or if the backend reports a negative (error) status.
invokeJava <- function(isStatic, objId, methodName, ...) {
  if (!exists(".sparkRCon", .sparkREnv)) {
    stop("No connection to backend found. Please re-run sparkR.session()")
  }

  # Flush pending removals first, unless this call is itself a removal:
  # removeJObject goes through invokeJava, so the guard prevents recursion.
  if (!isRemoveMethod(isStatic, objId, methodName)) {
    objsToRemove <- ls(.toRemoveJobjs)
    if (length(objsToRemove) > 0) {
      for (pendingId in objsToRemove) {
        removeJObject(pendingId)
      }
      rm(list = objsToRemove, envir = .toRemoveJobjs)
    }
  }

  # Serialize the call into an in-memory buffer. The buffer is closed in
  # `finally` so that a failure while writing (for example an argument that
  # cannot be serialized) does not leak the raw connection.
  rc <- rawConnection(raw(0), "r+")
  bytesToSend <- tryCatch({
    writeBoolean(rc, isStatic)
    writeString(rc, objId)
    writeString(rc, methodName)
    args <- list(...)
    writeInt(rc, length(args))
    writeArgs(rc, args)
    rawConnectionValue(rc)
  }, finally = close(rc))

  # Construct the whole request message to send it once,
  # avoiding write-write-read pattern in case of Nagle's algorithm.
  # Refer to http://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
  rc <- rawConnection(raw(0), "r+")
  requestMessage <- tryCatch({
    # Length prefix followed by the serialized call.
    writeInt(rc, length(bytesToSend))
    writeBin(bytesToSend, rc)
    rawConnectionValue(rc)
  }, finally = close(rc))

  conn <- get(".sparkRCon", .sparkREnv)
  writeBin(requestMessage, conn)

  # Backend will send +1 as keep alive value to prevent various connection
  # timeouts on very long running jobs. See spark.r.heartBeatInterval.
  # Keep reading statuses until something other than a heartbeat arrives;
  # handleErrors stops on a missing or negative status.
  repeat {
    returnStatus <- readInt(conn)
    handleErrors(returnStatus, conn)
    if (returnStatus != 1) {
      break
    }
  }

  readObject(conn)
}
# Inspect a status value read from the backend and raise an R error when it
# signals a failure.
#
# returnStatus - status value read from the backend connection
# conn - backend connection; the error message is read from it on failure
#
# Stops when no status was received or when the status is negative; otherwise
# returns without doing anything.
handleErrors <- function(returnStatus, conn) {
  statusMissing <- length(returnStatus) == 0
  if (statusMissing) {
    stop("No status is returned. Java SparkR backend might have failed.")
  }

  # 0 means success and +1 is reserved for heartbeats; a negative status
  # indicates an error, whose message is read from the connection.
  if (returnStatus < 0) {
    errorMessage <- readString(conn)
    stop(errorMessage)
  }
}