| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| This is an example implementation of ALS for learning how to use Spark. Please refer to |
| ALS in pyspark.mllib.recommendation for more conventional use. |
| |
| This example requires numpy (http://www.numpy.org/) |
| """ |
| from __future__ import print_function |
| |
| import sys |
| |
| import numpy as np |
| from numpy.random import rand |
| from numpy import matrix |
| from pyspark import SparkContext |
| |
# Weight of the L2 (ridge) penalty that update() adds to the diagonal of
# the normal-equation matrix.
LAMBDA = 1e-2
# Seed NumPy's global generator so the synthetic ratings and the initial
# factor matrices drawn with rand() in the driver are reproducible.
np.random.seed(seed=42)
| |
| |
def rmse(R, ms, us):
    """Return the root-mean-square error between ``R`` and ``ms * us.T``.

    The squared error is averaged over every entry of ``R``. The entry count
    is taken from the residual itself, not from the script's module-level
    ``M``/``U`` values. Those exist only when the file runs as ``__main__``,
    so the old formula raised NameError when the function was imported.

    :param R: the ratings matrix being approximated.
    :param ms: left factor matrix; ``ms`` times ``us.T`` must have R's shape.
    :param us: right factor matrix.
    :return: the RMSE as a NumPy floating-point scalar.
    """
    # For numpy.matrix inputs, np.dot is exactly what ``*`` computes.
    # For plain 2-D ndarrays it is still the matrix product, whereas ``*``
    # would silently be elementwise.
    diff = R - np.dot(ms, us.T)
    return np.sqrt(np.sum(np.power(diff, 2)) / diff.size)
| |
| |
def update(i, vec, mat, ratings, lam=None):
    """Solve the ridge-regularized least-squares problem for row ``i``.

    With the factor matrix ``mat`` held fixed, this returns the vector ``x``
    solving the normal equations

        (mat.T * mat + lam * n * I) x = mat.T * ratings[i, :].T

    where ``n = mat.shape[0]`` and ``I`` is the ``F x F`` identity with
    ``F = mat.shape[1]``.

    :param i: index of the row of ``ratings`` to fit.
    :param vec: the current factor vector for row ``i``. It is not used by
        this solver and is accepted only to keep the driver's call shape.
    :param mat: the fixed factor matrix (``numpy.matrix``), one row per
        column of ``ratings``.
    :param ratings: the ratings matrix (``numpy.matrix``).
    :param lam: regularization weight. ``None`` (the default) uses the
        module-level ``LAMBDA``.
    :return: the solution as an ``(F, 1)`` column.
    """
    if lam is None:
        lam = LAMBDA
    n_rows, n_factors = mat.shape

    # Add the penalty (scaled by the row count of ``mat``) to the diagonal
    # in one vectorized step instead of a Python loop over the factors.
    XtX = mat.T * mat + (lam * n_rows) * np.eye(n_factors)
    Xty = mat.T * ratings[i, :].T

    return np.linalg.solve(XtX, Xty)
| |
| |
if __name__ == "__main__":

    # Usage: als [M] [U] [F] [iterations] [partitions]
    #
    # All arguments are optional positional integers:
    #   M           rows of the synthetic ratings matrix        (default 100)
    #   U           columns of the synthetic ratings matrix     (default 500)
    #   F           rank of the factorization                   (default 10)
    #   iterations  number of alternating sweeps                (default 5)
    #   partitions  partitions used for each parallelize() call (default 2)

    print("""WARN: This is a naive implementation of ALS and is given as an
example. Please use the ALS method found in pyspark.mllib.recommendation for more
conventional use.""", file=sys.stderr)

    # Parse arguments before starting Spark, so a malformed value fails fast
    # instead of leaving a SparkContext running.
    M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    U = int(sys.argv[2]) if len(sys.argv) > 2 else 500
    F = int(sys.argv[3]) if len(sys.argv) > 3 else 10
    ITERATIONS = int(sys.argv[4]) if len(sys.argv) > 4 else 5
    partitions = int(sys.argv[5]) if len(sys.argv) > 5 else 2

    sc = SparkContext(appName="PythonALS")
    try:
        print("Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" %
              (M, U, F, ITERATIONS, partitions))

        # Synthetic rank-F ratings matrix R (M x U) plus random initial factors.
        R = matrix(rand(M, F)) * matrix(rand(U, F).T)
        ms = matrix(rand(M, F))
        us = matrix(rand(U, F))

        Rb = sc.broadcast(R)
        msb = sc.broadcast(ms)
        usb = sc.broadcast(us)

        for i in range(ITERATIONS):
            # Re-solve every row of ms with us held fixed.
            ms = sc.parallelize(range(M), partitions) \
                .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
                .collect()
            # Each update() result is an (F, 1) column, so the collected list
            # stacks into an (M, F, 1) array; drop the trailing axis to get
            # the (M, F) factor matrix.
            ms = matrix(np.array(ms)[:, :, 0])
            # Replace the broadcast, then unpersist the superseded one so the
            # executors can drop their cached copy now.
            stale_msb, msb = msb, sc.broadcast(ms)
            stale_msb.unpersist()

            # Re-solve every row of us with the new ms held fixed (uses R.T).
            us = sc.parallelize(range(U), partitions) \
                .map(lambda x: update(x, usb.value[x, :], msb.value, Rb.value.T)) \
                .collect()
            us = matrix(np.array(us)[:, :, 0])
            stale_usb, usb = usb, sc.broadcast(us)
            stale_usb.unpersist()

            error = rmse(R, ms, us)
            print("Iteration %d:" % i)
            print("\nRMSE: %5.4f\n" % error)
    finally:
        # Always release the SparkContext, even if an iteration fails.
        sc.stop()