| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import array |
| import sys |
| from collections import namedtuple |
| |
| from pyspark import SparkContext, since |
| from pyspark.rdd import RDD |
| from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc |
| from pyspark.mllib.util import JavaLoader, JavaSaveable |
| from pyspark.sql import DataFrame |
| |
# Public names exported by ``from pyspark.mllib.recommendation import *``.
__all__ = [
    "MatrixFactorizationModel",
    "ALS",
    "Rating",
]
| |
| |
class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    """
    Represents a (user, product, rating) tuple.

    >>> r = Rating(1, 2, 5.0)
    >>> (r.user, r.product, r.rating)
    (1, 2, 5.0)
    >>> (r[0], r[1], r[2])
    (1, 2, 5.0)

    .. versionadded:: 1.2.0
    """

    def __reduce__(self):
        # Pickle as a plain ``Rating`` whose fields are coerced to builtin
        # int/int/float (presumably so non-builtin numeric values, e.g. from
        # other libraries, serialize as standard Python numbers).
        user_id = int(self.user)
        product_id = int(self.product)
        score = float(self.rating)
        return Rating, (user_id, product_id, score)
| |
| |
@inherit_doc
class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """A matrix factorisation model trained by regularized alternating
    least-squares.

    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> ratings = sc.parallelize([r1, r2, r3])
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> model.predict(2, 2)
    0.4...

    >>> testset = sc.parallelize([(1, 2), (1, 1)])
    >>> model = ALS.train(ratings, 2, seed=0)
    >>> model.predictAll(testset).collect()
    [Rating(user=1, product=1, rating=1.0...), Rating(user=1, product=2, rating=1.9...)]

    >>> model = ALS.train(ratings, 4, seed=10)
    >>> model.userFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]

    >>> model.recommendUsers(1, 2)
    [Rating(user=2, product=1, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.recommendProducts(1, 2)
    [Rating(user=1, product=2, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.rank
    4

    >>> first_user = model.userFeatures().take(1)[0]
    >>> latents = first_user[1]
    >>> len(latents)
    4

    >>> model.productFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]

    >>> first_product = model.productFeatures().take(1)[0]
    >>> latents = first_product[1]
    >>> len(latents)
    4

    >>> products_for_users = model.recommendProductsForUsers(1).collect()
    >>> len(products_for_users)
    2
    >>> products_for_users[0]
    (1, (Rating(user=1, product=2, rating=...),))

    >>> users_for_products = model.recommendUsersForProducts(1).collect()
    >>> len(users_for_products)
    2
    >>> users_for_products[0]
    (1, (Rating(user=2, product=1, rating=...),))

    >>> model = ALS.train(ratings, 1, nonnegative=True, seed=10)
    >>> model.predict(2, 2)
    3.73...

    >>> df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
    >>> model = ALS.train(df, 1, nonnegative=True, seed=10)
    >>> model.predict(2, 2)
    3.73...

    >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
    >>> model.predict(2, 2)
    0.4...

    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = MatrixFactorizationModel.load(sc, path)
    >>> sameModel.predict(2, 2)
    0.4...
    >>> sameModel.predictAll(testset).collect()
    [Rating(...
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 0.9.0
    """
    @since("0.9.0")
    def predict(self, user, product):
        """
        Predicts rating for the given user and product.

        Both ids are converted with ``int()`` before being passed to the
        underlying Java model.
        """
        return self._java_model.predict(int(user), int(product))

    @since("0.9.0")
    def predictAll(self, user_product):
        """
        Returns an RDD of predicted ratings (:class:`Rating` objects) for
        input user and product pairs.

        :param user_product:
          RDD of (user, product) pairs. Each id is converted with ``int()``
          before prediction.
        """
        # NOTE(review): validation relies on `assert`, so it is skipped under
        # `python -O`; kept to preserve the AssertionError callers see today.
        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
        # Only the first element is inspected (`first()` presumably runs a
        # Spark job); later malformed elements are not checked here.
        first = user_product.first()
        assert len(first) == 2, "user_product should be RDD of (user, product)"
        user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
        return self.call("predict", user_product)

    @since("1.2.0")
    def userFeatures(self):
        """
        Returns a paired RDD, where the first element is the user and the
        second is an array of features corresponding to that user.
        """
        # Feature vectors are materialized as double-precision `array.array`s.
        return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.2.0")
    def productFeatures(self):
        """
        Returns a paired RDD, where the first element is the product and the
        second is an array of features corresponding to that product.
        """
        # Feature vectors are materialized as double-precision `array.array`s.
        return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.4.0")
    def recommendUsers(self, product, num):
        """
        Recommends the top "num" number of users for a given product and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.
        """
        return list(self.call("recommendUsers", product, num))

    @since("1.4.0")
    def recommendProducts(self, user, num):
        """
        Recommends the top "num" number of products for a given user and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.
        """
        return list(self.call("recommendProducts", user, num))

    @since("1.4.0")
    def recommendProductsForUsers(self, num):
        """
        Recommends the top "num" number of products for all users. The
        number of recommendations returned per user may be less than "num".

        Returns an RDD of (user, tuple of Rating) pairs.
        """
        return self.call("wrappedRecommendProductsForUsers", num)

    @since("1.4.0")
    def recommendUsersForProducts(self, num):
        """
        Recommends the top "num" number of users for all products. The
        number of recommendations returned per product may be less than
        "num".

        Returns an RDD of (product, tuple of Rating) pairs.
        """
        return self.call("wrappedRecommendUsersForProducts", num)

    @property
    @since("1.4.0")
    def rank(self):
        """Rank for the features in this model"""
        return self.call("rank")

    @classmethod
    @since("1.3.1")
    def load(cls, sc, path):
        """Load a model from the given path"""
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.org.apache.spark.mllib.api.python.MatrixFactorizationModelWrapper(model)
        # Build through ``cls`` so a subclass calling ``load`` gets an
        # instance of its own type rather than the base class.
        return cls(wrapper)
| |
| |
class ALS(object):
    """Alternating Least Squares matrix factorization

    .. versionadded:: 0.9.0
    """

    @classmethod
    def _prepare(cls, ratings):
        """
        Normalize ``ratings`` into an RDD of :class:`Rating`.

        Accepts an RDD, or a DataFrame (converted through ``.rdd``). Only the
        first record is inspected: if it is already a :class:`Rating` the RDD
        is returned unchanged; if it is a tuple/list every record is mapped
        through ``Rating(*x)``.

        :raises TypeError: if ``ratings`` is neither an RDD nor a DataFrame,
            or its first record is not a Rating or a 3-element tuple/list.
        """
        if isinstance(ratings, RDD):
            pass
        elif isinstance(ratings, DataFrame):
            ratings = ratings.rdd
        else:
            raise TypeError("Ratings should be represented by either an RDD or a DataFrame, "
                            "but got %s." % type(ratings))
        first = ratings.first()
        # Rating is itself a tuple subclass, so it must be tested first.
        if isinstance(first, Rating):
            return ratings
        if not isinstance(first, (tuple, list)):
            raise TypeError("Expect a Rating or a tuple/list, but got %s." % type(first))
        # Fail fast with a clear message instead of a lazy `Rating(*x)` arity
        # error surfacing from the executors during training.
        if len(first) != 3:
            raise TypeError("Expect a Rating or a tuple/list of (user, product, rating), "
                            "but got %d fields." % len(first))
        return ratings.map(lambda x: Rating(*x))

    @classmethod
    @since("0.9.0")
    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False,
              seed=None):
        """
        Train a matrix factorization model given an RDD of ratings by users
        for a subset of products. The ratings matrix is approximated as the
        product of two lower-rank matrices of a given rank (number of
        features). To solve for these features, ALS is run iteratively with
        a configurable level of parallelism.

        :param ratings:
          RDD or DataFrame of `Rating` or (userID, productID, rating) tuple.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter.
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        """
        model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations,
                              lambda_, blocks, nonnegative, seed)
        return MatrixFactorizationModel(model)

    @classmethod
    @since("0.9.0")
    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01,
                      nonnegative=False, seed=None):
        """
        Train a matrix factorization model given an RDD of 'implicit
        preferences' of users for a subset of products. The ratings matrix
        is approximated as the product of two lower-rank matrices of a
        given rank (number of features). To solve for these features, ALS
        is run iteratively with a configurable level of parallelism.

        :param ratings:
          RDD or DataFrame of `Rating` or (userID, productID, rating) tuple.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter.
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param alpha:
          A constant used in computing confidence.
          (default: 0.01)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        """
        model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank,
                              iterations, lambda_, blocks, alpha, nonnegative, seed)
        return MatrixFactorizationModel(model)
| |
| |
def _test():
    """Run this module's doctests against a local SparkContext.

    Exits the process with status -1 if any doctest fails.
    """
    import doctest
    import pyspark.mllib.recommendation
    from pyspark.sql import SQLContext
    globs = pyspark.mllib.recommendation.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    # Always stop the context, even if doctest setup or execution raises,
    # so the local Spark instance is not leaked.
    try:
        globs['sc'] = sc
        globs['sqlContext'] = SQLContext(sc)
        failure_count, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        sc.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()