|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | from pyspark import since | 
|  | from pyspark.ml.util import keyword_only | 
|  | from pyspark.ml.wrapper import JavaEstimator, JavaModel | 
|  | from pyspark.ml.param.shared import * | 
|  | from pyspark.mllib.common import inherit_doc | 
|  |  | 
|  |  | 
# Names exported by ``from pyspark.ml.recommendation import *``.
__all__ = ["ALS", "ALSModel"]
|  |  | 
|  |  | 
@inherit_doc
class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed):
    """
    Alternating Least Squares (ALS) matrix factorization.

    ALS attempts to estimate the ratings matrix `R` as the product of
    two lower-rank matrices, `X` and `Y`, i.e. `X * Yt = R`. Typically
    these approximations are called 'factor' matrices. The general
    approach is iterative. During each iteration, one of the factor
    matrices is held constant, while the other is solved for using least
    squares. The newly-solved factor matrix is then held constant while
    solving for the other factor matrix.

    This is a blocked implementation of the ALS factorization algorithm
    that groups the two sets of factors (referred to as "users" and
    "products") into blocks and reduces communication by only sending
    one copy of each user vector to each product block on each
    iteration, and only for the product blocks that need that user's
    feature vector. This is achieved by pre-computing some information
    about the ratings matrix to determine the "out-links" of each user
    (which blocks of products it will contribute to) and "in-link"
    information for each product (which of the feature vectors it
    receives from each user block it will depend on). This allows us to
    send only an array of feature vectors between each user block and
    product block, and have the product block find the users' ratings
    and update the products based on these messages.

    For implicit preference data, the algorithm used is based on
    "Collaborative Filtering for Implicit Feedback Datasets", available
    at `http://dx.doi.org/10.1109/ICDM.2008.22`, adapted for the blocked
    approach used here.

    Essentially, instead of finding the low-rank approximations to the
    rating matrix `R`, this finds the approximations for a preference
    matrix `P` where the elements of `P` are 1 if r > 0 and 0 if r <= 0.
    The ratings then act as 'confidence' values related to the strength
    of indicated user preferences rather than explicit ratings given to
    items.

    >>> df = sqlContext.createDataFrame(
    ...     [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
    ...     ["user", "item", "rating"])
    >>> als = ALS(rank=10, maxIter=5)
    >>> model = als.fit(df)
    >>> model.rank
    10
    >>> model.userFactors.orderBy("id").collect()
    [Row(id=0, features=[...]), Row(id=1, ...), Row(id=2, ...)]
    >>> test = sqlContext.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
    >>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
    >>> predictions[0]
    Row(user=0, item=2, prediction=-0.13807615637779236)
    >>> predictions[1]
    Row(user=1, item=0, prediction=2.6258413791656494)
    >>> predictions[2]
    Row(user=2, item=0, prediction=-1.5018409490585327)

    .. versionadded:: 1.4.0
    """

    # a placeholder to make it appear in the generated doc
    rank = Param(Params._dummy(), "rank", "rank of the factorization")
    numUserBlocks = Param(Params._dummy(), "numUserBlocks", "number of user blocks")
    numItemBlocks = Param(Params._dummy(), "numItemBlocks", "number of item blocks")
    implicitPrefs = Param(Params._dummy(), "implicitPrefs", "whether to use implicit preference")
    alpha = Param(Params._dummy(), "alpha", "alpha for implicit preference")
    userCol = Param(Params._dummy(), "userCol", "column name for user ids")
    itemCol = Param(Params._dummy(), "itemCol", "column name for item ids")
    ratingCol = Param(Params._dummy(), "ratingCol", "column name for ratings")
    nonnegative = Param(Params._dummy(), "nonnegative",
                        "whether to use nonnegative constraint for least squares")

    @keyword_only
    def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
                 implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                 ratingCol="rating", nonnegative=False, checkpointInterval=10):
        """
        __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                 implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                 ratingCol="rating", nonnegative=False, checkpointInterval=10)
        """
        super(ALS, self).__init__()
        # Companion JVM estimator; the Python-side params below mirror its params.
        self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
        # Per-instance Params, owned by `self` (the class-level ones are doc placeholders).
        self.rank = Param(self, "rank", "rank of the factorization")
        self.numUserBlocks = Param(self, "numUserBlocks", "number of user blocks")
        self.numItemBlocks = Param(self, "numItemBlocks", "number of item blocks")
        self.implicitPrefs = Param(self, "implicitPrefs", "whether to use implicit preference")
        self.alpha = Param(self, "alpha", "alpha for implicit preference")
        self.userCol = Param(self, "userCol", "column name for user ids")
        self.itemCol = Param(self, "itemCol", "column name for item ids")
        self.ratingCol = Param(self, "ratingCol", "column name for ratings")
        self.nonnegative = Param(self, "nonnegative",
                                 "whether to use nonnegative constraint for least squares")
        # NOTE(review): seed=None here replaces whatever default the HasSeed mixin
        # may have registered; left unchanged because the doctest predictions
        # depend on the seed actually used -- confirm this is intended.
        self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
                         implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                         ratingCol="rating", nonnegative=False, checkpointInterval=10)
        # keyword_only records only the kwargs the caller actually passed, so
        # unspecified params keep their defaults rather than being explicitly set.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("1.4.0")
    def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10):
        """
        setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                 implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                 ratingCol="rating", nonnegative=False, checkpointInterval=10)
        Sets params for ALS.
        """
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _create_model(self, java_model):
        """Wrap a fitted JVM model in its Python :py:class:`ALSModel` counterpart."""
        return ALSModel(java_model)

    @since("1.4.0")
    def setRank(self, value):
        """
        Sets the value of :py:attr:`rank`.
        """
        self._paramMap[self.rank] = value
        return self

    @since("1.4.0")
    def getRank(self):
        """
        Gets the value of :py:attr:`rank` or its default value.
        """
        return self.getOrDefault(self.rank)

    @since("1.4.0")
    def setNumUserBlocks(self, value):
        """
        Sets the value of :py:attr:`numUserBlocks`.
        """
        self._paramMap[self.numUserBlocks] = value
        return self

    @since("1.4.0")
    def getNumUserBlocks(self):
        """
        Gets the value of :py:attr:`numUserBlocks` or its default value.
        """
        return self.getOrDefault(self.numUserBlocks)

    @since("1.4.0")
    def setNumItemBlocks(self, value):
        """
        Sets the value of :py:attr:`numItemBlocks`.
        """
        self._paramMap[self.numItemBlocks] = value
        return self

    @since("1.4.0")
    def getNumItemBlocks(self):
        """
        Gets the value of :py:attr:`numItemBlocks` or its default value.
        """
        return self.getOrDefault(self.numItemBlocks)

    @since("1.4.0")
    def setNumBlocks(self, value):
        """
        Sets both :py:attr:`numUserBlocks` and :py:attr:`numItemBlocks` to the given value.
        Returns this estimator, like the other setters, so calls can be chained.
        """
        self._paramMap[self.numUserBlocks] = value
        self._paramMap[self.numItemBlocks] = value
        # Previously fell through and returned None, breaking fluent chaining.
        return self

    @since("1.4.0")
    def setImplicitPrefs(self, value):
        """
        Sets the value of :py:attr:`implicitPrefs`.
        """
        self._paramMap[self.implicitPrefs] = value
        return self

    @since("1.4.0")
    def getImplicitPrefs(self):
        """
        Gets the value of :py:attr:`implicitPrefs` or its default value.
        """
        return self.getOrDefault(self.implicitPrefs)

    @since("1.4.0")
    def setAlpha(self, value):
        """
        Sets the value of :py:attr:`alpha`.
        """
        self._paramMap[self.alpha] = value
        return self

    @since("1.4.0")
    def getAlpha(self):
        """
        Gets the value of :py:attr:`alpha` or its default value.
        """
        return self.getOrDefault(self.alpha)

    @since("1.4.0")
    def setUserCol(self, value):
        """
        Sets the value of :py:attr:`userCol`.
        """
        self._paramMap[self.userCol] = value
        return self

    @since("1.4.0")
    def getUserCol(self):
        """
        Gets the value of :py:attr:`userCol` or its default value.
        """
        return self.getOrDefault(self.userCol)

    @since("1.4.0")
    def setItemCol(self, value):
        """
        Sets the value of :py:attr:`itemCol`.
        """
        self._paramMap[self.itemCol] = value
        return self

    @since("1.4.0")
    def getItemCol(self):
        """
        Gets the value of :py:attr:`itemCol` or its default value.
        """
        return self.getOrDefault(self.itemCol)

    @since("1.4.0")
    def setRatingCol(self, value):
        """
        Sets the value of :py:attr:`ratingCol`.
        """
        self._paramMap[self.ratingCol] = value
        return self

    @since("1.4.0")
    def getRatingCol(self):
        """
        Gets the value of :py:attr:`ratingCol` or its default value.
        """
        return self.getOrDefault(self.ratingCol)

    @since("1.4.0")
    def setNonnegative(self, value):
        """
        Sets the value of :py:attr:`nonnegative`.
        """
        self._paramMap[self.nonnegative] = value
        return self

    @since("1.4.0")
    def getNonnegative(self):
        """
        Gets the value of :py:attr:`nonnegative` or its default value.
        """
        return self.getOrDefault(self.nonnegative)
|  |  | 
class ALSModel(JavaModel):
    """
    Model fitted by ALS: a thin Python view over the fitted JVM model,
    exposing its rank and its learned user/item factor tables.

    .. versionadded:: 1.4.0
    """

    @property
    @since("1.4.0")
    def rank(self):
        """rank of the factorization held by this model (the ALS `rank` param)"""
        fitted_rank = self._call_java("rank")
        return fitted_rank

    @property
    @since("1.4.0")
    def userFactors(self):
        """
        a DataFrame of the learned user factors, with the columns
        `id` and `features`
        """
        user_table = self._call_java("userFactors")
        return user_table

    @property
    @since("1.4.0")
    def itemFactors(self):
        """
        a DataFrame of the learned item factors, with the columns
        `id` and `features`
        """
        item_table = self._call_java("itemFactors")
        return item_table
|  |  | 
|  |  | 
if __name__ == "__main__":
    import doctest
    import sys
    from pyspark.context import SparkContext
    from pyspark.sql import SQLContext
    # Run this module's doctests against a local SparkContext. The doctests
    # reference `sqlContext` (and may reference `sc`) as globals, so both are
    # injected into the doctest namespace.
    globs = globals().copy()
    sc = SparkContext("local[2]", "ml.recommendation tests")
    sqlContext = SQLContext(sc)
    globs['sc'] = sc
    globs['sqlContext'] = sqlContext
    try:
        (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Release the SparkContext even if doctest itself raises.
        sc.stop()
    if failure_count:
        # sys.exit, not the site-module `exit` helper, which is meant for the
        # interactive shell and is missing when Python runs with -S.
        sys.exit(-1)