| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| library(testthat) |
| |
| context("MLlib regression algorithms, except for tree-based algorithms") |
| |
| # Tests for MLlib regression algorithms in SparkR |
| sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) |
| |
| test_that("formula of spark.glm", { |
| training <- suppressWarnings(createDataFrame(iris)) |
| # directly calling the spark API |
| # dot minus and intercept vs native glm |
| model <- spark.glm(training, Sepal_Width ~ . - Species + 0) |
| vals <- collect(select(predict(model, training), "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # feature interaction vs native glm |
| model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length) |
| vals <- collect(select(predict(model, training), "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # glm should work with long formula |
| training <- suppressWarnings(createDataFrame(iris)) |
| training$LongLongLongLongLongName <- training$Sepal_Width |
| training$VeryLongLongLongLonLongName <- training$Sepal_Length |
| training$AnotherLongLongLongLongName <- training$Species |
| model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName + |
| AnotherLongLongLongLongName) |
| vals <- collect(select(predict(model, training), "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| }) |
| |
| test_that("spark.glm and predict", { |
| training <- suppressWarnings(createDataFrame(iris)) |
| # gaussian family |
| model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) |
| prediction <- predict(model, training) |
| expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") |
| vals <- collect(select(prediction, "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # poisson family |
| model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, |
| family = poisson(link = identity)) |
| prediction <- predict(model, training) |
| expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") |
| vals <- collect(select(prediction, "prediction")) |
| rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, |
| data = iris, family = poisson(link = identity)), iris)) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # Gamma family |
| x <- runif(100, -1, 1) |
| y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10) |
| df <- as.DataFrame(as.data.frame(list(x = x, y = y))) |
| model <- glm(y ~ x, family = Gamma, df) |
| out <- capture.output(print(summary(model))) |
| expect_true(any(grepl("Dispersion parameter for gamma family", out))) |
| |
| # tweedie family |
| model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, |
| family = "tweedie", var.power = 1.2, link.power = 0.0) |
| prediction <- predict(model, training) |
| expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") |
| vals <- collect(select(prediction, "prediction")) |
| |
| # manual calculation of the R predicted values to avoid dependence on statmod |
| #' library(statmod) |
| #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris, |
| #' family = tweedie(var.power = 1.2, link.power = 0.0)) |
| #' print(coef(rModel)) |
| |
| rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174) |
| rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species, |
| data = iris) %*% rCoef)) |
| expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals) |
| |
| # Test stats::predict is working |
| x <- rnorm(15) |
| y <- x + rnorm(15) |
| expect_equal(length(predict(lm(y ~ x))), 15) |
| }) |
| |
| test_that("spark.glm summary", { |
| # prepare dataset |
| Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3) |
| Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5) |
| Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2) |
| Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica", |
| "versicolor", "virginica") |
| dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE) |
| |
| # gaussian family |
| training <- suppressWarnings(createDataFrame(dataset)) |
| stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species)) |
| rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset)) |
| |
| # test summary coefficients return matrix type |
| expect_true(any(class(stats$coefficients) == "matrix")) |
| expect_true(class(stats$coefficients[, 1]) == "numeric") |
| |
| coefs <- stats$coefficients |
| rCoefs <- rStats$coefficients |
| expect_true(all(abs(rCoefs - coefs) < 1e-4)) |
| expect_true(all( |
| rownames(stats$coefficients) == |
| c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) |
| expect_equal(stats$dispersion, rStats$dispersion) |
| expect_equal(stats$null.deviance, rStats$null.deviance) |
| expect_equal(stats$deviance, rStats$deviance) |
| expect_equal(stats$df.null, rStats$df.null) |
| expect_equal(stats$df.residual, rStats$df.residual) |
| expect_equal(stats$aic, rStats$aic) |
| |
| out <- capture.output(print(stats)) |
| expect_match(out[2], "Deviance Residuals:") |
| expect_true(any(grepl("AIC: 35.84", out))) |
| |
| # binomial family |
| df <- suppressWarnings(createDataFrame(dataset)) |
| training <- df[df$Species %in% c("versicolor", "virginica"), ] |
| stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width, |
| family = binomial(link = "logit"))) |
| |
| rTraining <- dataset[dataset$Species %in% c("versicolor", "virginica"), ] |
| rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, |
| family = binomial(link = "logit"))) |
| |
| coefs <- stats$coefficients |
| rCoefs <- rStats$coefficients |
| expect_true(all(abs(rCoefs - coefs) < 1e-4)) |
| expect_true(all( |
| rownames(stats$coefficients) == |
| c("(Intercept)", "Sepal_Length", "Sepal_Width"))) |
| expect_equal(stats$dispersion, rStats$dispersion) |
| expect_equal(stats$null.deviance, rStats$null.deviance) |
| expect_equal(stats$deviance, rStats$deviance) |
| expect_equal(stats$df.null, rStats$df.null) |
| expect_equal(stats$df.residual, rStats$df.residual) |
| expect_equal(stats$aic, rStats$aic) |
| |
| # Test spark.glm works with weighted dataset |
| a1 <- c(0, 1, 2, 3) |
| a2 <- c(5, 2, 1, 3) |
| w <- c(1, 2, 3, 4) |
| b <- c(1, 0, 1, 0) |
| data <- as.data.frame(cbind(a1, a2, w, b)) |
| df <- createDataFrame(data) |
| |
| stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w")) |
| rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w)) |
| |
| coefs <- stats$coefficients |
| rCoefs <- rStats$coefficients |
| expect_true(all(abs(rCoefs - coefs) < 1e-3)) |
| expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2"))) |
| expect_equal(stats$dispersion, rStats$dispersion) |
| expect_equal(stats$null.deviance, rStats$null.deviance) |
| expect_equal(stats$deviance, rStats$deviance) |
| expect_equal(stats$df.null, rStats$df.null) |
| expect_equal(stats$df.residual, rStats$df.residual) |
| expect_equal(stats$aic, rStats$aic) |
| |
| # Test spark.glm works with offset |
| training <- suppressWarnings(createDataFrame(dataset)) |
| stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species, |
| family = poisson(), offsetCol = "Petal_Length")) |
| rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species, |
| data = dataset, family = poisson(), offset = dataset$Petal.Length))) |
| expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3)) |
| |
| # Test summary works on base GLM models |
| baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = dataset) |
| baseSummary <- summary(baseModel) |
| expect_true(abs(baseSummary$deviance - 11.84013) < 1e-4) |
| |
| # Test spark.glm works with regularization parameter |
| data <- as.data.frame(cbind(a1, a2, b)) |
| df <- suppressWarnings(createDataFrame(data)) |
| regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0)) |
| expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result |
| |
| # Test spark.glm works on collinear data |
| A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2) |
| b <- c(1, 2, 3, 4) |
| data <- as.data.frame(cbind(A, b)) |
| df <- createDataFrame(data) |
| stats <- summary(spark.glm(df, b ~ . - 1)) |
| coefs <- stats$coefficients |
| expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4)) |
| }) |
| |
| test_that("spark.glm save/load", { |
| training <- suppressWarnings(createDataFrame(iris)) |
| m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) |
| s <- summary(m) |
| |
| modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp") |
| write.ml(m, modelPath) |
| expect_error(write.ml(m, modelPath)) |
| write.ml(m, modelPath, overwrite = TRUE) |
| m2 <- read.ml(modelPath) |
| s2 <- summary(m2) |
| |
| expect_equal(s$coefficients, s2$coefficients) |
| expect_equal(rownames(s$coefficients), rownames(s2$coefficients)) |
| expect_equal(s$dispersion, s2$dispersion) |
| expect_equal(s$null.deviance, s2$null.deviance) |
| expect_equal(s$deviance, s2$deviance) |
| expect_equal(s$df.null, s2$df.null) |
| expect_equal(s$df.residual, s2$df.residual) |
| expect_equal(s$aic, s2$aic) |
| expect_equal(s$iter, s2$iter) |
| expect_true(!s$is.loaded) |
| expect_true(s2$is.loaded) |
| |
| unlink(modelPath) |
| }) |
| |
| test_that("formula of glm", { |
| training <- suppressWarnings(createDataFrame(iris)) |
| # dot minus and intercept vs native glm |
| model <- glm(Sepal_Width ~ . - Species + 0, data = training) |
| vals <- collect(select(predict(model, training), "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # feature interaction vs native glm |
| model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training) |
| vals <- collect(select(predict(model, training), "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # glm should work with long formula |
| training <- suppressWarnings(createDataFrame(iris)) |
| training$LongLongLongLongLongName <- training$Sepal_Width |
| training$VeryLongLongLongLonLongName <- training$Sepal_Length |
| training$AnotherLongLongLongLongName <- training$Species |
| model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName, |
| data = training) |
| vals <- collect(select(predict(model, training), "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| }) |
| |
| test_that("glm and predict", { |
| training <- suppressWarnings(createDataFrame(iris)) |
| # gaussian family |
| model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) |
| prediction <- predict(model, training) |
| expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") |
| vals <- collect(select(prediction, "prediction")) |
| rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # poisson family |
| model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, |
| family = poisson(link = identity)) |
| prediction <- predict(model, training) |
| expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") |
| vals <- collect(select(prediction, "prediction")) |
| rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, |
| data = iris, family = poisson(link = identity)), iris)) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| |
| # tweedie family |
| model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, |
| family = "tweedie", var.power = 1.2, link.power = 0.0) |
| prediction <- predict(model, training) |
| expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") |
| vals <- collect(select(prediction, "prediction")) |
| |
| # manual calculation of the R predicted values to avoid dependence on statmod |
| #' library(statmod) |
| #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris, |
| #' family = tweedie(var.power = 1.2, link.power = 0.0)) |
| #' print(coef(rModel)) |
| |
| rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174) |
| rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species, |
| data = iris) %*% rCoef)) |
| expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals) |
| |
| # Test stats::predict is working |
| x <- rnorm(15) |
| y <- x + rnorm(15) |
| expect_equal(length(predict(lm(y ~ x))), 15) |
| }) |
| |
| test_that("glm summary", { |
| # prepare dataset |
| Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3) |
| Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5) |
| Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2) |
| Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica", |
| "versicolor", "virginica") |
| dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE) |
| |
| # gaussian family |
| training <- suppressWarnings(createDataFrame(dataset)) |
| stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training)) |
| |
| rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset)) |
| |
| coefs <- stats$coefficients |
| rCoefs <- rStats$coefficients |
| expect_true(all(abs(rCoefs - coefs) < 1e-4)) |
| expect_true(all( |
| rownames(stats$coefficients) == |
| c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) |
| expect_equal(stats$dispersion, rStats$dispersion) |
| expect_equal(stats$null.deviance, rStats$null.deviance) |
| expect_equal(stats$deviance, rStats$deviance) |
| expect_equal(stats$df.null, rStats$df.null) |
| expect_equal(stats$df.residual, rStats$df.residual) |
| expect_equal(stats$aic, rStats$aic) |
| |
| # binomial family |
| df <- suppressWarnings(createDataFrame(dataset)) |
| training <- df[df$Species %in% c("versicolor", "virginica"), ] |
| stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training, |
| family = binomial(link = "logit"))) |
| |
| rTraining <- dataset[dataset$Species %in% c("versicolor", "virginica"), ] |
| rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, |
| family = binomial(link = "logit"))) |
| |
| coefs <- stats$coefficients |
| rCoefs <- rStats$coefficients |
| expect_true(all(abs(rCoefs - coefs) < 1e-4)) |
| expect_true(all( |
| rownames(stats$coefficients) == |
| c("(Intercept)", "Sepal_Length", "Sepal_Width"))) |
| expect_equal(stats$dispersion, rStats$dispersion) |
| expect_equal(stats$null.deviance, rStats$null.deviance) |
| expect_equal(stats$deviance, rStats$deviance) |
| expect_equal(stats$df.null, rStats$df.null) |
| expect_equal(stats$df.residual, rStats$df.residual) |
| expect_equal(stats$aic, rStats$aic) |
| |
| # Test summary works on base GLM models |
| baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) |
| baseSummary <- summary(baseModel) |
| expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) |
| }) |
| |
| test_that("glm save/load", { |
| training <- suppressWarnings(createDataFrame(iris)) |
| m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) |
| s <- summary(m) |
| |
| modelPath <- tempfile(pattern = "glm", fileext = ".tmp") |
| write.ml(m, modelPath) |
| expect_error(write.ml(m, modelPath)) |
| write.ml(m, modelPath, overwrite = TRUE) |
| m2 <- read.ml(modelPath) |
| s2 <- summary(m2) |
| |
| expect_equal(s$coefficients, s2$coefficients) |
| expect_equal(rownames(s$coefficients), rownames(s2$coefficients)) |
| expect_equal(s$dispersion, s2$dispersion) |
| expect_equal(s$null.deviance, s2$null.deviance) |
| expect_equal(s$deviance, s2$deviance) |
| expect_equal(s$df.null, s2$df.null) |
| expect_equal(s$df.residual, s2$df.residual) |
| expect_equal(s$aic, s2$aic) |
| expect_equal(s$iter, s2$iter) |
| expect_true(!s$is.loaded) |
| expect_true(s2$is.loaded) |
| |
| unlink(modelPath) |
| }) |
| |
| test_that("spark.glm and glm with string encoding", { |
| t <- as.data.frame(Titanic, stringsAsFactors = FALSE) |
| df <- createDataFrame(t) |
| |
| # base R |
| rm <- stats::glm(Freq ~ Sex + Age, family = "gaussian", data = t) |
| # spark.glm with default stringIndexerOrderType = "frequencyDesc" |
| sm0 <- spark.glm(df, Freq ~ Sex + Age, family = "gaussian") |
| # spark.glm with stringIndexerOrderType = "alphabetDesc" |
| sm1 <- spark.glm(df, Freq ~ Sex + Age, family = "gaussian", |
| stringIndexerOrderType = "alphabetDesc") |
| # glm with stringIndexerOrderType = "alphabetDesc" |
| sm2 <- glm(Freq ~ Sex + Age, family = "gaussian", data = df, |
| stringIndexerOrderType = "alphabetDesc") |
| |
| rStats <- summary(rm) |
| rCoefs <- rStats$coefficients |
| sStats <- lapply(list(sm0, sm1, sm2), summary) |
| # order by coefficient size since column rendering may be different |
| o <- order(rCoefs[, 1]) |
| |
| # default encoding does not produce same results as R |
| expect_false(all(abs(rCoefs[o, ] - sStats[[1]]$coefficients[o, ]) < 1e-4)) |
| |
| # all estimates should be the same as R with stringIndexerOrderType = "alphabetDesc" |
| test <- lapply(sStats[2:3], function(stats) { |
| expect_true(all(abs(rCoefs[o, ] - stats$coefficients[o, ]) < 1e-4)) |
| expect_equal(stats$dispersion, rStats$dispersion) |
| expect_equal(stats$null.deviance, rStats$null.deviance) |
| expect_equal(stats$deviance, rStats$deviance) |
| expect_equal(stats$df.null, rStats$df.null) |
| expect_equal(stats$df.residual, rStats$df.residual) |
| expect_equal(stats$aic, rStats$aic) |
| }) |
| |
| # fitted values should be equal regardless of string encoding |
| rVals <- predict(rm, t) |
| test <- lapply(list(sm0, sm1, sm2), function(sm) { |
| vals <- collect(select(predict(sm, df), "prediction")) |
| expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) |
| }) |
| }) |
| |
| test_that("spark.isoreg", { |
| label <- c(7.0, 5.0, 3.0, 5.0, 1.0) |
| feature <- c(0.0, 1.0, 2.0, 3.0, 4.0) |
| weight <- c(1.0, 1.0, 1.0, 1.0, 1.0) |
| data <- as.data.frame(cbind(label, feature, weight)) |
| df <- createDataFrame(data) |
| |
| model <- spark.isoreg(df, label ~ feature, isotonic = FALSE, |
| weightCol = "weight") |
| # only allow one variable on the right hand side of the formula |
| expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE)) |
| result <- summary(model) |
| expect_equal(result$predictions, list(7, 5, 4, 4, 1)) |
| |
| # Test model prediction |
| predict_data <- list(list(-2.0), list(-1.0), list(0.5), |
| list(0.75), list(1.0), list(2.0), list(9.0)) |
| predict_df <- createDataFrame(predict_data, c("feature")) |
| predict_result <- collect(select(predict(model, predict_df), "prediction")) |
| expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0)) |
| |
| # Test model save/load |
| if (windows_with_hadoop()) { |
| modelPath <- tempfile(pattern = "spark-isoreg", fileext = ".tmp") |
| write.ml(model, modelPath) |
| expect_error(write.ml(model, modelPath)) |
| write.ml(model, modelPath, overwrite = TRUE) |
| model2 <- read.ml(modelPath) |
| expect_equal(result, summary(model2)) |
| |
| unlink(modelPath) |
| } |
| }) |
| |
| test_that("spark.survreg", { |
| # R code to reproduce the result. |
| # |
| #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0), |
| #' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1)) |
| #' library(survival) |
| #' model <- survreg(Surv(time, status) ~ x + sex, rData) |
| #' summary(model) |
| #' predict(model, data) |
| # |
| # -- output of 'summary(model)' |
| # |
| # Value Std. Error z p |
| # (Intercept) 1.315 0.270 4.88 1.07e-06 |
| # x -0.190 0.173 -1.10 2.72e-01 |
| # sex -0.253 0.329 -0.77 4.42e-01 |
| # Log(scale) -1.160 0.396 -2.93 3.41e-03 |
| # |
| # -- output of 'predict(model, data)' |
| # |
| # 1 2 3 4 5 6 7 |
| # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269 |
| # |
| data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0), |
| list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1)) |
| df <- createDataFrame(data, c("time", "status", "x", "sex")) |
| model <- spark.survreg(df, Surv(time, status) ~ x + sex) |
| stats <- summary(model) |
| coefs <- as.vector(stats$coefficients[, 1]) |
| rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800) |
| expect_equal(coefs, rCoefs, tolerance = 1e-4) |
| expect_true(all( |
| rownames(stats$coefficients) == |
| c("(Intercept)", "x", "sex", "Log(scale)"))) |
| p <- collect(select(predict(model, df), "prediction")) |
| expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035, |
| 2.390146, 2.891269, 2.891269), tolerance = 1e-4) |
| |
| # Test model save/load |
| if (windows_with_hadoop()) { |
| modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp") |
| write.ml(model, modelPath) |
| expect_error(write.ml(model, modelPath)) |
| write.ml(model, modelPath, overwrite = TRUE) |
| model2 <- read.ml(modelPath) |
| stats2 <- summary(model2) |
| coefs2 <- as.vector(stats2$coefficients[, 1]) |
| expect_equal(coefs, coefs2) |
| expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients)) |
| |
| unlink(modelPath) |
| } |
| |
| # Test survival::survreg |
| if (requireNamespace("survival", quietly = TRUE)) { |
| rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0), |
| x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1)) |
| expect_error( |
| model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData), |
| NA) |
| expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4) |
| |
| # Test stringIndexerOrderType |
| rData <- as.data.frame(rData) |
| rData$sex2 <- c("female", "male")[rData$sex + 1] |
| df <- createDataFrame(rData) |
| expect_error( |
| rModel <- survival::survreg(survival::Surv(time, status) ~ x + sex2, rData), NA) |
| rCoefs <- as.numeric(summary(rModel)$table[, 1]) |
| model <- spark.survreg(df, Surv(time, status) ~ x + sex2) |
| coefs <- as.vector(summary(model)$coefficients[, 1]) |
| o <- order(rCoefs) |
| # stringIndexerOrderType = "frequencyDesc" produces different estimates from R |
| expect_false(all(abs(rCoefs[o] - coefs[o]) < 1e-4)) |
| |
| # stringIndexerOrderType = "alphabetDesc" produces the same estimates as R |
| model <- spark.survreg(df, Surv(time, status) ~ x + sex2, |
| stringIndexerOrderType = "alphabetDesc") |
| coefs <- as.vector(summary(model)$coefficients[, 1]) |
| expect_true(all(abs(rCoefs[o] - coefs[o]) < 1e-4)) |
| } |
| |
| test_that("spark.lm", { |
| df <- suppressWarnings(createDataFrame(iris)) |
| |
| model <- spark.lm( |
| df, Sepal_Width ~ ., |
| regParam = 0.01, maxIter = 10 |
| ) |
| |
| prediction1 <- predict(model, df) |
| expect_is(prediction1, "SparkDataFrame") |
| |
| # Test model save/load |
| if (windows_with_hadoop()) { |
| modelPath <- tempfile(pattern = "spark-lm", fileext = ".tmp") |
| write.ml(model, modelPath) |
| model2 <- read.ml(modelPath) |
| |
| expect_is(model2, "LinearRegressionModel") |
| expect_equal(summary(model), summary(model2)) |
| |
| prediction2 <- predict(model2, df) |
| expect_equal( |
| collect(prediction1), |
| collect(prediction2) |
| ) |
| unlink(modelPath) |
| } |
| }) |
| }) |
| |
| |
| test_that("spark.fmRegressor", { |
| df <- suppressWarnings(createDataFrame(iris)) |
| |
| model <- spark.fmRegressor( |
| df, Sepal_Width ~ ., |
| regParam = 0.01, maxIter = 10, fitLinear = TRUE |
| ) |
| |
| prediction1 <- predict(model, df) |
| expect_is(prediction1, "SparkDataFrame") |
| |
| # Test model save/load |
| if (windows_with_hadoop()) { |
| modelPath <- tempfile(pattern = "spark-fmregressor", fileext = ".tmp") |
| write.ml(model, modelPath) |
| model2 <- read.ml(modelPath) |
| |
| expect_is(model2, "FMRegressionModel") |
| expect_equal(summary(model), summary(model2)) |
| |
| prediction2 <- predict(model2, df) |
| expect_equal( |
| collect(prediction1), |
| collect(prediction2) |
| ) |
| unlink(modelPath) |
| } |
| }) |
| |
| sparkR.session.stop() |