| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# SystemML PySpark Recommendation Demo\n", |
| "\n", |
| "This notebook demonstrates product recommendation with SystemML using Poisson Nonnegative Matrix Factorization (PNMF), with the PNMF algorithm written in SystemML's R-like syntax.\n", |
| "It includes the following steps:\n", |
| " 1. Installation of SystemML library.\n", |
| " 2. Download and load the data.\n", |
| " 3. Write PNMF algorithm using R-like syntax and run it using SystemML and show losses.\n", |
| " 4. Uninstall SystemML library." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### This notebook is supported with SystemML 0.14.0 and above." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "!pip show systemml" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "%load_ext autoreload\n", |
| "%autoreload 2\n", |
| "%matplotlib inline\n", |
| "\n", |
| "import numpy as np\n", |
| "import matplotlib.pyplot as plt\n", |
| "from systemml import MLContext, dml # pip install systemml\n", |
| "plt.rcParams['figure.figsize'] = (10, 6)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Data" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "%%sh\n", |
| "# Download and extract the Amazon co-purchasing dataset (SNAP amazon0601).\n", |
| "# Stop at the first failing command so a bad download is never extracted.\n", |
| "set -e\n", |
| "# -f: fail on HTTP errors instead of saving an error page; -L: follow redirects.\n", |
| "curl -fLO http://snap.stanford.edu/data/amazon0601.txt.gz\n", |
| "# -f: overwrite a previously extracted amazon0601.txt so the cell can be re-run.\n", |
| "gunzip -f amazon0601.txt.gz" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "# Load data\n", |
| "import pyspark.sql.functions as F\n", |
| "dataPath = \"amazon0601.txt\"\n", |
| "\n", |
| "X_train = (sc.textFile(dataPath)\n", |
| " .filter(lambda l: not l.startswith(\"#\"))\n", |
| " .map(lambda l: l.split(\"\\t\"))\n", |
| " .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0))\n", |
| " .toDF((\"prod_i\", \"prod_j\", \"x_ij\"))\n", |
| " .filter(\"prod_i < 500 AND prod_j < 500\") # Filter for memory constraints\n", |
| " .cache())\n", |
| "\n", |
| "max_prod_i = X_train.select(F.max(\"prod_i\")).first()[0]\n", |
| "max_prod_j = X_train.select(F.max(\"prod_j\")).first()[0]\n", |
| "numProducts = max(max_prod_i, max_prod_j) + 1 # 0-based indexing\n", |
| "print(\"Total number of products: {}\".format(numProducts))" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# SystemML - Poisson Nonnegative Matrix Factorization (PNMF)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "# Create SystemML MLContext\n", |
| "ml = MLContext(sc)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "# Define PNMF kernel in SystemML's DSL using the R-like syntax for PNMF\n", |
| "pnmf = \"\"\"\n", |
| "# data & args\n", |
| "X = X+1 # change product IDs to be 1-based, rather than 0-based\n", |
| "V = table(X[,1], X[,2])\n", |
| "size = ifdef($size, -1)\n", |
| "if(size > -1) {\n", |
| " V = V[1:size,1:size]\n", |
| "}\n", |
| "\n", |
| "n = nrow(V)\n", |
| "m = ncol(V)\n", |
| "range = 0.01\n", |
| "W = Rand(rows=n, cols=rank, min=0, max=range, pdf=\"uniform\")\n", |
| "H = Rand(rows=rank, cols=m, min=0, max=range, pdf=\"uniform\")\n", |
| "losses = matrix(0, rows=max_iter, cols=1)\n", |
| "\n", |
| "# run PNMF\n", |
| "i=1\n", |
| "while(i <= max_iter) {\n", |
| " # update params\n", |
| " H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W)) \n", |
| " W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H))\n", |
| " \n", |
| " # compute loss\n", |
| " losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H)))\n", |
| " i = i + 1;\n", |
| "}\n", |
| "\"\"\"" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "# Run the PNMF script on SystemML with Spark\n", |
| "script = dml(pnmf).input(X=X_train, max_iter=100, rank=10).output(\"W\", \"H\", \"losses\")\n", |
| "W, H, losses = ml.execute(script).get(\"W\", \"H\", \"losses\")" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "# Plot training loss over time\n", |
| "xy = losses.toDF().sort(\"__INDEX\").rdd.map(lambda r: (r[0], r[1])).collect()\n", |
| "x, y = zip(*xy)\n", |
| "plt.plot(x, y)\n", |
| "plt.xlabel('Iteration')\n", |
| "plt.ylabel('Loss')\n", |
| "plt.title('PNMF Training Loss')" |
| ] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 2", |
| "language": "python", |
| "name": "python2" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 2 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython2", |
| "version": "2.7.13" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 1 |
| } |