#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# RDD in R, implemented in the S4 OO system.
setOldClass("jobj")
#' S4 class that represents an RDD
#'
#' RDD can be created using functions like
#' \code{parallelize}, \code{textFile} etc.
#'
#' @rdname RDD
#' @seealso parallelize, textFile
#' @slot env An R environment that stores bookkeeping states of the RDD
#' @slot jrdd Java object reference to the backing JavaRDD
#' @noRd
setClass("RDD",
slots = list(env = "environment",
jrdd = "jobj"))
setClass("PipelinedRDD",
slots = list(prev = "RDD",
func = "function",
prev_jrdd = "jobj"),
contains = "RDD")
setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
isCached, isCheckpointed) {
# Check that the RDD constructor was given a valid serializedMode
stopifnot(class(serializedMode) == "character")
stopifnot(serializedMode %in% c("byte", "string", "row"))
# RDD has three serialization types:
# byte: The RDD stores data serialized in R.
# string: The RDD stores data as strings.
# row: The RDD stores the serialized rows of a SparkDataFrame.
# We use an environment to store mutable states inside an RDD object.
# Note that R's call-by-value semantics makes modifying slots inside an
# object (passed as an argument into a function, such as cache()) difficult:
# i.e. one needs to make a copy of the RDD object and set the new slot value
# there.
# Slots declared here are inherited by subclasses: PipelinedRDD inherits both
# `env' and `jrdd' from RDD, but only uses the former.
.Object@env <- new.env()
.Object@env$isCached <- isCached
.Object@env$isCheckpointed <- isCheckpointed
.Object@env$serializedMode <- serializedMode
.Object@jrdd <- jrdd
.Object
})
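# Illustrative sketch (comments only, not executed): because `env' is an R
# environment, it has reference semantics, so state set inside a method stays
# visible through every copy of the RDD object that shares that environment:
#   rdd2 <- rdd                 # copies the S4 object but shares rdd@env
#   rdd@env$isCached <- TRUE
#   rdd2@env$isCached           # TRUE as well; no re-assignment of rdd2 needed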
setMethod("showRDD", "RDD",
function(object) {
cat(paste0(callJMethod(getJRDD(object), "toString"), "\n"))
})
setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
.Object@env <- new.env()
.Object@env$isCached <- FALSE
.Object@env$isCheckpointed <- FALSE
.Object@env$jrdd_val <- jrdd_val
if (!is.null(jrdd_val)) {
# This tracks the serialization mode for jrdd_val
.Object@env$serializedMode <- prev@env$serializedMode
}
.Object@prev <- prev
isPipelinable <- function(rdd) {
e <- rdd@env
# nolint start
!(e$isCached || e$isCheckpointed)
# nolint end
}
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
# This transformation is the first in its stage:
.Object@func <- cleanClosure(func)
.Object@prev_jrdd <- getJRDD(prev)
.Object@env$prev_serializedMode <- prev@env$serializedMode
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
pipelinedFunc <- function(partIndex, part) {
f <- prev@func
func(partIndex, f(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
# Get the serialization mode of the parent RDD
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
}
.Object
})
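# Illustrative sketch (comments only, not executed): chained narrow transformations
# collapse into a single PipelinedRDD whose func composes the parent's func, so the
# whole chain is evaluated in one pass over each partition:
#   r1 <- lapply(rdd, function(v) v + 1)   # PipelinedRDD wrapping rdd
#   r2 <- lapply(r1, function(v) v * 2)    # reuses r1@prev_jrdd; its func is roughly
#                                          # function(i, part) f2(i, f1(i, part))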
#' @rdname RDD
#' @noRd
#' @param jrdd Java object reference to the backing JavaRDD
#' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
#' stores strings, and "row" if the RDD stores the rows of a SparkDataFrame
#' @param isCached TRUE if the RDD is cached
#' @param isCheckpointed TRUE if the RDD has been checkpointed
RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
isCheckpointed = FALSE) {
new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
}
PipelinedRDD <- function(prev, func) {
new("PipelinedRDD", prev, func, NULL)
}
# Return the serialization mode for an RDD.
setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
# For normal RDDs we can directly read the serializedMode
setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode)
# For pipelined RDDs, if jrdd_val is set then serializedMode should exist;
# if not, we return the default serialization mode of "byte" since we don't know
# the serialization mode at this point in time.
setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
function(rdd) {
if (!is.null(rdd@env$jrdd_val)) {
return(rdd@env$serializedMode)
} else {
return("byte")
}
})
# The jrdd accessor function.
setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd)
setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
function(rdd, serializedMode = "byte") {
if (!is.null(rdd@env$jrdd_val)) {
return(rdd@env$jrdd_val)
}
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
serializedFuncArr <- serialize(rdd@func, connection = NULL)
prev_jrdd <- rdd@prev_jrdd
if (serializedMode == "string") {
rddRef <- newJObject("org.apache.spark.api.r.StringRRDD",
callJMethod(prev_jrdd, "rdd"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
rddRef <- newJObject("org.apache.spark.api.r.RRDD",
callJMethod(prev_jrdd, "rdd"),
serializedFuncArr,
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
# Save the serialization flag after we create a RRDD
rdd@env$serializedMode <- serializedMode
rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD")
rdd@env$jrdd_val
})
setValidity("RDD",
function(object) {
jrdd <- getJRDD(object)
cls <- callJMethod(jrdd, "getClass")
className <- callJMethod(cls, "getName")
if (grepl("spark.api.java.*RDD*", className)) {
TRUE
} else {
paste("Invalid RDD class ", className)
}
})
############ Actions and Transformations ############
#' Persist an RDD
#'
#' Persist this RDD with the default storage level (MEMORY_ONLY).
#'
#' @param x The RDD to cache
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' cache(rdd)
#'}
#' @rdname cache-methods
#' @aliases cache,RDD-method
#' @noRd
setMethod("cacheRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "cache")
x@env$isCached <- TRUE
x
})
#' Persist an RDD
#'
#' Persist this RDD with the specified storage level. For details of the
#' supported storage levels, refer to
#'\url{http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}.
#'
#' @param x The RDD to persist
#' @param newLevel The new storage level to be assigned
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' persistRDD(rdd, "MEMORY_AND_DISK")
#'}
#' @rdname persist
#' @aliases persist,RDD-method
#' @noRd
setMethod("persistRDD",
signature(x = "RDD", newLevel = "character"),
function(x, newLevel = "MEMORY_ONLY") {
callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel))
x@env$isCached <- TRUE
x
})
#' Unpersist an RDD
#'
#' Mark the RDD as non-persistent, and remove all blocks for it from memory and
#' disk.
#'
#' @param x The RDD to unpersist
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' cache(rdd) # rdd@@env$isCached == TRUE
#' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
#'}
#' @rdname unpersist
#' @aliases unpersist,RDD-method
#' @noRd
setMethod("unpersistRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "unpersist")
x@env$isCached <- FALSE
x
})
#' Checkpoint an RDD
#'
#' Mark this RDD for checkpointing. It will be saved to a file inside the
#' checkpoint directory set with setCheckpointDir() and all references to its
#' parent RDDs will be removed. This function must be called before any job has
#' been executed on this RDD. It is strongly recommended that this RDD is
#' persisted in memory, otherwise saving it on a file will require recomputation.
#'
#' @param x The RDD to checkpoint
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDir(sc, "checkpoint")
#' rdd <- parallelize(sc, 1:10, 2L)
#' checkpoint(rdd)
#'}
#' @rdname checkpoint-methods
#' @aliases checkpoint,RDD-method
#' @noRd
setMethod("checkpointRDD",
signature(x = "RDD"),
function(x) {
jrdd <- getJRDD(x)
callJMethod(jrdd, "checkpoint")
x@env$isCheckpointed <- TRUE
x
})
#' Gets the number of partitions of an RDD
#'
#' @param x An RDD.
#' @return The number of partitions of the RDD as an integer.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' getNumPartitions(rdd) # 2L
#'}
#' @rdname getNumPartitions
#' @aliases getNumPartitions,RDD-method
#' @noRd
setMethod("getNumPartitionsRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "getNumPartitions")
})
#' Gets the number of partitions of an RDD, the same as getNumPartitions.
#' This function is deprecated; please use getNumPartitions instead.
#'
#' @rdname getNumPartitions
#' @aliases numPartitions,RDD-method
#' @noRd
setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
.Deprecated("getNumPartitions")
getNumPartitionsRDD(x)
})
#' Collect elements of an RDD
#'
#' @description
#' \code{collect} returns a list that contains all of the elements in this RDD.
#'
#' @param x The RDD to collect
#' @param ... Other optional arguments to collect
#' @param flatten FALSE if the list should not be flattened
#' @return a list containing elements in the RDD
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' collectRDD(rdd) # list from 1 to 10
#' collectPartition(rdd, 0L) # list from 1 to 5
#'}
#' @rdname collect-methods
#' @aliases collect,RDD-method
#' @noRd
setMethod("collectRDD",
signature(x = "RDD"),
function(x, flatten = TRUE) {
# Assumes a pairwise RDD is backed by a JavaPairRDD.
collected <- callJMethod(getJRDD(x), "collect")
convertJListToRList(collected, flatten,
serializedMode = getSerializedMode(x))
})
#' @description
#' \code{collectPartition} returns a list that contains all of the elements
#' in the specified partition of the RDD.
#' @param partitionId the partition to collect (starts from 0)
#' @rdname collect-methods
#' @aliases collectPartition,integer,RDD-method
#' @noRd
setMethod("collectPartition",
signature(x = "RDD", partitionId = "integer"),
function(x, partitionId) {
jPartitionsList <- callJMethod(getJRDD(x),
"collectPartitions",
as.list(as.integer(partitionId)))
jList <- jPartitionsList[[1]]
convertJListToRList(jList, flatten = TRUE,
serializedMode = getSerializedMode(x))
})
#' @description
#' \code{collectAsMap} returns a named list as a map that contains all of the elements
#' in a key-value pair RDD.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
#'}
# nolint end
#' @rdname collect-methods
#' @aliases collectAsMap,RDD-method
#' @noRd
setMethod("collectAsMap",
signature(x = "RDD"),
function(x) {
pairList <- collectRDD(x)
map <- new.env()
lapply(pairList, function(i) { assign(as.character(i[[1]]), i[[2]], envir = map) })
as.list(map)
})
#' Return the number of elements in the RDD.
#'
#' @param x The RDD to count
#' @return number of elements in the RDD.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' countRDD(rdd) # 10
#' length(rdd) # Same as count
#'}
#' @rdname count
#' @aliases count,RDD-method
#' @noRd
setMethod("countRDD",
signature(x = "RDD"),
function(x) {
countPartition <- function(part) {
as.integer(length(part))
}
valsRDD <- lapplyPartition(x, countPartition)
vals <- collectRDD(valsRDD)
sum(as.integer(vals))
})
#' Return the number of elements in the RDD
#' @rdname count
#' @noRd
setMethod("lengthRDD",
signature(x = "RDD"),
function(x) {
countRDD(x)
})
#' Return the count of each unique value in this RDD as a list of
#' (value, count) pairs.
#'
#' Same as countByValue in Spark.
#'
#' @param x The RDD to count
#' @return A list of (value, count) pairs, where count is the number of times
#'         each unique value occurs in the RDD.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, c(1,2,3,2,1))
#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
#'}
# nolint end
#' @rdname countByValue
#' @aliases countByValue,RDD-method
#' @noRd
setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
})
#' Apply a function to all elements
#'
#' This function creates a new RDD by applying the given transformation to all
#' elements of the given RDD
#'
#' @param X The RDD to apply the transformation to.
#' @param FUN the transformation to apply on each element
#' @return a new RDD created by the transformation.
#' @rdname lapply
#' @noRd
#' @aliases lapply
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
#' collectRDD(multiplyByTwo) # 2,4,6...
#'}
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
func <- function(partIndex, part) {
lapply(part, FUN)
}
lapplyPartitionsWithIndex(X, func)
})
#' @rdname lapply
#' @aliases map,RDD,function-method
#' @noRd
setMethod("map",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})
#' Flatten results after applying a function to all elements
#'
#' This function returns a new RDD by first applying a function to all
#' elements of this RDD, and then flattening the results.
#'
#' @param X The RDD to apply the transformation to.
#' @param FUN the transformation to apply on each element
#' @return a new RDD created by the transformation.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
#' collectRDD(multiplyByTwo) # 2,20,4,40,6,60...
#'}
#' @rdname flatMap
#' @aliases flatMap,RDD,function-method
#' @noRd
setMethod("flatMap",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
partitionFunc <- function(part) {
unlist(
lapply(part, FUN),
recursive = FALSE
)
}
lapplyPartition(X, partitionFunc)
})
#' Apply a function to each partition of an RDD
#'
#' Return a new RDD by applying a function to each partition of this RDD.
#'
#' @param X The RDD to apply the transformation to.
#' @param FUN the transformation to apply on each partition.
#' @return a new RDD created by the transformation.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
#' collectRDD(partitionSum) # 15, 40
#'}
#' @rdname lapplyPartition
#' @aliases lapplyPartition,RDD,function-method
#' @noRd
setMethod("lapplyPartition",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) })
})
#' mapPartitions is the same as lapplyPartition.
#'
#' @rdname lapplyPartition
#' @aliases mapPartitions,RDD,function-method
#' @noRd
setMethod("mapPartitions",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})
#' Return a new RDD by applying a function to each partition of this RDD, while
#' tracking the index of the original partition.
#'
#' @param X The RDD to apply the transformation to.
#' @param FUN the transformation to apply on each partition; takes the partition
#' index and a list of elements in the particular partition.
#' @return a new RDD created by the transformation.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 5L)
#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
#' partIndex * Reduce("+", part) })
#' collectRDD(prod, flatten = FALSE) # 0, 7, 22, 45, 76
#'}
#' @rdname lapplyPartitionsWithIndex
#' @aliases lapplyPartitionsWithIndex,RDD,function-method
#' @noRd
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
PipelinedRDD(X, FUN)
})
#' @rdname lapplyPartitionsWithIndex
#' @aliases mapPartitionsWithIndex,RDD,function-method
#' @noRd
setMethod("mapPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartitionsWithIndex(X, FUN)
})
#' This function returns a new RDD containing only the elements that satisfy
#' a predicate (i.e. those for which a given logical function returns TRUE).
#' The same as `filter()' in Spark.
#'
#' @param x The RDD to be filtered.
#' @param f A unary predicate function.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' unlist(collectRDD(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
#'}
# nolint end
#' @rdname filterRDD
#' @aliases filterRDD,RDD,function-method
#' @noRd
setMethod("filterRDD",
signature(x = "RDD", f = "function"),
function(x, f) {
filter.func <- function(part) {
Filter(f, part)
}
lapplyPartition(x, filter.func)
})
#' @rdname filterRDD
#' @aliases Filter
#' @noRd
setMethod("Filter",
signature(f = "function", x = "RDD"),
function(f, x) {
filterRDD(x, f)
})
#' Reduce across elements of an RDD.
#'
#' This function reduces the elements of this RDD using the
#' specified commutative and associative binary operator.
#'
#' @param x The RDD to reduce
#' @param func Commutative and associative function to apply on elements
#' of the RDD.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' reduce(rdd, "+") # 55
#'}
#' @rdname reduce
#' @aliases reduce,RDD,ANY-method
#' @noRd
setMethod("reduce",
signature(x = "RDD", func = "ANY"),
function(x, func) {
reducePartition <- function(part) {
Reduce(func, part)
}
partitionList <- collectRDD(lapplyPartition(x, reducePartition),
flatten = FALSE)
Reduce(func, partitionList)
})
#' Get the maximum element of an RDD.
#'
#' @param x The RDD to get the maximum element from
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' maximum(rdd) # 10
#'}
#' @rdname maximum
#' @aliases maximum,RDD
#' @noRd
setMethod("maximum",
signature(x = "RDD"),
function(x) {
reduce(x, max)
})
#' Get the minimum element of an RDD.
#'
#' @param x The RDD to get the minimum element from
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' minimum(rdd) # 1
#'}
#' @rdname minimum
#' @aliases minimum,RDD
#' @noRd
setMethod("minimum",
signature(x = "RDD"),
function(x) {
reduce(x, min)
})
#' Add up the elements in an RDD.
#'
#' @param x The RDD to add up the elements in
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' sumRDD(rdd) # 55
#'}
#' @rdname sumRDD
#' @aliases sumRDD,RDD
#' @noRd
setMethod("sumRDD",
signature(x = "RDD"),
function(x) {
reduce(x, "+")
})
#' Applies a function to all elements in an RDD, and forces evaluation.
#'
#' @param x The RDD to apply the function
#' @param func The function to be applied.
#' @return invisible NULL.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' foreach(rdd, function(x) { save(x, file=...) })
#'}
#' @rdname foreach
#' @aliases foreach,RDD,function-method
#' @noRd
setMethod("foreach",
signature(x = "RDD", func = "function"),
function(x, func) {
partition.func <- function(x) {
lapply(x, func)
NULL
}
invisible(collectRDD(mapPartitions(x, partition.func)))
})
#' Applies a function to each partition in an RDD, and forces evaluation.
#'
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
#'}
#' @rdname foreach
#' @aliases foreachPartition,RDD,function-method
#' @noRd
setMethod("foreachPartition",
signature(x = "RDD", func = "function"),
function(x, func) {
invisible(collectRDD(mapPartitions(x, func)))
})
#' Take elements from an RDD.
#'
#' This function takes the first NUM elements in the RDD and
#' returns them in a list.
#'
#' @param x The RDD to take elements from
#' @param num Number of elements to take
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' takeRDD(rdd, 2L) # list(1, 2)
#'}
# nolint end
#' @rdname take
#' @aliases take,RDD,numeric-method
#' @noRd
setMethod("takeRDD",
signature(x = "RDD", num = "numeric"),
function(x, num) {
resList <- list()
index <- -1
jrdd <- getJRDD(x)
numPartitions <- getNumPartitionsRDD(x)
serializedModeRDD <- getSerializedMode(x)
# TODO(shivaram): Collect more than one partition based on size
# estimates similar to the scala version of `take`.
while (TRUE) {
index <- index + 1
if (length(resList) >= num || index >= numPartitions)
break
# a JList of byte arrays
partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
partition <- partitionArr[[1]]
size <- num - length(resList)
# elems is capped to have at most `size` elements
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = size,
serializedMode = serializedModeRDD)
resList <- append(resList, elems)
}
resList
})
#' First
#'
#' Return the first element of an RDD
#'
#' @rdname first
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' firstRDD(rdd)
#' }
#' @noRd
setMethod("firstRDD",
signature(x = "RDD"),
function(x) {
takeRDD(x, 1)[[1]]
})
#' Removes the duplicates from an RDD.
#'
#' This function returns a new RDD containing the distinct elements in the
#' given RDD. The same as `distinct()' in Spark.
#'
#' @param x The RDD to remove duplicates from.
#' @param numPartitions Number of partitions to create.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
#' sort(unlist(collectRDD(distinctRDD(rdd)))) # c(1, 2, 3)
#'}
# nolint end
#' @rdname distinct
#' @aliases distinct,RDD-method
#' @noRd
setMethod("distinctRDD",
signature(x = "RDD"),
function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
numPartitions)
resRDD <- lapply(reduced, function(x) { x[[1]] })
resRDD
})
#' Return an RDD that is a sampled subset of the given RDD.
#'
#' The same as `sample()' in Spark. (We rename it due to signature
#' inconsistencies with the `sample()' function in R's base package.)
#'
#' @param x The RDD to sample elements from
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @param seed Randomness seed value
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' collectRDD(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
#' collectRDD(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
#'}
#' @rdname sampleRDD
#' @aliases sampleRDD,RDD
#' @noRd
setMethod("sampleRDD",
signature(x = "RDD", withReplacement = "logical",
fraction = "numeric", seed = "integer"),
function(x, withReplacement, fraction, seed) {
# The sampler: takes a partition and returns its sampled version.
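# Note (illustrative): with replacement, each element is emitted rpois(1, fraction)
# times, so the expected number of copies is `fraction'; without replacement, each
# element is kept independently with probability `fraction' (e.g. fraction = 0.5
# keeps roughly half of the elements of a partition).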
samplingFunc <- function(partIndex, part) {
set.seed(seed)
res <- vector("list", length(part))
len <- 0
# Discards some random values to ensure each partition has a
# different random seed.
stats::runif(partIndex)
for (elem in part) {
if (withReplacement) {
count <- stats::rpois(1, fraction)
if (count > 0) {
res[(len + 1) : (len + count)] <- rep(list(elem), count)
len <- len + count
}
} else {
if (stats::runif(1) < fraction) {
len <- len + 1
res[[len]] <- elem
}
}
}
# TODO(zongheng): look into the performance of the current
# implementation. Look into some iterator package? Note that
# Scala avoids many calls to creating an empty list and PySpark
# similarly achieves this using `yield'.
if (len > 0)
res[1:len]
else
list()
}
lapplyPartitionsWithIndex(x, samplingFunc)
})
#' Return a list of the elements that are a sampled subset of the given RDD.
#'
#' @param x The RDD to sample elements from
#' @param withReplacement Sampling with replacement or not
#' @param num Number of elements to return
#' @param seed Randomness seed value
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:100)
#' # exactly 5 elements sampled, which may not be distinct
#' takeSample(rdd, TRUE, 5L, 1618L)
#' # exactly 5 distinct elements sampled
#' takeSample(rdd, FALSE, 5L, 16181618L)
#'}
#' @rdname takeSample
#' @aliases takeSample,RDD
#' @noRd
setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
num = "integer", seed = "integer"),
function(x, withReplacement, num, seed) {
# This function is ported from RDD.scala.
fraction <- 0.0
total <- 0
multiplier <- 3.0
initialCount <- countRDD(x)
maxSelected <- 0
MAXINT <- .Machine$integer.max
if (num < 0)
stop("Negative number of elements requested")
if (initialCount > MAXINT - 1) {
maxSelected <- MAXINT - 1
} else {
maxSelected <- initialCount
}
if (num > initialCount && !withReplacement) {
total <- maxSelected
fraction <- multiplier * (maxSelected + 1) / initialCount
} else {
total <- num
fraction <- multiplier * (num + 1) / initialCount
}
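# Worked example (illustrative): for num = 5L and initialCount = 100 without
# replacement, fraction = 3.0 * (5 + 1) / 100 = 0.18, so the first sampling pass
# is expected to return about 18 elements, comfortably above the 5 requested.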
set.seed(seed)
samples <- collectRDD(sampleRDD(x, withReplacement, fraction,
as.integer(ceiling(stats::runif(1,
-MAXINT,
MAXINT)))))
# If the first sample didn't turn out large enough, keep trying to
# take samples; this shouldn't happen often because we use a big
# multiplier for the initial size
while (length(samples) < total)
samples <- collectRDD(sampleRDD(x, withReplacement, fraction,
as.integer(ceiling(stats::runif(1,
-MAXINT,
MAXINT)))))
# TODO(zongheng): investigate if this call is an in-place shuffle?
base::sample(samples)[1:total]
})
#' Creates tuples of the elements in this RDD by applying a function.
#'
#' @param x The RDD.
#' @param func The function to be applied.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3))
#' collectRDD(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
#'}
# nolint end
#' @rdname keyBy
#' @aliases keyBy,RDD
#' @noRd
setMethod("keyBy",
signature(x = "RDD", func = "function"),
function(x, func) {
apply.func <- function(x) {
list(func(x), x)
}
lapply(x, apply.func)
})
#' Return a new RDD that has exactly numPartitions partitions.
#' Can increase or decrease the level of parallelism in this RDD. Internally,
#' this uses a shuffle to redistribute data.
#' If you are decreasing the number of partitions in this RDD, consider using
#' coalesce, which can avoid performing a shuffle.
#'
#' @param x The RDD.
#' @param numPartitions Number of partitions to create.
#' @seealso coalesce
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
#' getNumPartitions(rdd) # 4
#' getNumPartitions(repartitionRDD(rdd, 2L)) # 2
#'}
#' @rdname repartition
#' @aliases repartition,RDD
#' @noRd
setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
coalesceRDD(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
})
#' Return a new RDD that is reduced into numPartitions partitions.
#'
#' @param x The RDD.
#' @param numPartitions Number of partitions to create.
#' @seealso repartition
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
#' getNumPartitions(rdd) # 3
#' getNumPartitions(coalesce(rdd, 1L)) # 1
#'}
#' @rdname coalesce
#' @aliases coalesce,RDD
#' @noRd
setMethod("coalesceRDD",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
pos <- (start + i) %% numPartitions
list(pos, part[[i]])
})
}
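# Illustrative: with numPartitions = 2 and a random start of 1, the elements of a
# partition are assigned to target partitions 0, 1, 0, 1, ..., i.e. a round-robin
# assignment with a per-partition random offset, so data spreads roughly evenly
# across the new partitions.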
shuffled <- lapplyPartitionsWithIndex(x, func)
repartitioned <- partitionByRDD(shuffled, numPartitions)
values(repartitioned)
} else {
jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
RDD(jrdd)
}
})
#' Save this RDD as a SequenceFile of serialized objects.
#'
#' @param x The RDD to save
#' @param path The directory where the file is saved
#' @seealso objectFile
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:3)
#' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
#'}
#' @rdname saveAsObjectFile
#' @aliases saveAsObjectFile,RDD
#' @noRd
setMethod("saveAsObjectFile",
signature(x = "RDD", path = "character"),
function(x, path) {
# objectFile() assumes serializedMode == "byte", so if this RDD stores its data
# in any other mode we need to serialize it before saving.
if (getSerializedMode(x) != "byte") {
x <- serializeToBytes(x)
}
# Return nothing
invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
})
#' Save this RDD as a text file, using string representations of elements.
#'
#' @param x The RDD to save
#' @param path The directory where the partitions of the text file are saved
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:3)
#' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
#'}
#' @rdname saveAsTextFile
#' @aliases saveAsTextFile,RDD
#' @noRd
setMethod("saveAsTextFile",
signature(x = "RDD", path = "character"),
function(x, path) {
func <- function(str) {
toString(str)
}
stringRdd <- lapply(x, func)
# Return nothing
invisible(
callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path))
})
#' Sort an RDD by the given key function.
#'
#' @param x An RDD to be sorted.
#' @param func A function used to compute the sort key for each element.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all elements are sorted.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(3, 2, 1))
#' collectRDD(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
#'}
# nolint end
#' @rdname sortBy
#' @aliases sortBy,RDD,RDD-method
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})
# Helper function to get first N elements from an RDD in the specified order.
# Param:
# x An RDD.
# num Number of elements to return.
# ascending A flag to indicate whether the sorting is ascending or descending.
# Return:
# A list of the first N elements from the RDD in the specified order.
#
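# Example (illustrative, assuming a SparkContext `sc' exists):
#   rdd <- parallelize(sc, list(3, 1, 2))
#   takeOrderedElem(rdd, 2L)           # list(1, 2)
#   takeOrderedElem(rdd, 2L, FALSE)    # list(3, 2)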
takeOrderedElem <- function(x, num, ascending = TRUE) {
if (num <= 0L) {
return(list())
}
partitionFunc <- function(part) {
if (num < length(part)) {
# R limitation: order works only on primitive types!
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
part[ord[1:num]]
} else {
part
}
}
newRdd <- mapPartitions(x, partitionFunc)
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
numPartitions <- getNumPartitionsRDD(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)
while (TRUE) {
index <- index + 1
if (index >= numPartitions) {
ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
resList <- resList[ord[1:num]]
break
}
# a JList of byte arrays
partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
partition <- partitionArr[[1]]
# elems is capped to have at most `num` elements
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = num,
serializedMode = serializedModeRDD)
resList <- append(resList, elems)
}
resList
}
#' Returns the first N elements from an RDD in ascending order.
#'
#' @param x An RDD.
#' @param num Number of elements to return.
#' @return The first N elements from the RDD in ascending order.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
#'}
# nolint end
#' @rdname takeOrdered
#' @aliases takeOrdered,RDD,RDD-method
#' @noRd
setMethod("takeOrdered",
signature(x = "RDD", num = "integer"),
function(x, num) {
takeOrderedElem(x, num)
})
#' Returns the top N elements from an RDD.
#'
#' @param x An RDD.
#' @param num Number of elements to return.
#' @return The top N elements from the RDD.
#' @rdname top
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#'}
# nolint end
#' @aliases top,RDD,RDD-method
#' @noRd
setMethod("top",
signature(x = "RDD", num = "integer"),
function(x, num) {
takeOrderedElem(x, num, FALSE)
})
#' Fold an RDD using a given associative function and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using a given associative function and a neutral "zero value".
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param op An associative function for the folding operation.
#' @return The folding result.
#' @rdname fold
#' @seealso reduce
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
#' fold(rdd, 0, "+") # 15
#'}
#' @aliases fold,RDD,RDD-method
#' @noRd
setMethod("fold",
signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
function(x, zeroValue, op) {
aggregateRDD(x, zeroValue, op, op)
})
#' Aggregate an RDD using the given combine functions and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using given combine functions and a neutral "zero value".
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the RDD elements. It may return a different
#' result type from the type of the RDD elements.
#' @param combOp A function to aggregate results of seqOp.
#' @return The aggregation result.
#' @rdname aggregateRDD
#' @seealso reduce
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4))
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#'}
# nolint end
#' @aliases aggregateRDD,RDD,RDD-method
#' @noRd
setMethod("aggregateRDD",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
function(x, zeroValue, seqOp, combOp) {
partitionFunc <- function(part) {
Reduce(seqOp, part, zeroValue)
}
partitionList <- collectRDD(lapplyPartition(x, partitionFunc),
flatten = FALSE)
Reduce(combOp, partitionList, zeroValue)
})
#' Pipes elements to a forked external process.
#'
#' The same as 'pipe()' in Spark.
#'
#' @param x The RDD whose elements are piped to the forked external process.
#' @param command The command to fork an external process.
#' @param env A named list to set environment variables of the external process.
#' @return A new RDD created by piping all elements to a forked external process.
#' @rdname pipeRDD
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' pipeRDD(rdd, "more")
#' Output: c("1", "2", ..., "10")
#'}
#' @aliases pipeRDD,RDD,character-method
#' @noRd
setMethod("pipeRDD",
signature(x = "RDD", command = "character"),
function(x, command, env = list()) {
func <- function(part) {
trim_trailing_func <- function(x) {
sub("[\r\n]*$", "", toString(x))
}
input <- unlist(lapply(part, trim_trailing_func))
res <- system2(command, stdout = TRUE, input = input, env = env)
lapply(res, trim_trailing_func)
}
lapplyPartition(x, func)
})
# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
#' @param x The RDD whose name is returned.
#' @rdname name
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' name(rdd) # NULL (if not set before)
#'}
#' @aliases name,RDD
#' @noRd
setMethod("name",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "name")
})
#' Set an RDD's name.
#'
#' @param x The RDD whose name is to be set.
#' @param name The RDD name to be set.
#' @return a new RDD renamed.
#' @rdname setName
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' setName(rdd, "myRDD")
#' name(rdd) # "myRDD"
#'}
#' @aliases setName,RDD
#' @noRd
setMethod("setName",
signature(x = "RDD", name = "character"),
function(x, name) {
callJMethod(getJRDD(x), "setName", name)
x
})
#' Zip an RDD with generated unique Long IDs.
#'
#' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
#' n is the number of partitions. So there may exist gaps, but this
#' method won't trigger a Spark job, which is different from
#' zipWithIndex.
#'
#' @param x An RDD to be zipped.
#' @return An RDD with zipped items.
#' @seealso zipWithIndex
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
#' collectRDD(zipWithUniqueId(rdd))
#' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
#'}
# nolint end
#' @rdname zipWithUniqueId
#' @aliases zipWithUniqueId,RDD
#' @noRd
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
n <- getNumPartitionsRDD(x)
partitionFunc <- function(partIndex, part) {
mapply(
function(item, index) {
list(item, (index - 1) * n + partIndex)
},
part,
seq_along(part),
SIMPLIFY = FALSE)
}
lapplyPartitionsWithIndex(x, partitionFunc)
})
#' Zip an RDD with its element indices.
#'
#' The ordering is first based on the partition index and then the
#' ordering of items within each partition. So the first item in
#' the first partition gets index 0, and the last item in the last
#' partition receives the largest index.
#'
#' This method needs to trigger a Spark job when this RDD contains
#' more than one partition.
#'
#' @param x An RDD to be zipped.
#' @return An RDD with zipped items.
#' @seealso zipWithUniqueId
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
#' collectRDD(zipWithIndex(rdd))
#' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
#'}
# nolint end
#' @rdname zipWithIndex
#' @aliases zipWithIndex,RDD
#' @noRd
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
n <- getNumPartitionsRDD(x)
if (n > 1) {
nums <- collectRDD(lapplyPartition(x,
function(part) {
list(length(part))
}))
startIndices <- Reduce("+", nums, accumulate = TRUE)
}
partitionFunc <- function(partIndex, part) {
if (partIndex == 0) {
startIndex <- 0
} else {
startIndex <- startIndices[[partIndex]]
}
mapply(
function(item, index) {
list(item, index - 1 + startIndex)
},
part,
seq_along(part),
SIMPLIFY = FALSE)
}
lapplyPartitionsWithIndex(x, partitionFunc)
})
#' Coalesce all elements within each partition of an RDD into a list.
#'
#' @param x An RDD.
#' @return An RDD created by coalescing all elements within
#' each partition into a list.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, as.list(1:4), 2L)
#' collectRDD(glom(rdd))
#' # list(list(1, 2), list(3, 4))
#'}
# nolint end
#' @rdname glom
#' @aliases glom,RDD
#' @noRd
setMethod("glom",
signature(x = "RDD"),
function(x) {
partitionFunc <- function(part) {
list(part)
}
lapplyPartition(x, partitionFunc)
})
############ Binary Functions #############
#' Return the union RDD of two RDDs.
#' The same as union() in Spark.
#'
#' @param x An RDD.
#' @param y An RDD.
#' @return a new RDD created by performing the simple union (without removing
#' duplicates) of two input RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:3)
#' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
#'}
#' @rdname unionRDD
#' @aliases unionRDD,RDD,RDD-method
#' @noRd
setMethod("unionRDD",
signature(x = "RDD", y = "RDD"),
function(x, y) {
if (getSerializedMode(x) == getSerializedMode(y)) {
jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
union.rdd <- RDD(jrdd, getSerializedMode(x))
} else {
# The RDDs use different serialization modes; serialize any non-byte RDD to bytes first.
if (getSerializedMode(x) != "byte") x <- serializeToBytes(x)
if (getSerializedMode(y) != "byte") y <- serializeToBytes(y)
jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
union.rdd <- RDD(jrdd, "byte")
}
union.rdd
})
#' Zip an RDD with another RDD.
#'
#' Zips this RDD with another one, returning key-value pairs with the
#' first element in each RDD, the second element in each RDD, etc. Assumes
#' that the two RDDs have the same number of partitions and the same
#' number of elements in each partition (e.g. one was made through
#' a map on the other).
#'
#' @param x An RDD to be zipped.
#' @param other Another RDD to be zipped.
#' @return An RDD zipped from the two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 0:4)
#' rdd2 <- parallelize(sc, 1000:1004)
#' collectRDD(zipRDD(rdd1, rdd2))
#' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
#'}
# nolint end
#' @rdname zipRDD
#' @aliases zipRDD,RDD
#' @noRd
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
n1 <- getNumPartitionsRDD(x)
n2 <- getNumPartitionsRDD(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
# The jrdd's elements are of Scala Tuple2 type. The serialization mode
# here applies to the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
mergePartitions(rdd, TRUE)
})
#' Cartesian product of this RDD and another one.
#'
#' Return the Cartesian product of this RDD and another one,
#' that is, the RDD of all pairs of elements (a, b) where a
#' is in this RDD and b is in the other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @return A new RDD which is the Cartesian product of these two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2)
#' sortByKey(cartesian(rdd, rdd))
#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
#'}
# nolint end
#' @rdname cartesian
#' @aliases cartesian,RDD,RDD-method
#' @noRd
setMethod("cartesian",
signature(x = "RDD", other = "RDD"),
function(x, other) {
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
# The jrdd's elements are of Scala Tuple2 type. The serialization mode
# here applies to the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
mergePartitions(rdd, FALSE)
})
#' Subtract one RDD from another RDD.
#'
#' Return an RDD with the elements from this RDD that are not in the other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions Number of the partitions in the result RDD.
#' @return An RDD with the elements from this RDD that are not in the other.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
#' rdd2 <- parallelize(sc, list(2, 4))
#' collectRDD(subtract(rdd1, rdd2))
#' # list(1, 1, 3)
#'}
# nolint end
#' @rdname subtract
#' @aliases subtract,RDD
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
keys(subtractByKey(rdd1, rdd2, numPartitions))
})
#' Intersection of this RDD and another one.
#'
#' Return the intersection of this RDD and another one.
#' The output will not contain any duplicate elements,
#' even if the input RDDs did. Performs a hash partition
#' across the cluster.
#' Note that this method performs a shuffle internally.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions The number of partitions in the result RDD.
#' @return An RDD which is the intersection of these two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
#' collectRDD(sortBy(intersection(rdd1, rdd2), function(x) { x }))
#' # list(1, 2, 3)
#'}
# nolint end
#' @rdname intersection
#' @aliases intersection,RDD
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })
filterFunction <- function(elem) {
iters <- elem[[2]]
all(as.vector(
lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
}
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
#' Zips an RDD's partitions with one (or more) RDD(s).
#' Same as zipPartitions in Spark.
#'
#' @param ... RDDs to be zipped.
#' @param func A function to transform zipped partitions.
#' @return A new RDD by applying a function to the zipped partitions.
#' Assumes that all the RDDs have the *same number of partitions*, but
#' does *not* require them to have the same number of elements in each partition.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
#' collectRDD(zipPartitions(rdd1, rdd2, rdd3,
#' func = function(x, y, z) { list(list(x, y, z))} ))
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#'}
# nolint end
#' @rdname zipRDD
#' @aliases zipPartitions,RDD
#' @noRd
setMethod("zipPartitions",
"RDD",
function(..., func) {
rrdds <- list(...)
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
nPart <- sapply(rrdds, getNumPartitionsRDD)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
rrdds <- lapply(rrdds, function(rdd) {
mapPartitionsWithIndex(rdd, function(partIndex, part) {
list(list(partIndex, part))
})
})
union.rdd <- Reduce(unionRDD, rrdds)
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
res <- mapPartitions(zipped.rdd, function(plist) {
do.call(func, plist[[1]])
})
res
})