| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| context("basic RDD functions") |
| |
| # JavaSparkContext handle |
| sc <- sparkR.init() |
| |
| # Data |
| nums <- 1:10 |
| rdd <- parallelize(sc, nums, 2L) |
| |
| intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200)) |
| intRdd <- parallelize(sc, intPairs, 2L) |
| |
| test_that("get number of partitions in RDD", { |
| expect_equal(getNumPartitions(rdd), 2) |
| expect_equal(getNumPartitions(intRdd), 2) |
| }) |
| |
| test_that("first on RDD", { |
| expect_equal(first(rdd), 1) |
| newrdd <- lapply(rdd, function(x) x + 1) |
| expect_equal(first(newrdd), 2) |
| }) |
| |
| test_that("count and length on RDD", { |
| expect_equal(count(rdd), 10) |
| expect_equal(length(rdd), 10) |
| }) |
| |
| test_that("count by values and keys", { |
| mods <- lapply(rdd, function(x) { x %% 3 }) |
| actual <- countByValue(mods) |
| expected <- list(list(0, 3L), list(1, 4L), list(2, 3L)) |
| expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) |
| |
| actual <- countByKey(intRdd) |
| expected <- list(list(2L, 2L), list(1L, 2L)) |
| expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) |
| }) |
| |
| test_that("lapply on RDD", { |
| multiples <- lapply(rdd, function(x) { 2 * x }) |
| actual <- collect(multiples) |
| expect_equal(actual, as.list(nums * 2)) |
| }) |
| |
| test_that("lapplyPartition on RDD", { |
| sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) }) |
| actual <- collect(sums) |
| expect_equal(actual, list(15, 40)) |
| }) |
| |
| test_that("mapPartitions on RDD", { |
| sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) }) |
| actual <- collect(sums) |
| expect_equal(actual, list(15, 40)) |
| }) |
| |
| test_that("flatMap() on RDDs", { |
| flat <- flatMap(intRdd, function(x) { list(x, x) }) |
| actual <- collect(flat) |
| expect_equal(actual, rep(intPairs, each = 2)) |
| }) |
| |
| test_that("filterRDD on RDD", { |
| filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 }) |
| actual <- collect(filtered.rdd) |
| expect_equal(actual, list(2, 4, 6, 8, 10)) |
| |
| filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd) |
| actual <- collect(filtered.rdd) |
| expect_equal(actual, list(list(1L, -1))) |
| |
| # Filter out all elements. |
| filtered.rdd <- filterRDD(rdd, function(x) { x > 10 }) |
| actual <- collect(filtered.rdd) |
| expect_equal(actual, list()) |
| }) |
| |
| test_that("lookup on RDD", { |
| vals <- lookup(intRdd, 1L) |
| expect_equal(vals, list(-1, 200)) |
| |
| vals <- lookup(intRdd, 3L) |
| expect_equal(vals, list()) |
| }) |
| |
| test_that("several transformations on RDD (a benchmark on PipelinedRDD)", { |
| rdd2 <- rdd |
| for (i in 1:12) |
| rdd2 <- lapplyPartitionsWithIndex( |
| rdd2, function(partIndex, part) { |
| part <- as.list(unlist(part) * partIndex + i) |
| }) |
| rdd2 <- lapply(rdd2, function(x) x + x) |
| actual <- collect(rdd2) |
| expected <- list(24, 24, 24, 24, 24, |
| 168, 170, 172, 174, 176) |
| expect_equal(actual, expected) |
| }) |
| |
| test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", { |
| # RDD |
| rdd2 <- rdd |
| # PipelinedRDD |
| rdd2 <- lapplyPartitionsWithIndex( |
| rdd2, |
| function(partIndex, part) { |
| part <- as.list(unlist(part) * partIndex) |
| }) |
| |
| cache(rdd2) |
| expect_true(rdd2@env$isCached) |
| rdd2 <- lapply(rdd2, function(x) x) |
| expect_false(rdd2@env$isCached) |
| |
| unpersist(rdd2) |
| expect_false(rdd2@env$isCached) |
| |
| persist(rdd2, "MEMORY_AND_DISK") |
| expect_true(rdd2@env$isCached) |
| rdd2 <- lapply(rdd2, function(x) x) |
| expect_false(rdd2@env$isCached) |
| |
| unpersist(rdd2) |
| expect_false(rdd2@env$isCached) |
| |
| tempDir <- tempfile(pattern = "checkpoint") |
| setCheckpointDir(sc, tempDir) |
| checkpoint(rdd2) |
| expect_true(rdd2@env$isCheckpointed) |
| |
| rdd2 <- lapply(rdd2, function(x) x) |
| expect_false(rdd2@env$isCached) |
| expect_false(rdd2@env$isCheckpointed) |
| |
| # make sure the data is collectable |
| collect(rdd2) |
| |
| unlink(tempDir) |
| }) |
| |
| test_that("reduce on RDD", { |
| sum <- reduce(rdd, "+") |
| expect_equal(sum, 55) |
| |
| # Also test with an inline function |
| sumInline <- reduce(rdd, function(x, y) { x + y }) |
| expect_equal(sumInline, 55) |
| }) |
| |
| test_that("lapply with dependency", { |
| fa <- 5 |
| multiples <- lapply(rdd, function(x) { fa * x }) |
| actual <- collect(multiples) |
| |
| expect_equal(actual, as.list(nums * 5)) |
| }) |
| |
| test_that("lapplyPartitionsWithIndex on RDDs", { |
| func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) } |
| actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE) |
| expect_equal(actual, list(list(0, 15), list(1, 40))) |
| |
| pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L) |
| partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 } |
| mkTup <- function(partIndex, part) { list(partIndex, part) } |
| actual <- collect(lapplyPartitionsWithIndex( |
| partitionBy(pairsRDD, 2L, partitionByParity), |
| mkTup), |
| FALSE) |
| expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))), |
| list(1, list(list(4, 8))))) |
| }) |
| |
| test_that("sampleRDD() on RDDs", { |
| expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums) |
| }) |
| |
| test_that("takeSample() on RDDs", { |
| # ported from RDDSuite.scala, modified seeds |
| data <- parallelize(sc, 1:100, 2L) |
| for (seed in 4:5) { |
| s <- takeSample(data, FALSE, 20L, seed) |
| expect_equal(length(s), 20L) |
| expect_equal(length(unique(s)), 20L) |
| for (elem in s) { |
| expect_true(elem >= 1 && elem <= 100) |
| } |
| } |
| for (seed in 4:5) { |
| s <- takeSample(data, FALSE, 200L, seed) |
| expect_equal(length(s), 100L) |
| expect_equal(length(unique(s)), 100L) |
| for (elem in s) { |
| expect_true(elem >= 1 && elem <= 100) |
| } |
| } |
| for (seed in 4:5) { |
| s <- takeSample(data, TRUE, 20L, seed) |
| expect_equal(length(s), 20L) |
| for (elem in s) { |
| expect_true(elem >= 1 && elem <= 100) |
| } |
| } |
| for (seed in 4:5) { |
| s <- takeSample(data, TRUE, 100L, seed) |
| expect_equal(length(s), 100L) |
| # Chance of getting all distinct elements is astronomically low, so test we |
| # got less than 100 |
| expect_true(length(unique(s)) < 100L) |
| } |
| for (seed in 4:5) { |
| s <- takeSample(data, TRUE, 200L, seed) |
| expect_equal(length(s), 200L) |
| # Chance of getting all distinct elements is still quite low, so test we |
| # got less than 100 |
| expect_true(length(unique(s)) < 100L) |
| } |
| }) |
| |
| test_that("mapValues() on pairwise RDDs", { |
| multiples <- mapValues(intRdd, function(x) { x * 2 }) |
| actual <- collect(multiples) |
| expected <- lapply(intPairs, function(x) { |
| list(x[[1]], x[[2]] * 2) |
| }) |
| expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) |
| }) |
| |
| test_that("flatMapValues() on pairwise RDDs", { |
| l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4)))) |
| actual <- collect(flatMapValues(l, function(x) { x })) |
| expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) |
| |
| # Generate x to x+1 for every value |
| actual <- collect(flatMapValues(intRdd, function(x) { x: (x + 1) })) |
| expect_equal(actual, |
| list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101), |
| list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201))) |
| }) |
| |
| test_that("reduceByKeyLocally() on PairwiseRDDs", { |
| pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L) |
| actual <- reduceByKeyLocally(pairs, "+") |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list(1, 6), list(1.1, 3)))) |
| |
| pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3), |
| list("bb", 5)), 4L) |
| actual <- reduceByKeyLocally(pairs, "+") |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5)))) |
| }) |
| |
| test_that("distinct() on RDDs", { |
| nums.rep2 <- rep(1:10, 2) |
| rdd.rep2 <- parallelize(sc, nums.rep2, 2L) |
| uniques <- distinct(rdd.rep2) |
| actual <- sort(unlist(collect(uniques))) |
| expect_equal(actual, nums) |
| }) |
| |
| test_that("maximum() on RDDs", { |
| max <- maximum(rdd) |
| expect_equal(max, 10) |
| }) |
| |
| test_that("minimum() on RDDs", { |
| min <- minimum(rdd) |
| expect_equal(min, 1) |
| }) |
| |
| test_that("sumRDD() on RDDs", { |
| sum <- sumRDD(rdd) |
| expect_equal(sum, 55) |
| }) |
| |
| test_that("keyBy on RDDs", { |
| func <- function(x) { x * x } |
| keys <- keyBy(rdd, func) |
| actual <- collect(keys) |
| expect_equal(actual, lapply(nums, function(x) { list(func(x), x) })) |
| }) |
| |
| test_that("repartition/coalesce on RDDs", { |
| rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements |
| |
| # repartition |
| r1 <- repartition(rdd, 2) |
| expect_equal(getNumPartitions(r1), 2L) |
| count <- length(collectPartition(r1, 0L)) |
| expect_true(count >= 8 && count <= 12) |
| |
| r2 <- repartition(rdd, 6) |
| expect_equal(getNumPartitions(r2), 6L) |
| count <- length(collectPartition(r2, 0L)) |
| expect_true(count >= 0 && count <= 4) |
| |
| # coalesce |
| r3 <- coalesce(rdd, 1) |
| expect_equal(getNumPartitions(r3), 1L) |
| count <- length(collectPartition(r3, 0L)) |
| expect_equal(count, 20) |
| }) |
| |
| test_that("sortBy() on RDDs", { |
| sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE) |
| actual <- collect(sortedRdd) |
| expect_equal(actual, as.list(sort(nums, decreasing = TRUE))) |
| |
| rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L) |
| sortedRdd2 <- sortBy(rdd2, function(x) { x * x }) |
| actual <- collect(sortedRdd2) |
| expect_equal(actual, as.list(nums)) |
| }) |
| |
| test_that("takeOrdered() on RDDs", { |
| l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7) |
| rdd <- parallelize(sc, l) |
| actual <- takeOrdered(rdd, 6L) |
| expect_equal(actual, as.list(sort(unlist(l)))[1:6]) |
| |
| l <- list("e", "d", "c", "d", "a") |
| rdd <- parallelize(sc, l) |
| actual <- takeOrdered(rdd, 3L) |
| expect_equal(actual, as.list(sort(unlist(l)))[1:3]) |
| }) |
| |
| test_that("top() on RDDs", { |
| l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7) |
| rdd <- parallelize(sc, l) |
| actual <- top(rdd, 6L) |
| expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6]) |
| |
| l <- list("e", "d", "c", "d", "a") |
| rdd <- parallelize(sc, l) |
| actual <- top(rdd, 3L) |
| expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3]) |
| }) |
| |
| test_that("fold() on RDDs", { |
| actual <- fold(rdd, 0, "+") |
| expect_equal(actual, Reduce("+", nums, 0)) |
| |
| rdd <- parallelize(sc, list()) |
| actual <- fold(rdd, 0, "+") |
| expect_equal(actual, 0) |
| }) |
| |
| test_that("aggregateRDD() on RDDs", { |
| rdd <- parallelize(sc, list(1, 2, 3, 4)) |
| zeroValue <- list(0, 0) |
| seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } |
| combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } |
| actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp) |
| expect_equal(actual, list(10, 4)) |
| |
| rdd <- parallelize(sc, list()) |
| actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp) |
| expect_equal(actual, list(0, 0)) |
| }) |
| |
| test_that("zipWithUniqueId() on RDDs", { |
| rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) |
| actual <- collect(zipWithUniqueId(rdd)) |
| expected <- list(list("a", 0), list("b", 3), list("c", 1), |
| list("d", 4), list("e", 2)) |
| expect_equal(actual, expected) |
| |
| rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L) |
| actual <- collect(zipWithUniqueId(rdd)) |
| expected <- list(list("a", 0), list("b", 1), list("c", 2), |
| list("d", 3), list("e", 4)) |
| expect_equal(actual, expected) |
| }) |
| |
| test_that("zipWithIndex() on RDDs", { |
| rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) |
| actual <- collect(zipWithIndex(rdd)) |
| expected <- list(list("a", 0), list("b", 1), list("c", 2), |
| list("d", 3), list("e", 4)) |
| expect_equal(actual, expected) |
| |
| rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L) |
| actual <- collect(zipWithIndex(rdd)) |
| expected <- list(list("a", 0), list("b", 1), list("c", 2), |
| list("d", 3), list("e", 4)) |
| expect_equal(actual, expected) |
| }) |
| |
| test_that("glom() on RDD", { |
| rdd <- parallelize(sc, as.list(1:4), 2L) |
| actual <- collect(glom(rdd)) |
| expect_equal(actual, list(list(1, 2), list(3, 4))) |
| }) |
| |
| test_that("keys() on RDDs", { |
| keys <- keys(intRdd) |
| actual <- collect(keys) |
| expect_equal(actual, lapply(intPairs, function(x) { x[[1]] })) |
| }) |
| |
| test_that("values() on RDDs", { |
| values <- values(intRdd) |
| actual <- collect(values) |
| expect_equal(actual, lapply(intPairs, function(x) { x[[2]] })) |
| }) |
| |
| test_that("pipeRDD() on RDDs", { |
| actual <- collect(pipeRDD(rdd, "more")) |
| expected <- as.list(as.character(1:10)) |
| expect_equal(actual, expected) |
| |
| trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n")) |
| actual <- collect(pipeRDD(trailed.rdd, "sort")) |
| expected <- list("", "1", "2", "3") |
| expect_equal(actual, expected) |
| |
| rev.nums <- 9:0 |
| rev.rdd <- parallelize(sc, rev.nums, 2L) |
| actual <- collect(pipeRDD(rev.rdd, "sort")) |
| expected <- as.list(as.character(c(5:9, 0:4))) |
| expect_equal(actual, expected) |
| }) |
| |
| test_that("zipRDD() on RDDs", { |
| rdd1 <- parallelize(sc, 0:4, 2) |
| rdd2 <- parallelize(sc, 1000:1004, 2) |
| actual <- collect(zipRDD(rdd1, rdd2)) |
| expect_equal(actual, |
| list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))) |
| |
| mockFile <- c("Spark is pretty.", "Spark is awesome.") |
| fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") |
| writeLines(mockFile, fileName) |
| |
| rdd <- textFile(sc, fileName, 1) |
| actual <- collect(zipRDD(rdd, rdd)) |
| expected <- lapply(mockFile, function(x) { list(x, x) }) |
| expect_equal(actual, expected) |
| |
| rdd1 <- parallelize(sc, 0:1, 1) |
| actual <- collect(zipRDD(rdd1, rdd)) |
| expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) }) |
| expect_equal(actual, expected) |
| |
| rdd1 <- map(rdd, function(x) { x }) |
| actual <- collect(zipRDD(rdd, rdd1)) |
| expected <- lapply(mockFile, function(x) { list(x, x) }) |
| expect_equal(actual, expected) |
| |
| unlink(fileName) |
| }) |
| |
| test_that("cartesian() on RDDs", { |
| rdd <- parallelize(sc, 1:3) |
| actual <- collect(cartesian(rdd, rdd)) |
| expect_equal(sortKeyValueList(actual), |
| list( |
| list(1, 1), list(1, 2), list(1, 3), |
| list(2, 1), list(2, 2), list(2, 3), |
| list(3, 1), list(3, 2), list(3, 3))) |
| |
| # test case where one RDD is empty |
| emptyRdd <- parallelize(sc, list()) |
| actual <- collect(cartesian(rdd, emptyRdd)) |
| expect_equal(actual, list()) |
| |
| mockFile <- c("Spark is pretty.", "Spark is awesome.") |
| fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") |
| writeLines(mockFile, fileName) |
| |
| rdd <- textFile(sc, fileName) |
| actual <- collect(cartesian(rdd, rdd)) |
| expected <- list( |
| list("Spark is awesome.", "Spark is pretty."), |
| list("Spark is awesome.", "Spark is awesome."), |
| list("Spark is pretty.", "Spark is pretty."), |
| list("Spark is pretty.", "Spark is awesome.")) |
| expect_equal(sortKeyValueList(actual), expected) |
| |
| rdd1 <- parallelize(sc, 0:1) |
| actual <- collect(cartesian(rdd1, rdd)) |
| expect_equal(sortKeyValueList(actual), |
| list( |
| list(0, "Spark is pretty."), |
| list(0, "Spark is awesome."), |
| list(1, "Spark is pretty."), |
| list(1, "Spark is awesome."))) |
| |
| rdd1 <- map(rdd, function(x) { x }) |
| actual <- collect(cartesian(rdd, rdd1)) |
| expect_equal(sortKeyValueList(actual), expected) |
| |
| unlink(fileName) |
| }) |
| |
| test_that("subtract() on RDDs", { |
| l <- list(1, 1, 2, 2, 3, 4) |
| rdd1 <- parallelize(sc, l) |
| |
| # subtract by itself |
| actual <- collect(subtract(rdd1, rdd1)) |
| expect_equal(actual, list()) |
| |
| # subtract by an empty RDD |
| rdd2 <- parallelize(sc, list()) |
| actual <- collect(subtract(rdd1, rdd2)) |
| expect_equal(as.list(sort(as.vector(actual, mode = "integer"))), |
| l) |
| |
| rdd2 <- parallelize(sc, list(2, 4)) |
| actual <- collect(subtract(rdd1, rdd2)) |
| expect_equal(as.list(sort(as.vector(actual, mode = "integer"))), |
| list(1, 1, 3)) |
| |
| l <- list("a", "a", "b", "b", "c", "d") |
| rdd1 <- parallelize(sc, l) |
| rdd2 <- parallelize(sc, list("b", "d")) |
| actual <- collect(subtract(rdd1, rdd2)) |
| expect_equal(as.list(sort(as.vector(actual, mode = "character"))), |
| list("a", "a", "c")) |
| }) |
| |
| test_that("subtractByKey() on pairwise RDDs", { |
| l <- list(list("a", 1), list("b", 4), |
| list("b", 5), list("a", 2)) |
| rdd1 <- parallelize(sc, l) |
| |
| # subtractByKey by itself |
| actual <- collect(subtractByKey(rdd1, rdd1)) |
| expect_equal(actual, list()) |
| |
| # subtractByKey by an empty RDD |
| rdd2 <- parallelize(sc, list()) |
| actual <- collect(subtractByKey(rdd1, rdd2)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(l)) |
| |
| rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1))) |
| actual <- collect(subtractByKey(rdd1, rdd2)) |
| expect_equal(actual, |
| list(list("b", 4), list("b", 5))) |
| |
| l <- list(list(1, 1), list(2, 4), |
| list(2, 5), list(1, 2)) |
| rdd1 <- parallelize(sc, l) |
| rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1))) |
| actual <- collect(subtractByKey(rdd1, rdd2)) |
| expect_equal(actual, |
| list(list(2, 4), list(2, 5))) |
| }) |
| |
| test_that("intersection() on RDDs", { |
| # intersection with self |
| actual <- collect(intersection(rdd, rdd)) |
| expect_equal(sort(as.integer(actual)), nums) |
| |
| # intersection with an empty RDD |
| emptyRdd <- parallelize(sc, list()) |
| actual <- collect(intersection(rdd, emptyRdd)) |
| expect_equal(actual, list()) |
| |
| rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5)) |
| rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8)) |
| actual <- collect(intersection(rdd1, rdd2)) |
| expect_equal(sort(as.integer(actual)), 1:3) |
| }) |
| |
| test_that("join() on pairwise RDDs", { |
| rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) |
| rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) |
| actual <- collect(join(rdd1, rdd2, 2L)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3))))) |
| |
| rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4))) |
| rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3))) |
| actual <- collect(join(rdd1, rdd2, 2L)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3))))) |
| |
| rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2))) |
| rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4))) |
| actual <- collect(join(rdd1, rdd2, 2L)) |
| expect_equal(actual, list()) |
| |
| rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2))) |
| rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4))) |
| actual <- collect(join(rdd1, rdd2, 2L)) |
| expect_equal(actual, list()) |
| }) |
| |
| test_that("leftOuterJoin() on pairwise RDDs", { |
| rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) |
| rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) |
| actual <- collect(leftOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL))) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4))) |
| rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3))) |
| actual <- collect(leftOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3))) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2))) |
| rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4))) |
| actual <- collect(leftOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL))) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2))) |
| rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4))) |
| actual <- collect(leftOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL))) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(expected)) |
| }) |
| |
| test_that("rightOuterJoin() on pairwise RDDs", { |
| rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3))) |
| rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4))) |
| actual <- collect(rightOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4))) |
| expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3))) |
| rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4))) |
| actual <- collect(rightOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1))) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2))) |
| rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4))) |
| actual <- collect(rightOuterJoin(rdd1, rdd2, 2L)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4))))) |
| |
| rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2))) |
| rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4))) |
| actual <- collect(rightOuterJoin(rdd1, rdd2, 2L)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3))))) |
| }) |
| |
| test_that("fullOuterJoin() on pairwise RDDs", { |
| rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3))) |
| rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4))) |
| actual <- collect(fullOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), |
| list(2, list(NULL, 4)), list(3, list(3, NULL))) |
| expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1))) |
| rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4))) |
| actual <- collect(fullOuterJoin(rdd1, rdd2, 2L)) |
| expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), |
| list("a", list(3, 1)), list("c", list(1, NULL))) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(expected)) |
| |
| rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2))) |
| rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4))) |
| actual <- collect(fullOuterJoin(rdd1, rdd2, 2L)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), |
| list(3, list(NULL, 3)), list(4, list(NULL, 4))))) |
| |
| rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2))) |
| rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4))) |
| actual <- collect(fullOuterJoin(rdd1, rdd2, 2L)) |
| expect_equal(sortKeyValueList(actual), |
| sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), |
| list("d", list(NULL, 4)), list("c", list(NULL, 3))))) |
| }) |
| |
| test_that("sortByKey() on pairwise RDDs", { |
| numPairsRdd <- map(rdd, function(x) { list (x, x) }) |
| sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE) |
| actual <- collect(sortedRdd) |
| numPairs <- lapply(nums, function(x) { list (x, x) }) |
| expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE)) |
| |
| rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L) |
| numPairsRdd2 <- map(rdd2, function(x) { list (x, x) }) |
| sortedRdd2 <- sortByKey(numPairsRdd2) |
| actual <- collect(sortedRdd2) |
| expect_equal(actual, numPairs) |
| |
| # sort by string keys |
| l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5)) |
| rdd3 <- parallelize(sc, l, 2L) |
| sortedRdd3 <- sortByKey(rdd3) |
| actual <- collect(sortedRdd3) |
| expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) |
| |
| # test on the boundary cases |
| |
| # boundary case 1: the RDD to be sorted has only 1 partition |
| rdd4 <- parallelize(sc, l, 1L) |
| sortedRdd4 <- sortByKey(rdd4) |
| actual <- collect(sortedRdd4) |
| expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) |
| |
| # boundary case 2: the sorted RDD has only 1 partition |
| rdd5 <- parallelize(sc, l, 2L) |
| sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L) |
| actual <- collect(sortedRdd5) |
| expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) |
| |
| # boundary case 3: the RDD to be sorted has only 1 element |
| l2 <- list(list("a", 1)) |
| rdd6 <- parallelize(sc, l2, 2L) |
| sortedRdd6 <- sortByKey(rdd6) |
| actual <- collect(sortedRdd6) |
| expect_equal(actual, l2) |
| |
| # boundary case 4: the RDD to be sorted has 0 element |
| l3 <- list() |
| rdd7 <- parallelize(sc, l3, 2L) |
| sortedRdd7 <- sortByKey(rdd7) |
| actual <- collect(sortedRdd7) |
| expect_equal(actual, l3) |
| }) |
| |
| test_that("collectAsMap() on a pairwise RDD", { |
| rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) |
| vals <- collectAsMap(rdd) |
| expect_equal(vals, list(`1` = 2, `3` = 4)) |
| |
| rdd <- parallelize(sc, list(list("a", 1), list("b", 2))) |
| vals <- collectAsMap(rdd) |
| expect_equal(vals, list(a = 1, b = 2)) |
| |
| rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4))) |
| vals <- collectAsMap(rdd) |
| expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4)) |
| |
| rdd <- parallelize(sc, list(list(1, "a"), list(2, "b"))) |
| vals <- collectAsMap(rdd) |
| expect_equal(vals, list(`1` = "a", `2` = "b")) |
| }) |
| |
| test_that("show()", { |
| rdd <- parallelize(sc, list(1:10)) |
| expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+") |
| }) |
| |
| test_that("sampleByKey() on pairwise RDDs", { |
| rdd <- parallelize(sc, 1:2000) |
| pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) }) |
| fractions <- list(a = 0.2, b = 0.1) |
| sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L) |
| expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE) |
| expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE) |
| expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE) |
| expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE) |
| expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE) |
| expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE) |
| |
| rdd <- parallelize(sc, 1:2000) |
| pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) }) |
| fractions <- list(`2` = 0.2, `3` = 0.1) |
| sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L) |
| expect_equal(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)), TRUE) |
| expect_equal(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)), TRUE) |
| expect_equal(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0, TRUE) |
| expect_equal(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000, TRUE) |
| expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE) |
| expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE) |
| }) |
| |
| test_that("Test correct concurrency of RRDD.compute()", { |
| rdd <- parallelize(sc, 1:1000, 100) |
| jrdd <- getJRDD(lapply(rdd, function(x) { x }), "row") |
| zrdd <- callJMethod(jrdd, "zip", jrdd) |
| count <- callJMethod(zrdd, "count") |
| expect_equal(count, 1000) |
| }) |