#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Utility functions to deserialize objects from Java.
# nolint start
# Type mapping from Java to R
#
# void -> NULL
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
# Date -> Date
# Time -> POSIXct
#
# Array[T] -> list()
# Object -> jobj
#
# nolint end
readObject <- function(con) {
# Read type first
type <- readType(con)
readTypedObject(con, type)
}
readTypedObject <- function(con, type) {
switch(type,
"i" = readInt(con),
"c" = readString(con),
"b" = readBoolean(con),
"d" = readDouble(con),
"r" = readRaw(con),
"D" = readDate(con),
"t" = readTime(con),
"a" = readArray(con),
"l" = readList(con),
"e" = readEnv(con),
"s" = readStruct(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop("Unsupported type for deserialization ", type))
}
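# Illustrative sketch (not part of the package logic): the wire format is a
# one-byte type tag followed by the payload, so a big-endian 32-bit integer
# tagged with "i" can be read back from an in-memory connection like this:
#   con <- rawConnection(c(charToRaw("i"), writeBin(42L, raw(), endian = "big")))
#   readObject(con)  # 42L
#   close(con)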
readStringData <- function(con, len) {
raw <- readBin(con, raw(), len, endian = "big")
string <- rawToChar(raw)
Encoding(string) <- "UTF-8"
string
}
readString <- function(con) {
stringLen <- readInt(con)
readStringData(con, stringLen)
}
readInt <- function(con) {
readBin(con, integer(), n = 1, endian = "big")
}
readDouble <- function(con) {
readBin(con, double(), n = 1, endian = "big")
}
readBoolean <- function(con) {
as.logical(readInt(con))
}
readType <- function(con) {
rawToChar(readBin(con, "raw", n = 1L))
}
readDate <- function(con) {
as.Date(readString(con))
}
readTime <- function(con) {
t <- readDouble(con)
as.POSIXct(t, origin = "1970-01-01")
}
readArray <- function(con) {
type <- readType(con)
len <- readInt(con)
if (len > 0) {
l <- vector("list", len)
for (i in 1:len) {
l[[i]] <- readTypedObject(con, type)
}
l
} else {
list()
}
}
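# Illustrative sketch (not part of the package logic): an array payload is the
# element type tag, a 4-byte length, and then that many untagged element
# payloads. Assuming the leading "a" tag has already been consumed:
#   payload <- c(charToRaw("i"), writeBin(2L, raw(), endian = "big"),
#                writeBin(7L, raw(), endian = "big"),
#                writeBin(9L, raw(), endian = "big"))
#   con <- rawConnection(payload)
#   readArray(con)  # list(7L, 9L)
#   close(con)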
# Read a list. The elements may have different types.
# Null objects are read as NA.
readList <- function(con) {
len <- readInt(con)
if (len > 0) {
l <- vector("list", len)
for (i in 1:len) {
elem <- readObject(con)
if (is.null(elem)) {
elem <- NA
}
l[[i]] <- elem
}
l
} else {
list()
}
}
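# Illustrative sketch (not part of the package logic): a list payload is a
# 4-byte length followed by that many fully tagged objects, so elements can
# have mixed types and an "n" (NULL) element comes back as NA. Assuming the
# leading "l" tag has already been consumed:
#   payload <- c(writeBin(2L, raw(), endian = "big"),
#                charToRaw("i"), writeBin(1L, raw(), endian = "big"),
#                charToRaw("n"))
#   con <- rawConnection(payload)
#   readList(con)  # list(1L, NA)
#   close(con)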
readEnv <- function(con) {
env <- new.env()
len <- readInt(con)
if (len > 0) {
for (i in 1:len) {
key <- readString(con)
value <- readObject(con)
env[[key]] <- value
}
}
env
}
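# Illustrative sketch (not part of the package logic): an environment payload
# is a 4-byte count followed by pairs of an untagged string key and a tagged
# value. Assuming the leading "e" tag has already been consumed:
#   payload <- c(writeBin(1L, raw(), endian = "big"),                  # one entry
#                writeBin(1L, raw(), endian = "big"), charToRaw("x"),  # key "x"
#                charToRaw("d"), writeBin(3.5, raw(), endian = "big")) # value 3.5
#   con <- rawConnection(payload)
#   readEnv(con)$x  # 3.5
#   close(con)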
# Read a field of StructType from SparkDataFrame
# into a named list in R whose class is "struct"
readStruct <- function(con) {
names <- readObject(con)
fields <- readObject(con)
names(fields) <- names
listToStruct(fields)
}
readRaw <- function(con) {
dataLen <- readInt(con)
readBin(con, raw(), as.integer(dataLen), endian = "big")
}
readRawLen <- function(con, dataLen) {
readBin(con, raw(), as.integer(dataLen), endian = "big")
}
readDeserialize <- function(con) {
# Two cases are possible. In the first, the entire partition is encoded
# as a single byte array, so there is only one value to read; in that
# case we simply return firstData.
dataLen <- readInt(con)
firstData <- unserialize(
readBin(con, raw(), as.integer(dataLen), endian = "big"))
# Otherwise, keep reading values into a list.
dataLen <- readInt(con)
if (length(dataLen) > 0 && dataLen > 0) {
data <- list(firstData)
while (length(dataLen) > 0 && dataLen > 0) {
data[[length(data) + 1L]] <- unserialize(
readBin(con, raw(), as.integer(dataLen), endian = "big"))
dataLen <- readInt(con)
}
unlist(data, recursive = FALSE)
} else {
firstData
}
}
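# Illustrative sketch (not part of the package logic): each value is a 4-byte
# length followed by that many bytes of R-serialized data, and readInt()
# returns a zero-length vector at the end of the stream, which ends the loop:
#   payload <- serialize(1:3, connection = NULL)
#   con <- rawConnection(c(writeBin(length(payload), raw(), endian = "big"), payload))
#   readDeserialize(con)  # 1:3
#   close(con)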
readMultipleObjects <- function(inputCon) {
# readMultipleObjects reads multiple consecutive objects from a
# DataOutputStream. There is no preceding field giving the number of
# objects, so we read objects in a loop until the end of the stream.
data <- list()
while (TRUE) {
# At the end of the stream, the type returned is "".
type <- readType(inputCon)
if (type == "") {
break
}
data[[length(data) + 1L]] <- readTypedObject(inputCon, type)
}
data # this is a list of named lists now
}
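# Illustrative sketch (not part of the package logic): the stream is a plain
# sequence of tagged objects, and the empty type returned at end-of-stream
# stops the loop:
#   payload <- c(charToRaw("i"), writeBin(1L, raw(), endian = "big"),
#                charToRaw("c"), writeBin(2L, raw(), endian = "big"), charToRaw("hi"))
#   con <- rawConnection(payload)
#   readMultipleObjects(con)  # list(1L, "hi")
#   close(con)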
readMultipleObjectsWithKeys <- function(inputCon) {
# readMultipleObjectsWithKeys reads multiple consecutive objects from a
# DataOutputStream. There is no preceding field giving the number of
# objects, so we read objects in a loop until the end of the stream.
# This function is used by gapply: each group of rows is followed by its
# grouping key, which in turn is followed by the next group.
keys <- list()
data <- list()
subData <- list()
while (TRUE) {
# At the end of the stream, the type returned is "".
type <- readType(inputCon)
if (type == "") {
break
} else if (type == "r") {
type <- readType(inputCon)
# A grouping boundary detected
key <- readTypedObject(inputCon, type)
index <- length(data) + 1L
data[[index]] <- subData
keys[[index]] <- key
subData <- list()
} else {
subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
}
}
list(keys = keys, data = data) # this is a list of keys and corresponding data
}
readDeserializeInArrow <- function(inputCon) {
if (requireNamespace("arrow", quietly = TRUE)) {
# There is currently no way to read batch by batch over a socket connection
# on the R side (see ARROW-4512), so for now the whole Arrow
# streaming-format binary is read at once.
dataLen <- readInt(inputCon)
arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()
lapply(batches, function(batch) as.data.frame(batch))
} else {
stop("'arrow' package should be installed.")
}
}
readDeserializeWithKeysInArrow <- function(inputCon) {
data <- readDeserializeInArrow(inputCon)
# Read the keys so they can be matched with each grouped batch later.
keys <- readMultipleObjects(inputCon)
list(keys = keys, data = data)
}
readRowList <- function(obj) {
# readRowList is meant for use inside an lapply. As a result, it is
# necessary to open a standalone connection for the row and consume
# the numCols bytes inside the read function in order to correctly
# deserialize the row.
rawObj <- rawConnection(obj, "r+")
on.exit(close(rawObj))
readObject(rawObj)
}
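# Illustrative sketch (not part of the package logic): readRowList expects one
# serialized object per raw vector, e.g. applied over a list of row blobs:
#   rowBytes <- c(charToRaw("i"), writeBin(5L, raw(), endian = "big"))
#   lapply(list(rowBytes, rowBytes), readRowList)  # list(5L, 5L)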