| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # mllib_clustering.R: Provides methods for MLlib clustering algorithm integration |
| |
| #' S4 class that represents a BisectingKMeansModel |
| #' |
| #' @param jobj a Java object reference to the backing Scala BisectingKMeansModel |
| #' @note BisectingKMeansModel since 2.2.0 |
| setClass("BisectingKMeansModel", representation(jobj = "jobj")) |
| |
| #' S4 class that represents a GaussianMixtureModel |
| #' |
| #' @param jobj a Java object reference to the backing Scala GaussianMixtureModel |
| #' @note GaussianMixtureModel since 2.1.0 |
| setClass("GaussianMixtureModel", representation(jobj = "jobj")) |
| |
| #' S4 class that represents a KMeansModel |
| #' |
| #' @param jobj a Java object reference to the backing Scala KMeansModel |
| #' @note KMeansModel since 2.0.0 |
| setClass("KMeansModel", representation(jobj = "jobj")) |
| |
| #' S4 class that represents an LDAModel |
| #' |
| #' @param jobj a Java object reference to the backing Scala LDAWrapper |
| #' @note LDAModel since 2.1.0 |
| setClass("LDAModel", representation(jobj = "jobj")) |
| |
| #' S4 class that represents a PowerIterationClustering |
| #' |
| #' @param jobj a Java object reference to the backing Scala PowerIterationClustering |
| #' @note PowerIterationClustering since 3.0.0 |
| setClass("PowerIterationClustering", slots = list(jobj = "jobj")) |
| |
| #' Bisecting K-Means Clustering Model |
| #' |
| #' Fits a bisecting k-means clustering model against a SparkDataFrame. |
| #' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make |
| #' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. |
| #' |
| #' @param data a SparkDataFrame for training. |
| #' @param formula a symbolic description of the model to be fitted. Currently only a few formula |
| #' operators are supported, including '~', '.', ':', '+', '-', '*', and '^'. |
| #' Note that the response variable of formula is empty in spark.bisectingKmeans. |
| #' @param k the desired number of leaf clusters. Must be > 1. |
| #' The actual number could be smaller if there are no divisible leaf clusters. |
| #' @param maxIter maximum iteration number. |
| #' @param seed the random seed. |
| #' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0) |
| #' or the minimum proportion of points (if less than 1.0) of a |
| #' divisible cluster. Note that it is an expert parameter. The |
| #' default value should be good enough for most cases. |
| #' @param ... additional argument(s) passed to the method. |
| #' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model. |
| #' @rdname spark.bisectingKmeans |
| #' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method |
| #' @name spark.bisectingKmeans |
| #' @examples |
| #' \dontrun{ |
| #' sparkR.session() |
| #' t <- as.data.frame(Titanic) |
| #' df <- createDataFrame(t) |
| #' model <- spark.bisectingKmeans(df, Class ~ Survived, k = 4) |
| #' summary(model) |
| #' |
| #' # get fitted result from a bisecting k-means model |
| #' fitted.model <- fitted(model, "centers") |
| #' showDF(fitted.model) |
| #' |
| #' # fitted values on training data |
| #' fitted <- predict(model, df) |
| #' head(select(fitted, "Class", "prediction")) |
| #' |
| #' # save fitted model to input path |
| #' path <- "path/to/model" |
| #' write.ml(model, path) |
| #' |
| #' # can also read back the saved model and print |
| #' savedModel <- read.ml(path) |
| #' summary(savedModel) |
| #' } |
| #' @note spark.bisectingKmeans since 2.2.0 |
| #' @seealso \link{predict}, \link{read.ml}, \link{write.ml} |
| setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"), |
| function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) { |
| formula <- paste0(deparse(formula), collapse = "") |
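| # Convert the seed to a string before the JVM call so that NULL (meaning "use no |
| # fixed seed") can be passed through; the Scala wrapper parses it when present. |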
| if (!is.null(seed)) { |
| seed <- as.character(as.integer(seed)) |
| } |
| jobj <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit", |
| data@sdf, formula, as.integer(k), as.integer(maxIter), |
| seed, as.numeric(minDivisibleClusterSize)) |
| new("BisectingKMeansModel", jobj = jobj) |
| }) |
| |
| # Get the summary of a bisecting k-means model |
| |
| #' @param object a fitted bisecting k-means model. |
| #' @return \code{summary} returns summary information of the fitted model, which is a list. |
| #' The list includes the model's \code{k} (number of cluster centers), |
| #' \code{coefficients} (model cluster centers), |
| #' \code{size} (number of data points in each cluster), \code{cluster} |
| #' (the cluster assignment of each training observation; cluster is NULL if is.loaded is TRUE), |
| #' and \code{is.loaded} (whether the model is loaded from a saved file). |
| #' @rdname spark.bisectingKmeans |
| #' @note summary(BisectingKMeansModel) since 2.2.0 |
| setMethod("summary", signature(object = "BisectingKMeansModel"), |
| function(object) { |
| jobj <- object@jobj |
| is.loaded <- callJMethod(jobj, "isLoaded") |
| features <- callJMethod(jobj, "features") |
| coefficients <- callJMethod(jobj, "coefficients") |
| k <- callJMethod(jobj, "k") |
| size <- callJMethod(jobj, "size") |
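| # The k cluster centers arrive as one flat vector, center by center; reshape them |
| # into a matrix with one row per center and one column per feature. |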
| coefficients <- t(matrix(coefficients, ncol = k)) |
| colnames(coefficients) <- unlist(features) |
| rownames(coefficients) <- 1:k |
| cluster <- if (is.loaded) { |
| NULL |
| } else { |
| dataFrame(callJMethod(jobj, "cluster")) |
| } |
| list(k = k, coefficients = coefficients, size = size, |
| cluster = cluster, is.loaded = is.loaded) |
| }) |
| |
| # Predicted values based on a bisecting k-means model |
| |
| #' @param newData a SparkDataFrame for testing. |
| #' @return \code{predict} returns the predicted values based on a bisecting k-means model. |
| #' @rdname spark.bisectingKmeans |
| #' @note predict(BisectingKMeansModel) since 2.2.0 |
| setMethod("predict", signature(object = "BisectingKMeansModel"), |
| function(object, newData) { |
| predict_internal(object, newData) |
| }) |
| |
| #' Get fitted result from a bisecting k-means model |
| #' |
| #' Get fitted result from a bisecting k-means model. |
| #' Note: A model loaded from a saved file does not support this method. |
| #' |
| #' @param method type of fitted results, \code{"centers"} for cluster centers |
| #' or \code{"classes"} for assigned classes. |
| #' @return \code{fitted} returns a SparkDataFrame containing fitted values. |
| #' @rdname spark.bisectingKmeans |
| #' @note fitted since 2.2.0 |
| setMethod("fitted", signature(object = "BisectingKMeansModel"), |
| function(object, method = c("centers", "classes")) { |
| method <- match.arg(method) |
| jobj <- object@jobj |
| is.loaded <- callJMethod(jobj, "isLoaded") |
| if (is.loaded) { |
| stop("Saved-loaded bisecting k-means model does not support 'fitted' method") |
| } else { |
| dataFrame(callJMethod(jobj, "fitted", method)) |
| } |
| }) |
| |
| # Save fitted MLlib model to the input path |
| |
| #' @param path the directory where the model is saved. |
| #' @param overwrite whether to overwrite the output path if it already exists. Default is FALSE, |
| #' which means an exception is thrown if the output path exists. |
| #' |
| #' @rdname spark.bisectingKmeans |
| #' @note write.ml(BisectingKMeansModel, character) since 2.2.0 |
| setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"), |
| function(object, path, overwrite = FALSE) { |
| write_internal(object, path, overwrite) |
| }) |
| |
| #' Multivariate Gaussian Mixture Model (GMM) |
| #' |
| #' Fits a multivariate Gaussian mixture model against a SparkDataFrame, similarly to R's |
| #' mvnormalmixEM(). Users can call \code{summary} to print a summary of the fitted model, |
| #' \code{predict} to make predictions on new data, and \code{write.ml}/\code{read.ml} |
| #' to save/load fitted models. |
| #' |
| #' @param data a SparkDataFrame for training. |
| #' @param formula a symbolic description of the model to be fitted. Currently only a few formula |
| #' operators are supported, including '~', '.', ':', '+', and '-'. |
| #' Note that the response variable of formula is empty in spark.gaussianMixture. |
| #' @param k number of independent Gaussians in the mixture model. |
| #' @param maxIter maximum iteration number. |
| #' @param tol the convergence tolerance. |
| #' @param ... additional arguments passed to the method. |
| #' @aliases spark.gaussianMixture,SparkDataFrame,formula-method |
| #' @return \code{spark.gaussianMixture} returns a fitted multivariate Gaussian mixture model. |
| #' @rdname spark.gaussianMixture |
| #' @name spark.gaussianMixture |
| #' @seealso mixtools: \url{https://cran.r-project.org/package=mixtools} |
| #' @examples |
| #' \dontrun{ |
| #' sparkR.session() |
| #' library(mvtnorm) |
| #' set.seed(100) |
| #' a <- rmvnorm(4, c(0, 0)) |
| #' b <- rmvnorm(6, c(3, 4)) |
| #' data <- rbind(a, b) |
| #' df <- createDataFrame(as.data.frame(data)) |
| #' model <- spark.gaussianMixture(df, ~ V1 + V2, k = 2) |
| #' summary(model) |
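| #' |
| #' # a sketch of inspecting the fitted mixture; field names follow summary() |
| #' gmmSummary <- summary(model) |
| #' gmmSummary$lambda  # mixing weights, one per component |
| #' gmmSummary$mu      # list of component mean vectors |
| #' gmmSummary$sigma   # list of component covariance matrices |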
| #' |
| #' # fitted values on training data |
| #' fitted <- predict(model, df) |
| #' head(select(fitted, "V1", "prediction")) |
| #' |
| #' # save fitted model to input path |
| #' path <- "path/to/model" |
| #' write.ml(model, path) |
| #' |
| #' # can also read back the saved model and print |
| #' savedModel <- read.ml(path) |
| #' summary(savedModel) |
| #' } |
| #' @note spark.gaussianMixture since 2.1.0 |
| #' @seealso \link{predict}, \link{read.ml}, \link{write.ml} |
| setMethod("spark.gaussianMixture", signature(data = "SparkDataFrame", formula = "formula"), |
| function(data, formula, k = 2, maxIter = 100, tol = 0.01) { |
| formula <- paste(deparse(formula), collapse = "") |
| jobj <- callJStatic("org.apache.spark.ml.r.GaussianMixtureWrapper", "fit", data@sdf, |
| formula, as.integer(k), as.integer(maxIter), as.numeric(tol)) |
| new("GaussianMixtureModel", jobj = jobj) |
| }) |
| |
| # Get the summary of a multivariate Gaussian mixture model |
| |
| #' @param object a fitted Gaussian mixture model. |
| #' @return \code{summary} returns summary of the fitted model, which is a list. |
| #' The list includes the model's \code{lambda} (mixing weights), \code{mu} (component means), |
| #' \code{sigma} (component covariance matrices), \code{loglik} (log likelihood of the fitted |
| #' model), and \code{posterior} (posterior probabilities of the training data; NULL if |
| #' is.loaded is TRUE). |
| #' @aliases summary,GaussianMixtureModel-method |
| #' @rdname spark.gaussianMixture |
| #' @note summary(GaussianMixtureModel) since 2.1.0 |
| setMethod("summary", signature(object = "GaussianMixtureModel"), |
| function(object) { |
| jobj <- object@jobj |
| is.loaded <- callJMethod(jobj, "isLoaded") |
| lambda <- unlist(callJMethod(jobj, "lambda")) |
| muList <- callJMethod(jobj, "mu") |
| sigmaList <- callJMethod(jobj, "sigma") |
| k <- callJMethod(jobj, "k") |
| dim <- callJMethod(jobj, "dim") |
| loglik <- callJMethod(jobj, "logLikelihood") |
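| # mu and sigma come back from the JVM flattened: muList holds the k mean vectors |
| # (dim values each) laid end to end, and sigmaList holds the k covariance matrices |
| # (dim * dim values each) in row-major order. Slice them back apart below. |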
| mu <- c() |
| for (i in 1 : k) { |
| start <- (i - 1) * dim + 1 |
| end <- i * dim |
| mu[[i]] <- unlist(muList[start : end]) |
| } |
| sigma <- c() |
| for (i in 1 : k) { |
| start <- (i - 1) * dim * dim + 1 |
| end <- i * dim * dim |
| sigma[[i]] <- t(matrix(sigmaList[start : end], ncol = dim)) |
| } |
| posterior <- if (is.loaded) { |
| NULL |
| } else { |
| dataFrame(callJMethod(jobj, "posterior")) |
| } |
| list(lambda = lambda, mu = mu, sigma = sigma, loglik = loglik, |
| posterior = posterior, is.loaded = is.loaded) |
| }) |
| |
| # Predicted values based on a Gaussian mixture model |
| |
| #' @param newData a SparkDataFrame for testing. |
| #' @return \code{predict} returns a SparkDataFrame containing predicted labels in a column named |
| #' "prediction". |
| #' @aliases predict,GaussianMixtureModel,SparkDataFrame-method |
| #' @rdname spark.gaussianMixture |
| #' @note predict(GaussianMixtureModel) since 2.1.0 |
| setMethod("predict", signature(object = "GaussianMixtureModel"), |
| function(object, newData) { |
| predict_internal(object, newData) |
| }) |
| |
| # Save fitted MLlib model to the input path |
| |
| #' @param path the directory where the model is saved. |
| #' @param overwrite whether to overwrite the output path if it already exists. Default is FALSE, |
| #' which means an exception is thrown if the output path exists. |
| #' |
| #' @aliases write.ml,GaussianMixtureModel,character-method |
| #' @rdname spark.gaussianMixture |
| #' @note write.ml(GaussianMixtureModel, character) since 2.1.0 |
| setMethod("write.ml", signature(object = "GaussianMixtureModel", path = "character"), |
| function(object, path, overwrite = FALSE) { |
| write_internal(object, path, overwrite) |
| }) |
| |
| #' K-Means Clustering Model |
| #' |
| #' Fits a k-means clustering model against a SparkDataFrame, similarly to R's kmeans(). |
| #' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make |
| #' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. |
| #' |
| #' @param data a SparkDataFrame for training. |
| #' @param formula a symbolic description of the model to be fitted. Currently only a few formula |
| #' operators are supported, including '~', '.', ':', '+', and '-'. |
| #' Note that the response variable of formula is empty in spark.kmeans. |
| #' @param k number of centers. |
| #' @param maxIter maximum iteration number. |
| #' @param initMode the initialization algorithm chosen to fit the model. |
| #' @param seed the random seed for cluster initialization. |
| #' @param initSteps the number of steps for the k-means|| initialization mode. |
| #' This is an advanced setting; the default of 2 is almost always sufficient. |
| #' Must be > 0. |
| #' @param tol convergence tolerance of iterations. |
| #' @param ... additional argument(s) passed to the method. |
| #' @return \code{spark.kmeans} returns a fitted k-means model. |
| #' @rdname spark.kmeans |
| #' @aliases spark.kmeans,SparkDataFrame,formula-method |
| #' @name spark.kmeans |
| #' @examples |
| #' \dontrun{ |
| #' sparkR.session() |
| #' t <- as.data.frame(Titanic) |
| #' df <- createDataFrame(t) |
| #' model <- spark.kmeans(df, Class ~ Survived, k = 4, initMode = "random") |
| #' summary(model) |
| #' |
| #' # fitted values on training data |
| #' fitted <- predict(model, df) |
| #' head(select(fitted, "Class", "prediction")) |
| #' |
| #' # save fitted model to input path |
| #' path <- "path/to/model" |
| #' write.ml(model, path) |
| #' |
| #' # can also read back the saved model and print |
| #' savedModel <- read.ml(path) |
| #' summary(savedModel) |
| #' } |
| #' @note spark.kmeans since 2.0.0 |
| #' @seealso \link{predict}, \link{read.ml}, \link{write.ml} |
| setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"), |
| function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random"), |
| seed = NULL, initSteps = 2, tol = 1E-4) { |
| formula <- paste(deparse(formula), collapse = "") |
| initMode <- match.arg(initMode) |
| if (!is.null(seed)) { |
| seed <- as.character(as.integer(seed)) |
| } |
| jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", data@sdf, formula, |
| as.integer(k), as.integer(maxIter), initMode, seed, |
| as.integer(initSteps), as.numeric(tol)) |
| new("KMeansModel", jobj = jobj) |
| }) |
| |
| # Get the summary of a k-means model |
| |
| #' @param object a fitted k-means model. |
| #' @return \code{summary} returns summary information of the fitted model, which is a list. |
| #' The list includes the model's \code{k} (the configured number of cluster centers), |
| #' \code{coefficients} (model cluster centers), |
| #' \code{size} (number of data points in each cluster), \code{cluster} |
| #' (the cluster assignment of each training observation; NULL if \code{is.loaded} is TRUE), |
| #' \code{is.loaded} (whether the model is loaded from a saved file), and \code{clusterSize} |
| #' (the actual number of cluster centers. When using initMode = "random", |
| #' \code{clusterSize} may not equal \code{k}). |
| #' @rdname spark.kmeans |
| #' @note summary(KMeansModel) since 2.0.0 |
| setMethod("summary", signature(object = "KMeansModel"), |
| function(object) { |
| jobj <- object@jobj |
| is.loaded <- callJMethod(jobj, "isLoaded") |
| features <- callJMethod(jobj, "features") |
| coefficients <- callJMethod(jobj, "coefficients") |
| k <- callJMethod(jobj, "k") |
| size <- callJMethod(jobj, "size") |
| clusterSize <- callJMethod(jobj, "clusterSize") |
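| # The centers arrive as one flat list, center by center; reshape into a matrix |
| # with one row per actual center (clusterSize may be smaller than k). |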
| coefficients <- t(matrix(unlist(coefficients), ncol = clusterSize)) |
| colnames(coefficients) <- unlist(features) |
| rownames(coefficients) <- 1:clusterSize |
| cluster <- if (is.loaded) { |
| NULL |
| } else { |
| dataFrame(callJMethod(jobj, "cluster")) |
| } |
| list(k = k, coefficients = coefficients, size = size, |
| cluster = cluster, is.loaded = is.loaded, clusterSize = clusterSize) |
| }) |
| |
| # Predicted values based on a k-means model |
| |
| #' @param newData a SparkDataFrame for testing. |
| #' @return \code{predict} returns the predicted values based on a k-means model. |
| #' @rdname spark.kmeans |
| #' @note predict(KMeansModel) since 2.0.0 |
| setMethod("predict", signature(object = "KMeansModel"), |
| function(object, newData) { |
| predict_internal(object, newData) |
| }) |
| |
| #' Get fitted result from a k-means model |
| #' |
| #' Get fitted result from a k-means model, similarly to R's fitted(). |
| #' Note: A model loaded from a saved file does not support this method. |
| #' |
| #' @param object a fitted k-means model. |
| #' @param method type of fitted results, \code{"centers"} for cluster centers |
| #' or \code{"classes"} for assigned classes. |
| #' @param ... additional argument(s) passed to the method. |
| #' @return \code{fitted} returns a SparkDataFrame containing fitted values. |
| #' @rdname fitted |
| #' @examples |
| #' \dontrun{ |
| #' model <- spark.kmeans(trainingData, ~ ., 2) |
| #' fitted.model <- fitted(model) |
| #' showDF(fitted.model) |
| #' } |
| #' @note fitted since 2.0.0 |
| setMethod("fitted", signature(object = "KMeansModel"), |
| function(object, method = c("centers", "classes")) { |
| method <- match.arg(method) |
| jobj <- object@jobj |
| is.loaded <- callJMethod(jobj, "isLoaded") |
| if (is.loaded) { |
| stop("Saved-loaded k-means model does not support 'fitted' method") |
| } else { |
| dataFrame(callJMethod(jobj, "fitted", method)) |
| } |
| }) |
| |
| # Save fitted MLlib model to the input path |
| |
| #' @param path the directory where the model is saved. |
| #' @param overwrite whether to overwrite the output path if it already exists. Default is FALSE, |
| #' which means an exception is thrown if the output path exists. |
| #' |
| #' @rdname spark.kmeans |
| #' @note write.ml(KMeansModel, character) since 2.0.0 |
| setMethod("write.ml", signature(object = "KMeansModel", path = "character"), |
| function(object, path, overwrite = FALSE) { |
| write_internal(object, path, overwrite) |
| }) |
| |
| #' Latent Dirichlet Allocation |
| #' |
| #' \code{spark.lda} fits a Latent Dirichlet Allocation model on a SparkDataFrame. Users can call |
| #' \code{summary} to get a summary of the fitted LDA model, \code{spark.posterior} to compute |
| #' posterior probabilities on new data, \code{spark.perplexity} to compute log perplexity on new |
| #' data, and \code{write.ml}/\code{read.ml} to save/load fitted models. |
| #' |
| #' @param data A SparkDataFrame for training. |
| #' @param features Features column name. Either a libSVM-format column or a character-format |
| #' column is valid. |
| #' @param k Number of topics. |
| #' @param maxIter Maximum iterations. |
| #' @param optimizer Optimizer to train an LDA model, "online" or "em", default is "online". |
| #' @param subsamplingRate (For online optimizer) Fraction of the corpus to be sampled and used in |
| #' each iteration of mini-batch gradient descent, in range (0, 1]. |
| #' @param topicConcentration concentration parameter (commonly named \code{beta} or \code{eta}) for |
| #' the prior placed on topic distributions over terms; the default of -1 lets Spark set it |
| #' automatically. Use \code{summary} to retrieve the effective topicConcentration. Only a |
| #' length-1 numeric is accepted. |
| #' @param docConcentration concentration parameter (commonly named \code{alpha}) for the |
| #' prior placed on document distributions over topics (\code{theta}); the default of -1 lets |
| #' Spark set it automatically. Use \code{summary} to retrieve the effective docConcentration. |
| #' Only a length-1 or length-\code{k} numeric is accepted. |
| #' @param customizedStopWords stopwords to be removed from the given corpus. This parameter is |
| #' ignored if a libSVM-format column is used as the features column. |
| #' @param maxVocabSize maximum vocabulary size, default 1 << 18 |
| #' @param ... additional argument(s) passed to the method. |
| #' @return \code{spark.lda} returns a fitted Latent Dirichlet Allocation model. |
| #' @rdname spark.lda |
| #' @aliases spark.lda,SparkDataFrame-method |
| #' @seealso topicmodels: \url{https://cran.r-project.org/package=topicmodels} |
| #' @examples |
| #' \dontrun{ |
| #' text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm") |
| #' model <- spark.lda(data = text, optimizer = "em") |
| #' |
| #' # get a summary of the model |
| #' summary(model) |
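| #' |
| #' # a sketch of reading individual fields returned by summary() |
| #' ldaSummary <- summary(model) |
| #' ldaSummary$topics          # top terms and their weights for each topic |
| #' ldaSummary$logPerplexity   # log perplexity of the training corpus |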
| #' |
| #' # compute posterior probabilities |
| #' posterior <- spark.posterior(model, text) |
| #' showDF(posterior) |
| #' |
| #' # compute perplexity |
| #' perplexity <- spark.perplexity(model, text) |
| #' |
| #' # save and load the model |
| #' path <- "path/to/model" |
| #' write.ml(model, path) |
| #' savedModel <- read.ml(path) |
| #' summary(savedModel) |
| #' } |
| #' @note spark.lda since 2.1.0 |
| setMethod("spark.lda", signature(data = "SparkDataFrame"), |
| function(data, features = "features", k = 10, maxIter = 20, optimizer = c("online", "em"), |
| subsamplingRate = 0.05, topicConcentration = -1, docConcentration = -1, |
| customizedStopWords = "", maxVocabSize = bitwShiftL(1, 18)) { |
| optimizer <- match.arg(optimizer) |
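| # docConcentration and customizedStopWords are wrapped with as.array() so that even |
| # length-1 values are serialized to the JVM as arrays, which the wrapper expects. |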
| jobj <- callJStatic("org.apache.spark.ml.r.LDAWrapper", "fit", data@sdf, features, |
| as.integer(k), as.integer(maxIter), optimizer, |
| as.numeric(subsamplingRate), topicConcentration, |
| as.array(docConcentration), as.array(customizedStopWords), |
| maxVocabSize) |
| new("LDAModel", jobj = jobj) |
| }) |
| |
| # Returns the summary of a Latent Dirichlet Allocation model produced by \code{spark.lda} |
| |
| #' @param object A Latent Dirichlet Allocation model fitted by \code{spark.lda}. |
| #' @param maxTermsPerTopic Maximum number of terms to collect for each topic. Default value of 10. |
| #' @return \code{summary} returns summary information of the fitted model, which is a list. |
| #' The list includes |
| #' \item{\code{docConcentration}}{concentration parameter commonly named \code{alpha} for |
| #' the prior placed on documents distributions over topics \code{theta}} |
| #' \item{\code{topicConcentration}}{concentration parameter commonly named \code{beta} or |
| #' \code{eta} for the prior placed on topic distributions over terms} |
| #' \item{\code{logLikelihood}}{log likelihood of the entire corpus} |
| #' \item{\code{logPerplexity}}{log perplexity} |
| #' \item{\code{isDistributed}}{TRUE for a distributed model, FALSE for a local model} |
| #' \item{\code{vocabSize}}{number of terms in the corpus} |
| #' \item{\code{topics}}{top 10 terms and their weights of all topics} |
| #' \item{\code{vocabulary}}{all terms in the training corpus; NULL if a libSVM-format file was |
| #' used as the training set} |
| #' \item{\code{trainingLogLikelihood}}{Log likelihood of the observed tokens in the |
| #' training set, given the current parameter estimates: |
| #' log P(docs | topics, topic distributions for docs, Dirichlet hyperparameters). |
| #' Only available for the distributed LDA model (i.e., optimizer = "em")} |
| #' \item{\code{logPrior}}{Log probability of the current parameter estimate: |
| #' log P(topics, topic distributions for docs | Dirichlet hyperparameters). |
| #' Only available for the distributed LDA model (i.e., optimizer = "em")} |
| #' @rdname spark.lda |
| #' @aliases summary,LDAModel-method |
| #' @note summary(LDAModel) since 2.1.0 |
| setMethod("summary", signature(object = "LDAModel"), |
| function(object, maxTermsPerTopic) { |
| maxTermsPerTopic <- as.integer(ifelse(missing(maxTermsPerTopic), 10, maxTermsPerTopic)) |
| jobj <- object@jobj |
| docConcentration <- callJMethod(jobj, "docConcentration") |
| topicConcentration <- callJMethod(jobj, "topicConcentration") |
| logLikelihood <- callJMethod(jobj, "logLikelihood") |
| logPerplexity <- callJMethod(jobj, "logPerplexity") |
| isDistributed <- callJMethod(jobj, "isDistributed") |
| vocabSize <- callJMethod(jobj, "vocabSize") |
| topics <- dataFrame(callJMethod(jobj, "topics", maxTermsPerTopic)) |
| vocabulary <- callJMethod(jobj, "vocabulary") |
| trainingLogLikelihood <- if (isDistributed) { |
| callJMethod(jobj, "trainingLogLikelihood") |
| } else { |
| NA |
| } |
| logPrior <- if (isDistributed) { |
| callJMethod(jobj, "logPrior") |
| } else { |
| NA |
| } |
| list(docConcentration = unlist(docConcentration), |
| topicConcentration = topicConcentration, |
| logLikelihood = logLikelihood, logPerplexity = logPerplexity, |
| isDistributed = isDistributed, vocabSize = vocabSize, |
| topics = topics, vocabulary = unlist(vocabulary), |
| trainingLogLikelihood = trainingLogLikelihood, logPrior = logPrior) |
| }) |
| |
| # Returns the log perplexity of a Latent Dirichlet Allocation model produced by \code{spark.lda} |
| |
| #' @return \code{spark.perplexity} returns the log perplexity of the given SparkDataFrame, or |
| #' the log perplexity of the training data if the argument \code{data} is missing. |
| #' @rdname spark.lda |
| #' @aliases spark.perplexity,LDAModel-method |
| #' @note spark.perplexity(LDAModel) since 2.1.0 |
| setMethod("spark.perplexity", signature(object = "LDAModel", data = "SparkDataFrame"), |
| function(object, data) { |
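| # With "data" missing, return the log perplexity of the training corpus; |
| # otherwise compute it for the supplied SparkDataFrame. |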
| ifelse(missing(data), callJMethod(object@jobj, "logPerplexity"), |
| callJMethod(object@jobj, "computeLogPerplexity", data@sdf)) |
| }) |
| |
| # Returns posterior probabilities from a Latent Dirichlet Allocation model produced by spark.lda() |
| |
| #' @param newData A SparkDataFrame for testing. |
| #' @return \code{spark.posterior} returns a SparkDataFrame containing posterior probabilities |
| #' vectors named "topicDistribution". |
| #' @rdname spark.lda |
| #' @aliases spark.posterior,LDAModel,SparkDataFrame-method |
| #' @note spark.posterior(LDAModel) since 2.1.0 |
| setMethod("spark.posterior", signature(object = "LDAModel", newData = "SparkDataFrame"), |
| function(object, newData) { |
| predict_internal(object, newData) |
| }) |
| |
| # Saves the Latent Dirichlet Allocation model to the input path. |
| |
| #' @param path The directory where the model is saved. |
| #' @param overwrite Whether to overwrite the output path if it already exists. Default is FALSE, |
| #' which means an exception is thrown if the output path exists. |
| #' |
| #' @rdname spark.lda |
| #' @aliases write.ml,LDAModel,character-method |
| #' @seealso \link{read.ml} |
| #' @note write.ml(LDAModel, character) since 2.1.0 |
| setMethod("write.ml", signature(object = "LDAModel", path = "character"), |
| function(object, path, overwrite = FALSE) { |
| write_internal(object, path, overwrite) |
| }) |
| |
| #' PowerIterationClustering |
| #' |
| #' A scalable graph clustering algorithm. Users can call \code{spark.assignClusters} to run |
| #' the PIC algorithm and return a cluster assignment for each input vertex. |
| #' |
| #' @param data a SparkDataFrame. |
| #' @param k the number of clusters to create. |
| #' @param initMode the initialization algorithm, either "random" or "degree". |
| #' @param maxIter the maximum number of iterations. |
| #' @param sourceCol the name of the input column for source vertex IDs. |
| #' @param destinationCol the name of the input column for destination vertex IDs. |
| #' @param weightCol weight column name. If this is not set or \code{NULL}, |
| #' we treat all instance weights as 1.0. |
| #' @param ... additional argument(s) passed to the method. |
| #' @return \code{spark.assignClusters} returns a SparkDataFrame that contains a column of vertex |
| #' IDs and the corresponding assigned cluster. Its schema is \code{id: integer}, |
| #' \code{cluster: integer}. |
| #' @rdname spark.powerIterationClustering |
| #' @aliases spark.assignClusters,SparkDataFrame-method |
| #' @examples |
| #' \dontrun{ |
| #' df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0), |
| #' list(1L, 2L, 1.0), list(3L, 4L, 1.0), |
| #' list(4L, 0L, 0.1)), |
| #' schema = c("src", "dst", "weight")) |
| #' clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight") |
| #' showDF(clusters) |
| #' } |
| #' @note spark.assignClusters(SparkDataFrame) since 3.0.0 |
| setMethod("spark.assignClusters", |
| signature(data = "SparkDataFrame"), |
| function(data, k = 2L, initMode = c("random", "degree"), maxIter = 20L, |
| sourceCol = "src", destinationCol = "dst", weightCol = NULL) { |
| if (!is.integer(k) || k < 1) { |
| stop("k should be a number with value >= 1.") |
| } |
| if (!is.integer(maxIter) || maxIter <= 0) { |
| stop("maxIter should be a number with value > 0.") |
| } |
| initMode <- match.arg(initMode) |
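| # Treat an empty string the same as NULL: no weight column, so every instance |
| # weight defaults to 1.0 on the JVM side. |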
| if (!is.null(weightCol) && weightCol == "") { |
| weightCol <- NULL |
| } else if (!is.null(weightCol)) { |
| weightCol <- as.character(weightCol) |
| } |
| jobj <- callJStatic("org.apache.spark.ml.r.PowerIterationClusteringWrapper", |
| "getPowerIterationClustering", |
| as.integer(k), initMode, |
| as.integer(maxIter), as.character(sourceCol), |
| as.character(destinationCol), weightCol) |
| object <- new("PowerIterationClustering", jobj = jobj) |
| dataFrame(callJMethod(object@jobj, "assignClusters", data@sdf)) |
| }) |