blob: 84e2e6697375c8881a3516b79cbc9a1f383095c9 [file] [log] [blame]
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# SystemML PySpark Recommendation Demo\n",
"\n",
"This demonstrates using SystemML for product recommendationg using Poisson NonNegative Matrix Factorization (PNMF) with PNMF algorithm written using R like syntax.\n",
"This includes following steps:\n",
" 1. Installation of SystemML library.\n",
" 2. Download and load the data.\n",
" 3. Write PNMF algorithm using R-like syntax and run it using SystemML and show losses.\n",
" 4. Uninstall SystemML library."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### This notebook is supported with SystemML 0.14.0 and above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"!pip show systemml"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"%matplotlib inline\n",
"\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from systemml import MLContext, dml # pip install systeml\n",
"plt.rcParams['figure.figsize'] = (10, 6)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%sh\n",
"# Download dataset\n",
"curl -O http://snap.stanford.edu/data/amazon0601.txt.gz\n",
"gunzip amazon0601.txt.gz"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Load data\n",
"import pyspark.sql.functions as F\n",
"dataPath = \"amazon0601.txt\"\n",
"\n",
"X_train = (sc.textFile(dataPath)\n",
" .filter(lambda l: not l.startswith(\"#\"))\n",
" .map(lambda l: l.split(\"\\t\"))\n",
" .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0))\n",
" .toDF((\"prod_i\", \"prod_j\", \"x_ij\"))\n",
" .filter(\"prod_i < 500 AND prod_j < 500\") # Filter for memory constraints\n",
" .cache())\n",
"\n",
"max_prod_i = X_train.select(F.max(\"prod_i\")).first()[0]\n",
"max_prod_j = X_train.select(F.max(\"prod_j\")).first()[0]\n",
"numProducts = max(max_prod_i, max_prod_j) + 1 # 0-based indexing\n",
"print(\"Total number of products: {}\".format(numProducts))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# SystemML - Poisson Nonnegative Matrix Factorization (PNMF)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Create SystemML MLContext\n",
"ml = MLContext(sc)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Define PNMF kernel in SystemML's DSL using the R-like syntax for PNMF\n",
"pnmf = \"\"\"\n",
"# data & args\n",
"X = X+1 # change product IDs to be 1-based, rather than 0-based\n",
"V = table(X[,1], X[,2])\n",
"size = ifdef($size, -1)\n",
"if(size > -1) {\n",
" V = V[1:size,1:size]\n",
"}\n",
"\n",
"n = nrow(V)\n",
"m = ncol(V)\n",
"range = 0.01\n",
"W = Rand(rows=n, cols=rank, min=0, max=range, pdf=\"uniform\")\n",
"H = Rand(rows=rank, cols=m, min=0, max=range, pdf=\"uniform\")\n",
"losses = matrix(0, rows=max_iter, cols=1)\n",
"\n",
"# run PNMF\n",
"i=1\n",
"while(i <= max_iter) {\n",
" # update params\n",
" H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W)) \n",
" W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H))\n",
" \n",
" # compute loss\n",
" losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H)))\n",
" i = i + 1;\n",
"}\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Run the PNMF script on SystemML with Spark\n",
"script = dml(pnmf).input(X=X_train, max_iter=100, rank=10).output(\"W\", \"H\", \"losses\")\n",
"W, H, losses = ml.execute(script).get(\"W\", \"H\", \"losses\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Plot training loss over time\n",
"xy = losses.toDF().sort(\"__INDEX\").rdd.map(lambda r: (r[0], r[1])).collect()\n",
"x, y = zip(*xy)\n",
"plt.plot(x, y)\n",
"plt.xlabel('Iteration')\n",
"plt.ylabel('Loss')\n",
"plt.title('PNMF Training Loss')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 1
}