blob: 5c2cdddcfa36a7eb87e4d7784973c7f828cdbc6e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# References to objects that exist on the JVM backend
# are maintained using the jobj.
#' @include generics.R
NULL
# Maintain a reference count of Java object references
# This allows us to GC the java object when it is safe
.validJobjs <- new.env(parent = emptyenv())
# List of object ids to be removed
.toRemoveJobjs <- new.env(parent = emptyenv())
# Check if jobj was created with the current SparkContext
isValidJobj <- function(jobj) {
if (exists(".scStartTime", envir = .sparkREnv)) {
jobj$appId == get(".scStartTime", envir = .sparkREnv)
} else {
FALSE
}
}
getJobj <- function(objId) {
newObj <- jobj(objId)
if (exists(objId, .validJobjs)) {
.validJobjs[[objId]] <- .validJobjs[[objId]] + 1
} else {
.validJobjs[[objId]] <- 1
}
newObj
}
# Handler for a java object that exists on the backend.
jobj <- function(objId) {
if (!is.character(objId)) {
stop("object id must be a character")
}
# NOTE: We need a new env for a jobj as we can only register
# finalizers for environments or external references pointers.
obj <- structure(new.env(parent = emptyenv()), class = "jobj")
obj$id <- objId
obj$appId <- get(".scStartTime", envir = .sparkREnv)
# Register a finalizer to remove the Java object when this reference
# is garbage collected in R
reg.finalizer(obj, cleanup.jobj)
obj
}
#' Print a JVM object reference.
#'
#' This function prints the type and id for an object stored
#' in the SparkR JVM backend.
#'
#' @param x The JVM object reference
#' @param ... further arguments passed to or from other methods
#' @note print.jobj since 1.4.0
#' @keywords internal
print.jobj <- function(x, ...) {
name <- getClassName.jobj(x)
cat("Java ref type", name, "id", x$id, "\n", sep = " ")
}
getClassName.jobj <- function(x) {
cls <- callJMethod(x, "getClass")
callJMethod(cls, "getName")
}
cleanup.jobj <- function(jobj) {
if (isValidJobj(jobj)) {
objId <- jobj$id
# If we don't know anything about this jobj, ignore it
if (exists(objId, envir = .validJobjs)) {
.validJobjs[[objId]] <- .validJobjs[[objId]] - 1
if (.validJobjs[[objId]] == 0) {
rm(list = objId, envir = .validJobjs)
# NOTE: We cannot call removeJObject here as the finalizer may be run
# in the middle of another RPC. Thus we queue up this object Id to be removed
# and then run all the removeJObject when the next RPC is called.
.toRemoveJobjs[[objId]] <- 1
}
}
}
}
clearJobjs <- function() {
valid <- ls(.validJobjs)
rm(list = valid, envir = .validJobjs)
removeList <- ls(.toRemoveJobjs)
rm(list = removeList, envir = .toRemoveJobjs)
}