blob: 7760d9be16f0b1a8dae925bbc6909afa3f5af735 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Utility functions to serialize R objects so they can be read in Java.
# nolint start
# Type mapping from R to Java
#
# NULL -> Void
# integer -> Int
# character -> String
# logical -> Boolean
# double, numeric -> Double
# raw -> Array[Byte]
# Date -> Date
# POSIXct,POSIXlt -> Time
#
# list[T] -> Array[T], where T is one of above mentioned types
# Multi-element vector of any of the above (except raw) -> Array[T]
# environment -> Map[String, T], where T is a native type
# jobj -> Object, where jobj is an object created in the backend
# nolint end
getSerdeType <- function(object) {
type <- class(object)[[1]]
if (is.atomic(object) & !is.raw(object) & length(object) > 1) {
"array"
} else if (type != "list") {
type
} else {
# Check if all elements are of same type
elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
if (length(elemType) <= 1) {
"array"
} else {
"list"
}
}
}
writeObject <- function(con, object, writeType = TRUE) {
# NOTE: In R vectors have same type as objects
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
# Checking types is needed here, since 'is.na' only handles atomic vectors,
# lists and pairlists
if (type %in% c("integer", "character", "logical", "double", "numeric")) {
if (is.na(object)) {
object <- NULL
type <- "NULL"
}
}
serdeType <- getSerdeType(object)
if (writeType) {
writeType(con, serdeType)
}
switch(serdeType,
NULL = writeVoid(con),
integer = writeInt(con, object),
character = writeString(con, object),
logical = writeBoolean(con, object),
double = writeDouble(con, object),
numeric = writeDouble(con, object),
raw = writeRaw(con, object),
array = writeArray(con, object),
list = writeList(con, object),
struct = writeList(con, object),
jobj = writeJobj(con, object),
environment = writeEnv(con, object),
Date = writeDate(con, object),
POSIXlt = writeTime(con, object),
POSIXct = writeTime(con, object),
stop("Unsupported type for serialization ", type))
}
writeVoid <- function(con) {
# no value for NULL
}
writeJobj <- function(con, value) {
if (!isValidJobj(value)) {
stop("invalid jobj ", value$id)
}
writeString(con, value$id)
}
writeString <- function(con, value) {
utfVal <- enc2utf8(value)
writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
writeBin(utfVal, con, endian = "big", useBytes = TRUE)
}
writeInt <- function(con, value) {
writeBin(as.integer(value), con, endian = "big")
}
writeDouble <- function(con, value) {
writeBin(value, con, endian = "big")
}
writeBoolean <- function(con, value) {
# TRUE becomes 1, FALSE becomes 0
writeInt(con, as.integer(value))
}
writeRawSerialize <- function(outputCon, batch) {
outputSer <- serialize(batch, ascii = FALSE, connection = NULL)
writeRaw(outputCon, outputSer)
}
writeRowSerialize <- function(outputCon, rows) {
invisible(lapply(rows, function(r) {
bytes <- serializeRow(r)
writeRaw(outputCon, bytes)
}))
}
serializeRow <- function(row) {
rawObj <- rawConnection(raw(0), "wb")
on.exit(close(rawObj))
writeList(rawObj, row)
rawConnectionValue(rawObj)
}
writeRaw <- function(con, batch) {
writeInt(con, length(batch))
writeBin(batch, con, endian = "big")
}
writeType <- function(con, class) {
type <- switch(class,
NULL = "n",
integer = "i",
character = "c",
logical = "b",
double = "d",
numeric = "d",
raw = "r",
array = "a",
list = "l",
struct = "s",
jobj = "j",
environment = "e",
Date = "D",
POSIXlt = "t",
POSIXct = "t",
stop("Unsupported type for serialization ", class))
writeBin(charToRaw(type), con)
}
# Used to pass arrays where all the elements are of the same type
writeArray <- function(con, arr) {
# TODO: Empty lists are given type "character" right now.
# This may not work if the Java side expects array of any other type.
if (length(arr) == 0) {
elemType <- class("somestring")
} else {
elemType <- getSerdeType(arr[[1]])
}
writeType(con, elemType)
writeInt(con, length(arr))
if (length(arr) > 0) {
for (a in arr) {
writeObject(con, a, FALSE)
}
}
}
# Used to pass arrays where the elements can be of different types
writeList <- function(con, list) {
writeInt(con, length(list))
for (elem in list) {
writeObject(con, elem)
}
}
# Used to pass in hash maps required on Java side.
writeEnv <- function(con, env) {
len <- length(env)
writeInt(con, len)
if (len > 0) {
writeArray(con, as.list(ls(env)))
vals <- lapply(ls(env), function(x) { env[[x]] })
writeList(con, as.list(vals))
}
}
writeDate <- function(con, date) {
writeString(con, as.character(date))
}
writeTime <- function(con, time) {
writeDouble(con, as.double(time))
}
# Used to serialize in a list of objects where each
# object can be of a different type. Serialization format is
# <object type> <object> for each object
writeArgs <- function(con, args) {
if (length(args) > 0) {
for (a in args) {
writeObject(con, a)
}
}
}
writeSerializeInArrow <- function(conn, df) {
if (requireNamespace("arrow", quietly = TRUE)) {
# There looks no way to send each batch in streaming format via socket
# connection. See ARROW-4512.
# So, it writes the whole Arrow streaming-formatted binary at once for now.
writeRaw(conn, arrow::write_arrow(df, raw()))
} else {
stop("'arrow' package should be installed.")
}
}