blob: 0ac880ca9fe4898334f057af89980fe4e06f6399 [file] [log] [blame]
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML\n",
"\n",
"## Machine Learning\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"%matplotlib inline\n",
"\n",
"import os\n",
"\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"from pyspark.sql.functions import col, max\n",
"import systemml # pip3 install systemml\n",
"from systemml import MLContext, dml\n",
"\n",
"plt.rcParams['figure.figsize'] = (10, 6)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"ml = MLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Read in train & val data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# Settings\n",
"size=256\n",
"grayscale = False\n",
"c = 1 if grayscale else 3\n",
"p = 0.01\n",
"folder = \"data\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"if p < 1:\n",
" tr_filename = os.path.join(folder, \"train_{}_sample_{}{}.parquet\".format(p, size, \"_grayscale\" if grayscale else \"\"))\n",
" val_filename = os.path.join(folder, \"val_{}_sample_{}{}.parquet\".format(p, size, \"_grayscale\" if grayscale else \"\"))\n",
"else:\n",
" tr_filename = os.path.join(folder, \"train_{}{}.parquet\".format(size, \"_grayscale\" if grayscale else \"\"))\n",
" val_filename = os.path.join(folder, \"val_{}{}.parquet\".format(size, \"_grayscale\" if grayscale else \"\"))\n",
"train_df = spark.read.load(tr_filename)\n",
"val_df = spark.read.load(val_filename)\n",
"train_df, val_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tc = train_df.count()\n",
"vc = val_df.count()\n",
"tc, vc, tc + vc"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"train_df.select(max(col(\"__INDEX\"))).show()\n",
"train_df.groupBy(\"tumor_score\").count().show()\n",
"val_df.groupBy(\"tumor_score\").count().show()"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Extract X and Y matrices"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# Note: Must use the row index column, or X may not\n",
"# necessarily correspond correctly to Y\n",
"X_df = train_df.select(\"__INDEX\", \"sample\")\n",
"X_val_df = val_df.select(\"__INDEX\", \"sample\")\n",
"y_df = train_df.select(\"__INDEX\", \"tumor_score\")\n",
"y_val_df = val_df.select(\"__INDEX\", \"tumor_score\")\n",
"X_df, X_val_df, y_df, y_val_df"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Convert to SystemML Matrices\n",
"Note: This allows for reuse of the matrices on multiple\n",
"subsequent script invocations with only a single\n",
"conversion. Additionally, since the underlying RDDs\n",
"backing the SystemML matrices are maintained, any\n",
"caching will also be maintained."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"# Scale images to [-1,1]\n",
"X = X / 255\n",
"X_val = X_val / 255\n",
"X = X * 2 - 1\n",
"X_val = X_val * 2 - 1\n",
"\n",
"# One-hot encode the labels\n",
"num_tumor_classes = 3\n",
"n = nrow(y)\n",
"n_val = nrow(y_val)\n",
"Y = table(seq(1, n), y, n, num_tumor_classes)\n",
"Y_val = table(seq(1, n_val), y_val, n_val, num_tumor_classes)\n",
"\"\"\"\n",
"outputs = (\"X\", \"X_val\", \"Y\", \"Y_val\")\n",
"script = dml(script).input(X=X_df, X_val=X_val_df, y=y_df, y_val=y_val_df).output(*outputs)\n",
"X, X_val, Y, Y_val = ml.execute(script).get(*outputs)\n",
"X, X_val, Y, Y_val"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Trigger Caching (Optional)\n",
"Note: This will take a while and is not necessary, but doing it\n",
"once will speed up the training below. Otherwise, the cost of\n",
"caching will be spread across the first full loop through the\n",
"data during training."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# script = \"\"\"\n",
"# # Trigger conversions and caching\n",
"# # Note: This may take a while, but will enable faster iteration later\n",
"# print(sum(X))\n",
"# print(sum(Y))\n",
"# print(sum(X_val))\n",
"# print(sum(Y_val))\n",
"# \"\"\"\n",
"# script = dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val)\n",
"# ml.execute(script)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Save Matrices (Optional)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# script = \"\"\"\n",
"# write(X, \"data/X_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# write(Y, \"data/Y_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# write(X_val, \"data/X_val_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# write(Y_val, \"data/Y_val_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# \"\"\"\n",
"# script = dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val, p=p)\n",
"# ml.execute(script)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Softmax Classifier"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Sanity Check: Overfit Small Portion"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"softmax_clf.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 1e-2 # learning rate\n",
"mu = 0.9 # momentum\n",
"decay = 0.999 # learning rate decay constant\n",
"batch_size = 32\n",
"epochs = 500\n",
"log_interval = 1\n",
"n = 200 # sample size for overfitting sanity check\n",
"\n",
"# Train\n",
"[W, b] = clf::train(X[1:n,], Y[1:n,], X[1:n,], Y[1:n,], lr, mu, decay, batch_size, epochs, log_interval)\n",
"\"\"\"\n",
"outputs = (\"W\", \"b\")\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val).output(*outputs)\n",
"W, b = ml.execute(script).get(*outputs)\n",
"W, b"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Train"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"softmax_clf.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 5e-7 # learning rate\n",
"mu = 0.5 # momentum\n",
"decay = 0.999 # learning rate decay constant\n",
"batch_size = 32\n",
"epochs = 1\n",
"log_interval = 10\n",
"\n",
"# Train\n",
"[W, b] = clf::train(X, Y, X_val, Y_val, lr, mu, decay, batch_size, epochs, log_interval)\n",
"\"\"\"\n",
"outputs = (\"W\", \"b\")\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val).output(*outputs)\n",
"W, b = ml.execute(script).get(*outputs)\n",
"W, b"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Eval"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"softmax_clf.dml\") as clf\n",
"\n",
"# Eval\n",
"probs = clf::predict(X, W, b)\n",
"[loss, accuracy] = clf::eval(probs, Y)\n",
"probs_val = clf::predict(X_val, W, b)\n",
"[loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\"\"\"\n",
"outputs = (\"loss\", \"accuracy\", \"loss_val\", \"accuracy_val\")\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val, W=W, b=b).output(*outputs)\n",
"loss, acc, loss_val, acc_val = ml.execute(script).get(*outputs)\n",
"loss, acc, loss_val, acc_val"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"source": [
"# LeNet-like ConvNet"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Sanity Check: Overfit Small Portion"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"convnet.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 1e-2 # learning rate\n",
"mu = 0.9 # momentum\n",
"decay = 0.999 # learning rate decay constant\n",
"lambda = 0 #5e-04\n",
"batch_size = 32\n",
"epochs = 300\n",
"log_interval = 1\n",
"dir = \"models/lenet-cnn/sanity/\"\n",
"n = 200 # sample size for overfitting sanity check\n",
"\n",
"# Train\n",
"[Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] = clf::train(X[1:n,], Y[1:n,], X[1:n,], Y[1:n,], C, Hin, Win, lr, mu, decay, lambda, batch_size, epochs, log_interval, dir)\n",
"\"\"\"\n",
"outputs = (\"Wc1\", \"bc1\", \"Wc2\", \"bc2\", \"Wc3\", \"bc3\", \"Wa1\", \"ba1\", \"Wa2\", \"ba2\")\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val,\n",
" C=c, Hin=size, Win=size)\n",
" .output(*outputs))\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2 = ml.execute(script).get(*outputs)\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Hyperparameter Search"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"convnet.dml\") as clf\n",
"\n",
"dir = \"models/lenet-cnn/hyperparam-search/\"\n",
"\n",
"# TODO: Fix `parfor` so that it can be efficiently used for hyperparameter tuning\n",
"# NOTE: with j starting at 1 and the `j < 2` bound, this loop currently runs a single trial.\n",
"j = 1\n",
"while(j < 2) {\n",
"#parfor(j in 1:10000, par=6) {\n",
" # Hyperparameter Sampling & Settings\n",
" lr = 10 ^ as.scalar(rand(rows=1, cols=1, min=-7, max=-1)) # learning rate\n",
" mu = as.scalar(rand(rows=1, cols=1, min=0.5, max=0.9)) # momentum\n",
" decay = as.scalar(rand(rows=1, cols=1, min=0.9, max=1)) # learning rate decay constant\n",
" lambda = 10 ^ as.scalar(rand(rows=1, cols=1, min=-7, max=-1)) # regularization constant\n",
" batch_size = 32\n",
" epochs = 1\n",
" log_interval = 10\n",
" # Per-trial output directory passed to clf::train, keyed by the trial number j\n",
" # (previously the literal \"j/\", which made every trial share one directory)\n",
" trial_dir = dir + j + \"/\"\n",
"\n",
" # Train\n",
" [Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] = clf::train(X, Y, X_val, Y_val, C, Hin, Win, lr, mu, decay, lambda, batch_size, epochs, log_interval, trial_dir)\n",
"\n",
" # Eval\n",
" #probs = clf::predict(X, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
" #[loss, accuracy] = clf::eval(probs, Y)\n",
" probs_val = clf::predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
" [loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\n",
" # Save hyperparams: the sampled settings are written to a file named \"<accuracy_val>,<j>\" under dir\n",
" str = \"lr: \" + lr + \", mu: \" + mu + \", decay: \" + decay + \", lambda: \" + lambda + \", batch_size: \" + batch_size\n",
" name = dir + accuracy_val + \",\" + j #+\",\"+accuracy+\",\"+j\n",
" write(str, name)\n",
" j = j + 1\n",
"}\n",
"\"\"\"\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val, C=c, Hin=size, Win=size))\n",
"ml.execute(script)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Train"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"convnet.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 0.00205 # learning rate\n",
"mu = 0.632 # momentum\n",
"decay = 0.99 # learning rate decay constant\n",
"lambda = 0.00385\n",
"batch_size = 32\n",
"epochs = 1\n",
"log_interval = 10\n",
"dir = \"models/lenet-cnn/train/\"\n",
"\n",
"# Train\n",
"[Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] =\n",
" clf::train(X, Y, X_val, Y_val, C, Hin, Win, lr, mu, decay,\n",
" lambda, batch_size, epochs, log_interval, dir)\n",
"\"\"\"\n",
"outputs = (\"Wc1\", \"bc1\", \"Wc2\", \"bc2\", \"Wc3\", \"bc3\",\n",
" \"Wa1\", \"ba1\", \"Wa2\", \"ba2\")\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val,\n",
" C=c, Hin=size, Win=size)\n",
" .output(*outputs))\n",
"outs = ml.execute(script).get(*outputs)\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2 = outs"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Eval"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"convnet.dml\") as clf\n",
"\n",
"# Eval\n",
"probs = clf::predict(X, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
"[loss, accuracy] = clf::eval(probs, Y)\n",
"probs_val = clf::predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
"[loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\"\"\"\n",
"outputs = (\"loss\", \"accuracy\", \"loss_val\", \"accuracy_val\")\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val,\n",
" C=c, Hin=size, Win=size,\n",
" Wc1=Wc1, bc1=bc1,\n",
" Wc2=Wc2, bc2=bc2,\n",
" Wc3=Wc3, bc3=bc3,\n",
" Wa1=Wa1, ba1=ba1,\n",
" Wa2=Wa2, ba2=ba2)\n",
" .output(*outputs))\n",
"loss, acc, loss_val, acc_val = ml.execute(script).get(*outputs)\n",
"loss, acc, loss_val, acc_val"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 + Spark 2.x + SystemML",
"language": "python",
"name": "pyspark3_2.x"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 1
}