| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
# RDD in R, implemented in the S4 OO system.
| |
| setOldClass("jobj") |
| |
| #' S4 class that represents an RDD |
| #' |
#' An RDD can be created using functions like
#' \code{parallelize}, \code{textFile}, etc.
| #' |
| #' @rdname RDD |
| #' @seealso parallelize, textFile |
| #' @slot env An R environment that stores bookkeeping states of the RDD |
#' @slot jrdd Java object reference to the backing JavaRDD
| #' @noRd |
| setClass("RDD", |
| slots = list(env = "environment", |
| jrdd = "jobj")) |
| |
| setClass("PipelinedRDD", |
| slots = list(prev = "RDD", |
| func = "function", |
| prev_jrdd = "jobj"), |
| contains = "RDD") |
| |
| setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode, |
| isCached, isCheckpointed) { |
# Check that the RDD constructor is called with a valid serializedMode
stopifnot(is.character(serializedMode))
| stopifnot(serializedMode %in% c("byte", "string", "row")) |
| # RDD has three serialization types: |
| # byte: The RDD stores data serialized in R. |
| # string: The RDD stores data as strings. |
| # row: The RDD stores the serialized rows of a SparkDataFrame. |
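# For example (a hedged note on this package's conventions): parallelize()
# typically creates a "byte"-serialized RDD, while textFile() creates a
# "string"-serialized one.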
| |
| # We use an environment to store mutable states inside an RDD object. |
# Note that R's call-by-value semantics make modifying slots inside an
# object (passed as an argument into a function, such as cache()) difficult:
# i.e. one needs to make a copy of the RDD object and set the new slot value
# there.
| |
# Slots are inherited by subclasses: PipelinedRDD gets both `env' and `jrdd'
# from RDD, but only uses the former.
| .Object@env <- new.env() |
| .Object@env$isCached <- isCached |
| .Object@env$isCheckpointed <- isCheckpointed |
| .Object@env$serializedMode <- serializedMode |
| |
| .Object@jrdd <- jrdd |
| .Object |
| }) |
| |
| setMethod("showRDD", "RDD", |
| function(object) { |
| cat(paste0(callJMethod(getJRDD(object), "toString"), "\n")) |
| }) |
| |
| setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) { |
| .Object@env <- new.env() |
| .Object@env$isCached <- FALSE |
| .Object@env$isCheckpointed <- FALSE |
| .Object@env$jrdd_val <- jrdd_val |
| if (!is.null(jrdd_val)) { |
| # This tracks the serialization mode for jrdd_val |
| .Object@env$serializedMode <- prev@env$serializedMode |
| } |
| |
| .Object@prev <- prev |
| |
| isPipelinable <- function(rdd) { |
| e <- rdd@env |
| # nolint start |
| !(e$isCached || e$isCheckpointed) |
| # nolint end |
| } |
| |
| if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) { |
| # This transformation is the first in its stage: |
| .Object@func <- cleanClosure(func) |
| .Object@prev_jrdd <- getJRDD(prev) |
| .Object@env$prev_serializedMode <- prev@env$serializedMode |
# NOTE: We use prev_serializedMode to track the serialization mode of prev_jrdd.
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD.
| } else { |
| pipelinedFunc <- function(partIndex, part) { |
| f <- prev@func |
| func(partIndex, f(partIndex, part)) |
| } |
| .Object@func <- cleanClosure(pipelinedFunc) |
| .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline |
| # Get the serialization mode of the parent RDD |
| .Object@env$prev_serializedMode <- prev@env$prev_serializedMode |
| } |
| |
| .Object |
| }) |
| |
| #' @rdname RDD |
| #' @noRd |
| #' @param jrdd Java object reference to the backing JavaRDD |
| #' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD |
| #' stores strings, and "row" if the RDD stores the rows of a SparkDataFrame |
| #' @param isCached TRUE if the RDD is cached |
| #' @param isCheckpointed TRUE if the RDD has been checkpointed |
| RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE, |
| isCheckpointed = FALSE) { |
| new("RDD", jrdd, serializedMode, isCached, isCheckpointed) |
| } |
| |
| PipelinedRDD <- function(prev, func) { |
| new("PipelinedRDD", prev, func, NULL) |
| } |
| |
| # Return the serialization mode for an RDD. |
| setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") }) |
| # For normal RDDs we can directly read the serializedMode |
| setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode) |
# For pipelined RDDs, if jrdd_val is set then serializedMode should exist;
# if not, we return the default serialization mode of "byte", as we don't
# know the serialization mode at this point in time.
| setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"), |
| function(rdd) { |
| if (!is.null(rdd@env$jrdd_val)) { |
| return(rdd@env$serializedMode) |
| } else { |
| return("byte") |
| } |
| }) |
| |
# The jrdd accessor function. For a PipelinedRDD, the backing Java RDD is
# created lazily on first access and cached in the RDD's environment.
| setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd) |
| setMethod("getJRDD", signature(rdd = "PipelinedRDD"), |
| function(rdd, serializedMode = "byte") { |
| if (!is.null(rdd@env$jrdd_val)) { |
| return(rdd@env$jrdd_val) |
| } |
| |
| packageNamesArr <- serialize(.sparkREnv[[".packages"]], |
| connection = NULL) |
| |
| broadcastArr <- lapply(ls(.broadcastNames), |
| function(name) { get(name, .broadcastNames) }) |
| |
| serializedFuncArr <- serialize(rdd@func, connection = NULL) |
| |
| prev_jrdd <- rdd@prev_jrdd |
| |
| if (serializedMode == "string") { |
| rddRef <- newJObject("org.apache.spark.api.r.StringRRDD", |
| callJMethod(prev_jrdd, "rdd"), |
| serializedFuncArr, |
| rdd@env$prev_serializedMode, |
| packageNamesArr, |
| broadcastArr, |
| callJMethod(prev_jrdd, "classTag")) |
| } else { |
| rddRef <- newJObject("org.apache.spark.api.r.RRDD", |
| callJMethod(prev_jrdd, "rdd"), |
| serializedFuncArr, |
| rdd@env$prev_serializedMode, |
| serializedMode, |
| packageNamesArr, |
| broadcastArr, |
| callJMethod(prev_jrdd, "classTag")) |
| } |
# Save the serialization flag after we create an RRDD
| rdd@env$serializedMode <- serializedMode |
| rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") |
| rdd@env$jrdd_val |
| }) |
| |
| setValidity("RDD", |
| function(object) { |
| jrdd <- getJRDD(object) |
| cls <- callJMethod(jrdd, "getClass") |
| className <- callJMethod(cls, "getName") |
if (grepl("spark.api.java.*RDD", className)) {
TRUE
} else {
paste("Invalid RDD class", className)
| } |
| }) |
| |
| |
| ############ Actions and Transformations ############ |
| |
| #' Persist an RDD |
| #' |
| #' Persist this RDD with the default storage level (MEMORY_ONLY). |
| #' |
| #' @param x The RDD to cache |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10, 2L) |
| #' cache(rdd) |
| #'} |
| #' @rdname cache-methods |
| #' @aliases cache,RDD-method |
| #' @noRd |
| setMethod("cacheRDD", |
| signature(x = "RDD"), |
| function(x) { |
| callJMethod(getJRDD(x), "cache") |
| x@env$isCached <- TRUE |
| x |
| }) |
| |
| #' Persist an RDD |
| #' |
| #' Persist this RDD with the specified storage level. For details of the |
| #' supported storage levels, refer to |
| #'\url{http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}. |
| #' |
| #' @param x The RDD to persist |
| #' @param newLevel The new storage level to be assigned |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10, 2L) |
| #' persistRDD(rdd, "MEMORY_AND_DISK") |
| #'} |
| #' @rdname persist |
| #' @aliases persist,RDD-method |
| #' @noRd |
| setMethod("persistRDD", |
| signature(x = "RDD", newLevel = "character"), |
| function(x, newLevel = "MEMORY_ONLY") { |
| callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel)) |
| x@env$isCached <- TRUE |
| x |
| }) |
| |
| #' Unpersist an RDD |
| #' |
| #' Mark the RDD as non-persistent, and remove all blocks for it from memory and |
| #' disk. |
| #' |
| #' @param x The RDD to unpersist |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10, 2L) |
| #' cache(rdd) # rdd@@env$isCached == TRUE |
| #' unpersistRDD(rdd) # rdd@@env$isCached == FALSE |
| #'} |
| #' @rdname unpersist |
| #' @aliases unpersist,RDD-method |
| #' @noRd |
| setMethod("unpersistRDD", |
| signature(x = "RDD"), |
| function(x) { |
| callJMethod(getJRDD(x), "unpersist") |
| x@env$isCached <- FALSE |
| x |
| }) |
| |
| #' Checkpoint an RDD |
| #' |
| #' Mark this RDD for checkpointing. It will be saved to a file inside the |
| #' checkpoint directory set with setCheckpointDir() and all references to its |
| #' parent RDDs will be removed. This function must be called before any job has |
| #' been executed on this RDD. It is strongly recommended that this RDD is |
#' persisted in memory; otherwise, saving it to a file will require recomputation.
| #' |
| #' @param x The RDD to checkpoint |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' setCheckpointDir(sc, "checkpoint") |
| #' rdd <- parallelize(sc, 1:10, 2L) |
| #' checkpoint(rdd) |
| #'} |
| #' @rdname checkpoint-methods |
| #' @aliases checkpoint,RDD-method |
| #' @noRd |
| setMethod("checkpointRDD", |
| signature(x = "RDD"), |
| function(x) { |
| jrdd <- getJRDD(x) |
| callJMethod(jrdd, "checkpoint") |
| x@env$isCheckpointed <- TRUE |
| x |
| }) |
| |
#' Get the number of partitions of an RDD
#'
#' @param x An RDD.
#' @return The number of partitions of the RDD as an integer.
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10, 2L) |
| #' getNumPartitions(rdd) # 2L |
| #'} |
| #' @rdname getNumPartitions |
| #' @aliases getNumPartitions,RDD-method |
| #' @noRd |
| setMethod("getNumPartitionsRDD", |
| signature(x = "RDD"), |
| function(x) { |
| callJMethod(getJRDD(x), "getNumPartitions") |
| }) |
| |
#' Get the number of partitions of an RDD, the same as getNumPartitions.
#' This function is deprecated; use getNumPartitions instead.
| #' |
| #' @rdname getNumPartitions |
| #' @aliases numPartitions,RDD-method |
| #' @noRd |
| setMethod("numPartitions", |
| signature(x = "RDD"), |
| function(x) { |
| .Deprecated("getNumPartitions") |
| getNumPartitionsRDD(x) |
| }) |
| |
| #' Collect elements of an RDD |
| #' |
| #' @description |
| #' \code{collect} returns a list that contains all of the elements in this RDD. |
| #' |
| #' @param x The RDD to collect |
| #' @param ... Other optional arguments to collect |
#' @param flatten FALSE if the list should not be flattened
| #' @return a list containing elements in the RDD |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10, 2L) |
| #' collectRDD(rdd) # list from 1 to 10 |
| #' collectPartition(rdd, 0L) # list from 1 to 5 |
| #'} |
| #' @rdname collect-methods |
| #' @aliases collect,RDD-method |
| #' @noRd |
| setMethod("collectRDD", |
| signature(x = "RDD"), |
| function(x, flatten = TRUE) { |
| # Assumes a pairwise RDD is backed by a JavaPairRDD. |
| collected <- callJMethod(getJRDD(x), "collect") |
| convertJListToRList(collected, flatten, |
| serializedMode = getSerializedMode(x)) |
| }) |
| |
| |
| #' @description |
| #' \code{collectPartition} returns a list that contains all of the elements |
| #' in the specified partition of the RDD. |
| #' @param partitionId the partition to collect (starts from 0) |
| #' @rdname collect-methods |
| #' @aliases collectPartition,integer,RDD-method |
| #' @noRd |
| setMethod("collectPartition", |
| signature(x = "RDD", partitionId = "integer"), |
| function(x, partitionId) { |
| jPartitionsList <- callJMethod(getJRDD(x), |
| "collectPartitions", |
| as.list(as.integer(partitionId))) |
| |
| jList <- jPartitionsList[[1]] |
| convertJListToRList(jList, flatten = TRUE, |
| serializedMode = getSerializedMode(x)) |
| }) |
| |
| #' @description |
| #' \code{collectAsMap} returns a named list as a map that contains all of the elements |
| #' in a key-value pair RDD. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L) |
| #' collectAsMap(rdd) # list(`1` = 2, `3` = 4) |
| #'} |
| # nolint end |
| #' @rdname collect-methods |
| #' @aliases collectAsMap,RDD-method |
| #' @noRd |
| setMethod("collectAsMap", |
| signature(x = "RDD"), |
| function(x) { |
| pairList <- collectRDD(x) |
| map <- new.env() |
| lapply(pairList, function(i) { assign(as.character(i[[1]]), i[[2]], envir = map) }) |
| as.list(map) |
| }) |
| |
| #' Return the number of elements in the RDD. |
| #' |
| #' @param x The RDD to count |
| #' @return number of elements in the RDD. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' countRDD(rdd) # 10 |
| #' length(rdd) # Same as count |
| #'} |
| #' @rdname count |
| #' @aliases count,RDD-method |
| #' @noRd |
| setMethod("countRDD", |
| signature(x = "RDD"), |
| function(x) { |
| countPartition <- function(part) { |
| as.integer(length(part)) |
| } |
| valsRDD <- lapplyPartition(x, countPartition) |
| vals <- collectRDD(valsRDD) |
| sum(as.integer(vals)) |
| }) |
| |
| #' Return the number of elements in the RDD |
| #' @rdname count |
| #' @noRd |
| setMethod("lengthRDD", |
| signature(x = "RDD"), |
| function(x) { |
| countRDD(x) |
| }) |
| |
| #' Return the count of each unique value in this RDD as a list of |
| #' (value, count) pairs. |
| #' |
| #' Same as countByValue in Spark. |
| #' |
| #' @param x The RDD to count |
#' @return A list of (value, count) pairs, where count is the number of times
#' each unique value occurs in the RDD.
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, c(1,2,3,2,1)) |
| #' countByValue(rdd) # (1,2L), (2,2L), (3,1L) |
| #'} |
| # nolint end |
| #' @rdname countByValue |
| #' @aliases countByValue,RDD-method |
| #' @noRd |
| setMethod("countByValue", |
| signature(x = "RDD"), |
| function(x) { |
| ones <- lapply(x, function(item) { list(item, 1L) }) |
| collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x))) |
| }) |
| |
| #' Apply a function to all elements |
| #' |
| #' This function creates a new RDD by applying the given transformation to all |
#' elements of the given RDD.
| #' |
#' @param X The RDD to apply the transformation to.
#' @param FUN The transformation to apply to each element.
| #' @return a new RDD created by the transformation. |
| #' @rdname lapply |
| #' @noRd |
| #' @aliases lapply |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' multiplyByTwo <- lapply(rdd, function(x) { x * 2 }) |
| #' collectRDD(multiplyByTwo) # 2,4,6... |
| #'} |
| setMethod("lapply", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| func <- function(partIndex, part) { |
| lapply(part, FUN) |
| } |
| lapplyPartitionsWithIndex(X, func) |
| }) |
| |
| #' @rdname lapply |
| #' @aliases map,RDD,function-method |
| #' @noRd |
| setMethod("map", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| lapply(X, FUN) |
| }) |
| |
| #' Flatten results after applying a function to all elements |
| #' |
| #' This function returns a new RDD by first applying a function to all |
| #' elements of this RDD, and then flattening the results. |
| #' |
#' @param X The RDD to apply the transformation to.
#' @param FUN The transformation to apply to each element.
| #' @return a new RDD created by the transformation. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) }) |
| #' collectRDD(multiplyByTwo) # 2,20,4,40,6,60... |
| #'} |
| #' @rdname flatMap |
| #' @aliases flatMap,RDD,function-method |
| #' @noRd |
| setMethod("flatMap", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| partitionFunc <- function(part) { |
| unlist( |
| lapply(part, FUN), |
recursive = FALSE
| ) |
| } |
| lapplyPartition(X, partitionFunc) |
| }) |
| |
| #' Apply a function to each partition of an RDD |
| #' |
| #' Return a new RDD by applying a function to each partition of this RDD. |
| #' |
#' @param X The RDD to apply the transformation to.
#' @param FUN The transformation to apply to each partition.
| #' @return a new RDD created by the transformation. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) }) |
| #' collectRDD(partitionSum) # 15, 40 |
| #'} |
| #' @rdname lapplyPartition |
| #' @aliases lapplyPartition,RDD,function-method |
| #' @noRd |
| setMethod("lapplyPartition", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) }) |
| }) |
| |
| #' mapPartitions is the same as lapplyPartition. |
| #' |
| #' @rdname lapplyPartition |
| #' @aliases mapPartitions,RDD,function-method |
| #' @noRd |
| setMethod("mapPartitions", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| lapplyPartition(X, FUN) |
| }) |
| |
| #' Return a new RDD by applying a function to each partition of this RDD, while |
| #' tracking the index of the original partition. |
| #' |
#' @param X The RDD to apply the transformation to.
#' @param FUN The transformation to apply to each partition; takes the partition
#' index and a list of elements in the particular partition.
| #' @return a new RDD created by the transformation. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10, 5L) |
| #' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) { |
| #' partIndex * Reduce("+", part) }) |
| #' collectRDD(prod, flatten = FALSE) # 0, 7, 22, 45, 76 |
| #'} |
| #' @rdname lapplyPartitionsWithIndex |
| #' @aliases lapplyPartitionsWithIndex,RDD,function-method |
| #' @noRd |
| setMethod("lapplyPartitionsWithIndex", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| PipelinedRDD(X, FUN) |
| }) |
| |
| #' @rdname lapplyPartitionsWithIndex |
| #' @aliases mapPartitionsWithIndex,RDD,function-method |
| #' @noRd |
| setMethod("mapPartitionsWithIndex", |
| signature(X = "RDD", FUN = "function"), |
| function(X, FUN) { |
| lapplyPartitionsWithIndex(X, FUN) |
| }) |
| |
| #' This function returns a new RDD containing only the elements that satisfy |
#' a predicate (i.e. those for which the given logical function returns TRUE).
| #' The same as `filter()' in Spark. |
| #' |
| #' @param x The RDD to be filtered. |
| #' @param f A unary predicate function. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' unlist(collectRDD(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2) |
| #'} |
| # nolint end |
| #' @rdname filterRDD |
| #' @aliases filterRDD,RDD,function-method |
| #' @noRd |
| setMethod("filterRDD", |
| signature(x = "RDD", f = "function"), |
| function(x, f) { |
| filter.func <- function(part) { |
| Filter(f, part) |
| } |
| lapplyPartition(x, filter.func) |
| }) |
| |
| #' @rdname filterRDD |
| #' @aliases Filter |
| #' @noRd |
| setMethod("Filter", |
| signature(f = "function", x = "RDD"), |
| function(f, x) { |
| filterRDD(x, f) |
| }) |
| |
| #' Reduce across elements of an RDD. |
| #' |
| #' This function reduces the elements of this RDD using the |
| #' specified commutative and associative binary operator. |
| #' |
| #' @param x The RDD to reduce |
| #' @param func Commutative and associative function to apply on elements |
| #' of the RDD. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' reduce(rdd, "+") # 55 |
| #'} |
| #' @rdname reduce |
| #' @aliases reduce,RDD,ANY-method |
| #' @noRd |
| setMethod("reduce", |
| signature(x = "RDD", func = "ANY"), |
| function(x, func) { |
| |
| reducePartition <- function(part) { |
| Reduce(func, part) |
| } |
| |
| partitionList <- collectRDD(lapplyPartition(x, reducePartition), |
| flatten = FALSE) |
| Reduce(func, partitionList) |
| }) |
| |
| #' Get the maximum element of an RDD. |
| #' |
| #' @param x The RDD to get the maximum element from |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' maximum(rdd) # 10 |
| #'} |
| #' @rdname maximum |
| #' @aliases maximum,RDD |
| #' @noRd |
| setMethod("maximum", |
| signature(x = "RDD"), |
| function(x) { |
| reduce(x, max) |
| }) |
| |
| #' Get the minimum element of an RDD. |
| #' |
| #' @param x The RDD to get the minimum element from |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' minimum(rdd) # 1 |
| #'} |
| #' @rdname minimum |
| #' @aliases minimum,RDD |
| #' @noRd |
| setMethod("minimum", |
| signature(x = "RDD"), |
| function(x) { |
| reduce(x, min) |
| }) |
| |
| #' Add up the elements in an RDD. |
| #' |
| #' @param x The RDD to add up the elements in |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' sumRDD(rdd) # 55 |
| #'} |
| #' @rdname sumRDD |
| #' @aliases sumRDD,RDD |
| #' @noRd |
| setMethod("sumRDD", |
| signature(x = "RDD"), |
| function(x) { |
| reduce(x, "+") |
| }) |
| |
| #' Applies a function to all elements in an RDD, and forces evaluation. |
| #' |
| #' @param x The RDD to apply the function |
| #' @param func The function to be applied. |
| #' @return invisible NULL. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' foreach(rdd, function(x) { save(x, file=...) }) |
| #'} |
| #' @rdname foreach |
| #' @aliases foreach,RDD,function-method |
| #' @noRd |
| setMethod("foreach", |
| signature(x = "RDD", func = "function"), |
| function(x, func) { |
| partition.func <- function(x) { |
| lapply(x, func) |
| NULL |
| } |
| invisible(collectRDD(mapPartitions(x, partition.func))) |
| }) |
| |
| #' Applies a function to each partition in an RDD, and forces evaluation. |
| #' |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' foreachPartition(rdd, function(part) { save(part, file=...); NULL }) |
| #'} |
| #' @rdname foreach |
| #' @aliases foreachPartition,RDD,function-method |
| #' @noRd |
| setMethod("foreachPartition", |
| signature(x = "RDD", func = "function"), |
| function(x, func) { |
| invisible(collectRDD(mapPartitions(x, func))) |
| }) |
| |
| #' Take elements from an RDD. |
| #' |
#' This function takes the first \code{num} elements in the RDD and
| #' returns them in a list. |
| #' |
| #' @param x The RDD to take elements from |
| #' @param num Number of elements to take |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' takeRDD(rdd, 2L) # list(1, 2) |
| #'} |
| # nolint end |
| #' @rdname take |
| #' @aliases take,RDD,numeric-method |
| #' @noRd |
| setMethod("takeRDD", |
| signature(x = "RDD", num = "numeric"), |
| function(x, num) { |
| resList <- list() |
| index <- -1 |
| jrdd <- getJRDD(x) |
| numPartitions <- getNumPartitionsRDD(x) |
| serializedModeRDD <- getSerializedMode(x) |
| |
| # TODO(shivaram): Collect more than one partition based on size |
| # estimates similar to the scala version of `take`. |
| while (TRUE) { |
| index <- index + 1 |
| |
| if (length(resList) >= num || index >= numPartitions) |
| break |
| |
| # a JList of byte arrays |
| partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index))) |
| partition <- partitionArr[[1]] |
| |
| size <- num - length(resList) |
| # elems is capped to have at most `size` elements |
| elems <- convertJListToRList(partition, |
| flatten = TRUE, |
| logicalUpperBound = size, |
| serializedMode = serializedModeRDD) |
| |
| resList <- append(resList, elems) |
| } |
| resList |
| }) |
| |
| |
| #' First |
| #' |
| #' Return the first element of an RDD |
| #' |
| #' @rdname first |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' firstRDD(rdd) |
| #' } |
| #' @noRd |
| setMethod("firstRDD", |
| signature(x = "RDD"), |
| function(x) { |
| takeRDD(x, 1)[[1]] |
| }) |
| |
#' Remove duplicates from an RDD.
| #' |
| #' This function returns a new RDD containing the distinct elements in the |
| #' given RDD. The same as `distinct()' in Spark. |
| #' |
| #' @param x The RDD to remove duplicates from. |
| #' @param numPartitions Number of partitions to create. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, c(1,2,2,3,3,3)) |
| #' sort(unlist(collectRDD(distinctRDD(rdd)))) # c(1, 2, 3) |
| #'} |
| # nolint end |
| #' @rdname distinct |
| #' @aliases distinct,RDD-method |
| #' @noRd |
| setMethod("distinctRDD", |
| signature(x = "RDD"), |
| function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) { |
| identical.mapped <- lapply(x, function(x) { list(x, NULL) }) |
| reduced <- reduceByKey(identical.mapped, |
| function(x, y) { x }, |
| numPartitions) |
| resRDD <- lapply(reduced, function(x) { x[[1]] }) |
| resRDD |
| }) |
| |
| #' Return an RDD that is a sampled subset of the given RDD. |
| #' |
| #' The same as `sample()' in Spark. (We rename it due to signature |
| #' inconsistencies with the `sample()' function in R's base package.) |
| #' |
| #' @param x The RDD to sample elements from |
| #' @param withReplacement Sampling with replacement or not |
| #' @param fraction The (rough) sample target fraction |
| #' @param seed Randomness seed value |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' collectRDD(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements |
| #' collectRDD(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates |
| #'} |
| #' @rdname sampleRDD |
| #' @aliases sampleRDD,RDD |
| #' @noRd |
| setMethod("sampleRDD", |
| signature(x = "RDD", withReplacement = "logical", |
| fraction = "numeric", seed = "integer"), |
| function(x, withReplacement, fraction, seed) { |
| |
| # The sampler: takes a partition and returns its sampled version. |
| samplingFunc <- function(partIndex, part) { |
| set.seed(seed) |
| res <- vector("list", length(part)) |
| len <- 0 |
| |
| # Discards some random values to ensure each partition has a |
| # different random seed. |
| stats::runif(partIndex) |
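
# Note on the loop below: with replacement this mirrors Poisson sampling --
# each element is emitted stats::rpois(1, fraction) times, so the expected
# number of copies per element is `fraction`. Without replacement, each
# element is kept independently with probability `fraction` (Bernoulli
# sampling).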
| |
| for (elem in part) { |
| if (withReplacement) { |
| count <- stats::rpois(1, fraction) |
| if (count > 0) { |
| res[(len + 1) : (len + count)] <- rep(list(elem), count) |
| len <- len + count |
| } |
| } else { |
| if (stats::runif(1) < fraction) { |
| len <- len + 1 |
| res[[len]] <- elem |
| } |
| } |
| } |
| |
| # TODO(zongheng): look into the performance of the current |
| # implementation. Look into some iterator package? Note that |
| # Scala avoids many calls to creating an empty list and PySpark |
| # similarly achieves this using `yield'. |
| if (len > 0) |
| res[1:len] |
| else |
| list() |
| } |
| |
| lapplyPartitionsWithIndex(x, samplingFunc) |
| }) |
| |
| #' Return a list of the elements that are a sampled subset of the given RDD. |
| #' |
| #' @param x The RDD to sample elements from |
| #' @param withReplacement Sampling with replacement or not |
| #' @param num Number of elements to return |
| #' @param seed Randomness seed value |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:100) |
| #' # exactly 5 elements sampled, which may not be distinct |
| #' takeSample(rdd, TRUE, 5L, 1618L) |
| #' # exactly 5 distinct elements sampled |
| #' takeSample(rdd, FALSE, 5L, 16181618L) |
| #'} |
| #' @rdname takeSample |
| #' @aliases takeSample,RDD |
| #' @noRd |
| setMethod("takeSample", signature(x = "RDD", withReplacement = "logical", |
| num = "integer", seed = "integer"), |
| function(x, withReplacement, num, seed) { |
| # This function is ported from RDD.scala. |
| fraction <- 0.0 |
| total <- 0 |
| multiplier <- 3.0 |
| initialCount <- countRDD(x) |
| maxSelected <- 0 |
| MAXINT <- .Machine$integer.max |
| |
| if (num < 0) |
| stop("Negative number of elements requested") |
| |
| if (initialCount > MAXINT - 1) { |
| maxSelected <- MAXINT - 1 |
| } else { |
| maxSelected <- initialCount |
| } |
| |
| if (num > initialCount && !withReplacement) { |
| total <- maxSelected |
| fraction <- multiplier * (maxSelected + 1) / initialCount |
| } else { |
| total <- num |
| fraction <- multiplier * (num + 1) / initialCount |
| } |
| |
| set.seed(seed) |
| samples <- collectRDD(sampleRDD(x, withReplacement, fraction, |
| as.integer(ceiling(stats::runif(1, |
| -MAXINT, |
| MAXINT))))) |
| # If the first sample didn't turn out large enough, keep trying to |
| # take samples; this shouldn't happen often because we use a big |
| # multiplier for the initial size |
| while (length(samples) < total) |
| samples <- collectRDD(sampleRDD(x, withReplacement, fraction, |
| as.integer(ceiling(stats::runif(1, |
| -MAXINT, |
| MAXINT))))) |
| |
| # TODO(zongheng): investigate if this call is an in-place shuffle? |
| base::sample(samples)[1:total] |
| }) |
| |
| #' Creates tuples of the elements in this RDD by applying a function. |
| #' |
| #' @param x The RDD. |
| #' @param func The function to be applied. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1, 2, 3)) |
| #' collectRDD(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3)) |
| #'} |
| # nolint end |
| #' @rdname keyBy |
| #' @aliases keyBy,RDD |
| #' @noRd |
| setMethod("keyBy", |
| signature(x = "RDD", func = "function"), |
| function(x, func) { |
| apply.func <- function(x) { |
| list(func(x), x) |
| } |
| lapply(x, apply.func) |
| }) |
| |
| #' Return a new RDD that has exactly numPartitions partitions. |
| #' Can increase or decrease the level of parallelism in this RDD. Internally, |
| #' this uses a shuffle to redistribute data. |
| #' If you are decreasing the number of partitions in this RDD, consider using |
| #' coalesce, which can avoid performing a shuffle. |
| #' |
| #' @param x The RDD. |
| #' @param numPartitions Number of partitions to create. |
| #' @seealso coalesce |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L) |
| #' getNumPartitions(rdd) # 4 |
| #' getNumPartitions(repartitionRDD(rdd, 2L)) # 2 |
| #'} |
| #' @rdname repartition |
| #' @aliases repartition,RDD |
| #' @noRd |
| setMethod("repartitionRDD", |
| signature(x = "RDD"), |
| function(x, numPartitions) { |
| if (!is.null(numPartitions) && is.numeric(numPartitions)) { |
| coalesceRDD(x, numPartitions, TRUE) |
| } else { |
| stop("Please, specify the number of partitions") |
| } |
| }) |
| |
| #' Return a new RDD that is reduced into numPartitions partitions. |
| #' |
| #' @param x The RDD. |
| #' @param numPartitions Number of partitions to create. |
| #' @seealso repartition |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L) |
| #' getNumPartitions(rdd) # 3 |
| #' getNumPartitions(coalesce(rdd, 1L)) # 1 |
| #'} |
| #' @rdname coalesce |
| #' @aliases coalesce,RDD |
| #' @noRd |
| setMethod("coalesceRDD", |
| signature(x = "RDD", numPartitions = "numeric"), |
| function(x, numPartitions, shuffle = FALSE) { |
| numPartitions <- numToInt(numPartitions) |
| if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) { |
| func <- function(partIndex, part) { |
| set.seed(partIndex) # partIndex as seed |
| start <- as.integer(base::sample(numPartitions, 1) - 1) |
| lapply(seq_along(part), |
| function(i) { |
| pos <- (start + i) %% numPartitions |
| list(pos, part[[i]]) |
| }) |
| } |
| shuffled <- lapplyPartitionsWithIndex(x, func) |
| repartitioned <- partitionByRDD(shuffled, numPartitions) |
| values(repartitioned) |
| } else { |
| jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle) |
| RDD(jrdd) |
| } |
| }) |
| |
| #' Save this RDD as a SequenceFile of serialized objects. |
| #' |
| #' @param x The RDD to save |
| #' @param path The directory where the file is saved |
| #' @seealso objectFile |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:3) |
| #' saveAsObjectFile(rdd, "/tmp/sparkR-tmp") |
| #'} |
| #' @rdname saveAsObjectFile |
| #' @aliases saveAsObjectFile,RDD |
| #' @noRd |
| setMethod("saveAsObjectFile", |
| signature(x = "RDD", path = "character"), |
| function(x, path) { |
# If serializedMode is not "byte" (i.e. "string" or "row"), we need to
# serialize the data before saving it, since objectFile() assumes
# serializedMode == "byte".
| if (getSerializedMode(x) != "byte") { |
| x <- serializeToBytes(x) |
| } |
| # Return nothing |
| invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path)) |
| }) |
| |
| #' Save this RDD as a text file, using string representations of elements. |
| #' |
| #' @param x The RDD to save |
| #' @param path The directory where the partitions of the text file are saved |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:3) |
| #' saveAsTextFile(rdd, "/tmp/sparkR-tmp") |
| #'} |
| #' @rdname saveAsTextFile |
| #' @aliases saveAsTextFile,RDD |
| #' @noRd |
| setMethod("saveAsTextFile", |
| signature(x = "RDD", path = "character"), |
| function(x, path) { |
| func <- function(str) { |
| toString(str) |
| } |
| stringRdd <- lapply(x, func) |
| # Return nothing |
| invisible( |
| callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path)) |
| }) |
| |
| #' Sort an RDD by the given key function. |
| #' |
| #' @param x An RDD to be sorted. |
| #' @param func A function used to compute the sort key for each element. |
| #' @param ascending A flag to indicate whether the sorting is ascending or descending. |
| #' @param numPartitions Number of partitions to create. |
| #' @return An RDD where all elements are sorted. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(3, 2, 1)) |
| #' collectRDD(sortBy(rdd, function(x) { x })) # list (1, 2, 3) |
| #'} |
| # nolint end |
| #' @rdname sortBy |
| #' @aliases sortBy,RDD,RDD-method |
| #' @noRd |
| setMethod("sortBy", |
| signature(x = "RDD", func = "function"), |
| function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) { |
| values(sortByKey(keyBy(x, func), ascending, numPartitions)) |
| }) |
| |
| # Helper function to get first N elements from an RDD in the specified order. |
| # Param: |
| # x An RDD. |
| # num Number of elements to return. |
| # ascending A flag to indicate whether the sorting is ascending or descending. |
| # Return: |
| # A list of the first N elements from the RDD in the specified order. |
| # |
| takeOrderedElem <- function(x, num, ascending = TRUE) { |
| if (num <= 0L) { |
| return(list()) |
| } |
| |
| partitionFunc <- function(part) { |
| if (num < length(part)) { |
| # R limitation: order works only on primitive types! |
| ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending) |
| part[ord[1:num]] |
| } else { |
| part |
| } |
| } |
| |
| newRdd <- mapPartitions(x, partitionFunc) |
| |
| resList <- list() |
| index <- -1 |
| jrdd <- getJRDD(newRdd) |
| numPartitions <- getNumPartitionsRDD(newRdd) |
| serializedModeRDD <- getSerializedMode(newRdd) |
| |
| while (TRUE) { |
| index <- index + 1 |
| |
| if (index >= numPartitions) { |
| ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending) |
| resList <- resList[ord[1:num]] |
| break |
| } |
| |
| # a JList of byte arrays |
| partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index))) |
| partition <- partitionArr[[1]] |
| |
| # elems is capped to have at most `num` elements |
| elems <- convertJListToRList(partition, |
| flatten = TRUE, |
| logicalUpperBound = num, |
| serializedMode = serializedModeRDD) |
| |
| resList <- append(resList, elems) |
| } |
| resList |
| } |
| |
| #' Returns the first N elements from an RDD in ascending order. |
| #' |
| #' @param x An RDD. |
| #' @param num Number of elements to return. |
| #' @return The first N elements from the RDD in ascending order. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7)) |
| #' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6) |
| #'} |
| # nolint end |
| #' @rdname takeOrdered |
| #' @aliases takeOrdered,RDD,RDD-method |
| #' @noRd |
| setMethod("takeOrdered", |
| signature(x = "RDD", num = "integer"), |
| function(x, num) { |
| takeOrderedElem(x, num) |
| }) |
| |
| #' Returns the top N elements from an RDD. |
| #' |
| #' @param x An RDD. |
| #' @param num Number of elements to return. |
| #' @return The top N elements from the RDD. |
| #' @rdname top |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7)) |
| #' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4) |
| #'} |
| # nolint end |
| #' @aliases top,RDD,RDD-method |
| #' @noRd |
| setMethod("top", |
| signature(x = "RDD", num = "integer"), |
| function(x, num) { |
| takeOrderedElem(x, num, FALSE) |
| }) |
| |
| #' Fold an RDD using a given associative function and a neutral "zero value". |
| #' |
| #' Aggregate the elements of each partition, and then the results for all the |
| #' partitions, using a given associative function and a neutral "zero value". |
| #' |
| #' @param x An RDD. |
| #' @param zeroValue A neutral "zero value". |
| #' @param op An associative function for the folding operation. |
| #' @return The folding result. |
| #' @rdname fold |
| #' @seealso reduce |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5)) |
| #' fold(rdd, 0, "+") # 15 |
| #'} |
| #' @aliases fold,RDD,RDD-method |
| #' @noRd |
| setMethod("fold", |
| signature(x = "RDD", zeroValue = "ANY", op = "ANY"), |
| function(x, zeroValue, op) { |
| aggregateRDD(x, zeroValue, op, op) |
| }) |
| |
| #' Aggregate an RDD using the given combine functions and a neutral "zero value". |
| #' |
| #' Aggregate the elements of each partition, and then the results for all the |
| #' partitions, using given combine functions and a neutral "zero value". |
| #' |
| #' @param x An RDD. |
| #' @param zeroValue A neutral "zero value". |
| #' @param seqOp A function to aggregate the RDD elements. It may return a different |
| #' result type from the type of the RDD elements. |
| #' @param combOp A function to aggregate results of seqOp. |
| #' @return The aggregation result. |
| #' @rdname aggregateRDD |
| #' @seealso reduce |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1, 2, 3, 4)) |
| #' zeroValue <- list(0, 0) |
| #' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } |
| #' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } |
| #' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4) |
| #'} |
| # nolint end |
| #' @aliases aggregateRDD,RDD,RDD-method |
| #' @noRd |
| setMethod("aggregateRDD", |
| signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"), |
| function(x, zeroValue, seqOp, combOp) { |
| partitionFunc <- function(part) { |
| Reduce(seqOp, part, zeroValue) |
| } |
| |
| partitionList <- collectRDD(lapplyPartition(x, partitionFunc), |
| flatten = FALSE) |
| Reduce(combOp, partitionList, zeroValue) |
| }) |
| |
| #' Pipes elements to a forked external process. |
| #' |
| #' The same as 'pipe()' in Spark. |
| #' |
| #' @param x The RDD whose elements are piped to the forked external process. |
| #' @param command The command to fork an external process. |
| #' @param env A named list to set environment variables of the external process. |
| #' @return A new RDD created by piping all elements to a forked external process. |
| #' @rdname pipeRDD |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:10) |
| #' pipeRDD(rdd, "more") |
| #' Output: c("1", "2", ..., "10") |
| #'} |
| #' @aliases pipeRDD,RDD,character-method |
| #' @noRd |
| setMethod("pipeRDD", |
| signature(x = "RDD", command = "character"), |
| function(x, command, env = list()) { |
| func <- function(part) { |
| trim_trailing_func <- function(x) { |
| sub("[\r\n]*$", "", toString(x)) |
| } |
| input <- unlist(lapply(part, trim_trailing_func)) |
| res <- system2(command, stdout = TRUE, input = input, env = env) |
| lapply(res, trim_trailing_func) |
| } |
| lapplyPartition(x, func) |
| }) |
| |
# TODO: Consider caching the name in the RDD's environment.
#' Return an RDD's name.
| #' |
| #' @param x The RDD whose name is returned. |
| #' @rdname name |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1,2,3)) |
| #' name(rdd) # NULL (if not set before) |
| #'} |
| #' @aliases name,RDD |
| #' @noRd |
| setMethod("name", |
| signature(x = "RDD"), |
| function(x) { |
| callJMethod(getJRDD(x), "name") |
| }) |
| |
| #' Set an RDD's name. |
| #' |
| #' @param x The RDD whose name is to be set. |
| #' @param name The RDD name to be set. |
#' @return The RDD with its name set.
| #' @rdname setName |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list(1,2,3)) |
| #' setName(rdd, "myRDD") |
| #' name(rdd) # "myRDD" |
| #'} |
| #' @aliases setName,RDD |
| #' @noRd |
| setMethod("setName", |
| signature(x = "RDD", name = "character"), |
| function(x, name) { |
| callJMethod(getJRDD(x), "setName", name) |
| x |
| }) |
| |
| #' Zip an RDD with generated unique Long IDs. |
| #' |
| #' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where |
| #' n is the number of partitions. So there may exist gaps, but this |
#' method won't trigger a Spark job, which is different from
| #' zipWithIndex. |
| #' |
| #' @param x An RDD to be zipped. |
| #' @return An RDD with zipped items. |
| #' @seealso zipWithIndex |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) |
| #' collectRDD(zipWithUniqueId(rdd)) |
| #' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2)) |
| #'} |
| # nolint end |
| #' @rdname zipWithUniqueId |
| #' @aliases zipWithUniqueId,RDD |
| #' @noRd |
| setMethod("zipWithUniqueId", |
| signature(x = "RDD"), |
| function(x) { |
| n <- getNumPartitionsRDD(x) |
| |
| partitionFunc <- function(partIndex, part) { |
| mapply( |
| function(item, index) { |
| list(item, (index - 1) * n + partIndex) |
| }, |
| part, |
| seq_along(part), |
| SIMPLIFY = FALSE) |
| } |
| |
| lapplyPartitionsWithIndex(x, partitionFunc) |
| }) |
| |
| #' Zip an RDD with its element indices. |
| #' |
| #' The ordering is first based on the partition index and then the |
| #' ordering of items within each partition. So the first item in |
| #' the first partition gets index 0, and the last item in the last |
| #' partition receives the largest index. |
| #' |
| #' This method needs to trigger a Spark job when this RDD contains |
| #' more than one partition. |
| #' |
| #' @param x An RDD to be zipped. |
| #' @return An RDD with zipped items. |
| #' @seealso zipWithUniqueId |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) |
| #' collectRDD(zipWithIndex(rdd)) |
| #' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4)) |
| #'} |
| # nolint end |
| #' @rdname zipWithIndex |
| #' @aliases zipWithIndex,RDD |
| #' @noRd |
| setMethod("zipWithIndex", |
| signature(x = "RDD"), |
| function(x) { |
| n <- getNumPartitionsRDD(x) |
| if (n > 1) { |
| nums <- collectRDD(lapplyPartition(x, |
| function(part) { |
| list(length(part)) |
| })) |
| startIndices <- Reduce("+", nums, accumulate = TRUE) |
| } |
| |
| partitionFunc <- function(partIndex, part) { |
| if (partIndex == 0) { |
| startIndex <- 0 |
| } else { |
| startIndex <- startIndices[[partIndex]] |
| } |
| |
| mapply( |
| function(item, index) { |
| list(item, index - 1 + startIndex) |
| }, |
| part, |
| seq_along(part), |
| SIMPLIFY = FALSE) |
| } |
| |
| lapplyPartitionsWithIndex(x, partitionFunc) |
| }) |
| |
| #' Coalesce all elements within each partition of an RDD into a list. |
| #' |
| #' @param x An RDD. |
| #' @return An RDD created by coalescing all elements within |
| #' each partition into a list. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, as.list(1:4), 2L) |
| #' collectRDD(glom(rdd)) |
| #' # list(list(1, 2), list(3, 4)) |
| #'} |
| # nolint end |
| #' @rdname glom |
| #' @aliases glom,RDD |
| #' @noRd |
| setMethod("glom", |
| signature(x = "RDD"), |
| function(x) { |
| partitionFunc <- function(part) { |
| list(part) |
| } |
| |
| lapplyPartition(x, partitionFunc) |
| }) |
| |
| ############ Binary Functions ############# |
| |
| #' Return the union RDD of two RDDs. |
| #' The same as union() in Spark. |
| #' |
| #' @param x An RDD. |
| #' @param y An RDD. |
| #' @return a new RDD created by performing the simple union (without removing |
| #' duplicates) of two input RDDs. |
| #' @examples |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:3) |
| #' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3 |
| #'} |
| #' @rdname unionRDD |
| #' @aliases unionRDD,RDD,RDD-method |
| #' @noRd |
| setMethod("unionRDD", |
| signature(x = "RDD", y = "RDD"), |
| function(x, y) { |
| if (getSerializedMode(x) == getSerializedMode(y)) { |
| jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y)) |
| union.rdd <- RDD(jrdd, getSerializedMode(x)) |
| } else { |
| # One of the RDDs is not serialized, we need to serialize it first. |
| if (getSerializedMode(x) != "byte") x <- serializeToBytes(x) |
| if (getSerializedMode(y) != "byte") y <- serializeToBytes(y) |
| jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y)) |
| union.rdd <- RDD(jrdd, "byte") |
| } |
| union.rdd |
| }) |
| |
| #' Zip an RDD with another RDD. |
| #' |
| #' Zips this RDD with another one, returning key-value pairs with the |
#' first element in each RDD, second element in each RDD, etc. Assumes
| #' that the two RDDs have the same number of partitions and the same |
| #' number of elements in each partition (e.g. one was made through |
| #' a map on the other). |
| #' |
| #' @param x An RDD to be zipped. |
| #' @param other Another RDD to be zipped. |
| #' @return An RDD zipped from the two RDDs. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd1 <- parallelize(sc, 0:4) |
| #' rdd2 <- parallelize(sc, 1000:1004) |
| #' collectRDD(zipRDD(rdd1, rdd2)) |
| #' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)) |
| #'} |
| # nolint end |
| #' @rdname zipRDD |
| #' @aliases zipRDD,RDD |
| #' @noRd |
| setMethod("zipRDD", |
| signature(x = "RDD", other = "RDD"), |
| function(x, other) { |
| n1 <- getNumPartitionsRDD(x) |
| n2 <- getNumPartitionsRDD(other) |
| if (n1 != n2) { |
| stop("Can only zip RDDs which have the same number of partitions.") |
| } |
| |
| rdds <- appendPartitionLengths(x, other) |
| jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]])) |
| # The jrdd's elements are of scala Tuple2 type. The serialized |
| # flag here is used for the elements inside the tuples. |
| rdd <- RDD(jrdd, getSerializedMode(rdds[[1]])) |
| |
| mergePartitions(rdd, TRUE) |
| }) |
| |
| #' Cartesian product of this RDD and another one. |
| #' |
| #' Return the Cartesian product of this RDD and another one, |
| #' that is, the RDD of all pairs of elements (a, b) where a |
| #' is in this and b is in other. |
| #' |
| #' @param x An RDD. |
| #' @param other An RDD. |
| #' @return A new RDD which is the Cartesian product of these two RDDs. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd <- parallelize(sc, 1:2) |
| #' sortByKey(cartesian(rdd, rdd)) |
| #' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2)) |
| #'} |
| # nolint end |
| #' @rdname cartesian |
| #' @aliases cartesian,RDD,RDD-method |
| #' @noRd |
| setMethod("cartesian", |
| signature(x = "RDD", other = "RDD"), |
| function(x, other) { |
| rdds <- appendPartitionLengths(x, other) |
| jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]])) |
| # The jrdd's elements are of scala Tuple2 type. The serialized |
| # flag here is used for the elements inside the tuples. |
| rdd <- RDD(jrdd, getSerializedMode(rdds[[1]])) |
| |
| mergePartitions(rdd, FALSE) |
| }) |
| |
#' Subtract one RDD from another.
| #' |
| #' Return an RDD with the elements from this that are not in other. |
| #' |
| #' @param x An RDD. |
| #' @param other An RDD. |
| #' @param numPartitions Number of the partitions in the result RDD. |
| #' @return An RDD with the elements from this that are not in other. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4)) |
| #' rdd2 <- parallelize(sc, list(2, 4)) |
| #' collectRDD(subtract(rdd1, rdd2)) |
| #' # list(1, 1, 3) |
| #'} |
| # nolint end |
| #' @rdname subtract |
| #' @aliases subtract,RDD |
| #' @noRd |
| setMethod("subtract", |
| signature(x = "RDD", other = "RDD"), |
| function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) { |
| mapFunction <- function(e) { list(e, NA) } |
| rdd1 <- map(x, mapFunction) |
| rdd2 <- map(other, mapFunction) |
| keys(subtractByKey(rdd1, rdd2, numPartitions)) |
| }) |
| |
| #' Intersection of this RDD and another one. |
| #' |
| #' Return the intersection of this RDD and another one. |
| #' The output will not contain any duplicate elements, |
| #' even if the input RDDs did. Performs a hash partition |
| #' across the cluster. |
| #' Note that this method performs a shuffle internally. |
| #' |
| #' @param x An RDD. |
| #' @param other An RDD. |
| #' @param numPartitions The number of partitions in the result RDD. |
| #' @return An RDD which is the intersection of these two RDDs. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5)) |
| #' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8)) |
| #' collectRDD(sortBy(intersection(rdd1, rdd2), function(x) { x })) |
| #' # list(1, 2, 3) |
| #'} |
| # nolint end |
| #' @rdname intersection |
| #' @aliases intersection,RDD |
| #' @noRd |
| setMethod("intersection", |
| signature(x = "RDD", other = "RDD"), |
| function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) { |
| rdd1 <- map(x, function(v) { list(v, NA) }) |
| rdd2 <- map(other, function(v) { list(v, NA) }) |
| |
| filterFunction <- function(elem) { |
| iters <- elem[[2]] |
| all(as.vector( |
| lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical")) |
| } |
| |
| keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction)) |
| }) |
| |
| #' Zips an RDD's partitions with one (or more) RDD(s). |
| #' Same as zipPartitions in Spark. |
| #' |
| #' @param ... RDDs to be zipped. |
| #' @param func A function to transform zipped partitions. |
| #' @return A new RDD by applying a function to the zipped partitions. |
| #' Assumes that all the RDDs have the *same number of partitions*, but |
| #' does *not* require them to have the same number of elements in each partition. |
| #' @examples |
| # nolint start |
| #'\dontrun{ |
| #' sc <- sparkR.init() |
| #' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2 |
| #' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4 |
| #' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6 |
| #' collectRDD(zipPartitions(rdd1, rdd2, rdd3, |
| #' func = function(x, y, z) { list(list(x, y, z))} )) |
| #' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))) |
| #'} |
| # nolint end |
| #' @rdname zipRDD |
| #' @aliases zipPartitions,RDD |
| #' @noRd |
| setMethod("zipPartitions", |
| "RDD", |
| function(..., func) { |
| rrdds <- list(...) |
| if (length(rrdds) == 1) { |
| return(rrdds[[1]]) |
| } |
| nPart <- sapply(rrdds, getNumPartitionsRDD) |
| if (length(unique(nPart)) != 1) { |
| stop("Can only zipPartitions RDDs which have the same number of partitions.") |
| } |
| |
| rrdds <- lapply(rrdds, function(rdd) { |
| mapPartitionsWithIndex(rdd, function(partIndex, part) { |
| list(list(partIndex, part)) |
| }) |
| }) |
| union.rdd <- Reduce(unionRDD, rrdds) |
| zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1])) |
| res <- mapPartitions(zipped.rdd, function(plist) { |
| do.call(func, plist[[1]]) |
| }) |
| res |
| }) |