{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML\n",
"\n",
"## Machine Learning\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"%matplotlib inline\n",
"\n",
"import os\n",
"\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"from pyspark.sql.functions import col, max\n",
"import systemml # pip3 install systemml\n",
"from systemml import MLContext, dml\n",
"\n",
"plt.rcParams['figure.figsize'] = (10, 6)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ml = MLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Read in train & val data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Settings\n",
"size=256\n",
"grayscale = False\n",
"c = 1 if grayscale else 3\n",
"p = 0.01\n",
"folder = \"data\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if p < 1:\n",
" tr_filename = os.path.join(folder, \"train_{}_sample_{}{}.parquet\".format(p, size, \"_grayscale\" if grayscale else \"\"))\n",
" val_filename = os.path.join(folder, \"val_{}_sample_{}{}.parquet\".format(p, size, \"_grayscale\" if grayscale else \"\"))\n",
"else:\n",
" tr_filename = os.path.join(folder, \"train_{}{}.parquet\".format(size, \"_grayscale\" if grayscale else \"\"))\n",
" val_filename = os.path.join(folder, \"val_{}{}.parquet\".format(size, \"_grayscale\" if grayscale else \"\"))\n",
"train_df = spark.read.load(tr_filename)\n",
"val_df = spark.read.load(val_filename)\n",
"train_df, val_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tc = train_df.count()\n",
"vc = val_df.count()\n",
"tc, vc, tc + vc"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"train_df.select(max(col(\"__INDEX\"))).show()\n",
"train_df.groupBy(\"tumor_score\").count().show()\n",
"val_df.groupBy(\"tumor_score\").count().show()"
]
},
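{
"cell_type": "markdown",
"metadata": {},
"source": [
"The class counts above can also be plotted as a quick visual check of the class balance. The sketch below is optional and assumes `tumor_score` holds the integer proliferation scores; it only re-aggregates the counts already shown."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: quick visual check of the training class balance.\n",
"# (Assumes `tumor_score` holds integer class labels.)\n",
"counts = (train_df.groupBy(\"tumor_score\").count()\n",
"                  .orderBy(\"tumor_score\")\n",
"                  .collect())\n",
"labels = [row[\"tumor_score\"] for row in counts]\n",
"values = [row[\"count\"] for row in counts]\n",
"plt.bar(labels, values)\n",
"plt.xlabel(\"tumor_score\")\n",
"plt.ylabel(\"count\")\n",
"plt.title(\"Training class distribution\")"
]
},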
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Extract X and Y matrices"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Note: Must use the row index column, or X may not\n",
"# necessarily correspond correctly to Y\n",
"X_df = train_df.select(\"__INDEX\", \"sample\")\n",
"X_val_df = val_df.select(\"__INDEX\", \"sample\")\n",
"y_df = train_df.select(\"__INDEX\", \"tumor_score\")\n",
"y_val_df = val_df.select(\"__INDEX\", \"tumor_score\")\n",
"X_df, X_val_df, y_df, y_val_df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Convert to SystemML Matrices\n",
"Note: This allows for reuse of the matrices on multiple\n",
"subsequent script invocations with only a single\n",
"conversion. Additionally, since the underlying RDDs\n",
"backing the SystemML matrices are maintained, any\n",
"caching will also be maintained."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"# Scale images to [-1,1]\n",
"X = X / 255\n",
"X_val = X_val / 255\n",
"X = X * 2 - 1\n",
"X_val = X_val * 2 - 1\n",
"\n",
"# One-hot encode the labels\n",
"num_tumor_classes = 3\n",
"n = nrow(y)\n",
"n_val = nrow(y_val)\n",
"Y = table(seq(1, n), y, n, num_tumor_classes)\n",
"Y_val = table(seq(1, n_val), y_val, n_val, num_tumor_classes)\n",
"\"\"\"\n",
"outputs = (\"X\", \"X_val\", \"Y\", \"Y_val\")\n",
"script = dml(script).input(X=X_df, X_val=X_val_df, y=y_df, y_val=y_val_df).output(*outputs)\n",
"X, X_val, Y, Y_val = ml.execute(script).get(*outputs)\n",
"X, X_val, Y, Y_val"
]
},
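{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional sanity check on the conversion, and a small illustration of the reuse described above, the converted matrices can be passed directly back into another DML script without re-converting the DataFrames. This sketch only prints the matrix dimensions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: reuse the converted SystemML matrices in a\n",
"# second script invocation; the DataFrame-to-matrix conversion is not repeated.\n",
"script = \"\"\"\n",
"print(\"X: \" + nrow(X) + \" x \" + ncol(X))\n",
"print(\"Y: \" + nrow(Y) + \" x \" + ncol(Y))\n",
"print(\"X_val: \" + nrow(X_val) + \" x \" + ncol(X_val))\n",
"print(\"Y_val: \" + nrow(Y_val) + \" x \" + ncol(Y_val))\n",
"\"\"\"\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val)\n",
"ml.execute(script)"
]
},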
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Trigger Caching (Optional)\n",
"Note: This will take a while and is not necessary, but doing it\n",
"once will speed up the training below. Otherwise, the cost of\n",
"caching will be spread across the first full loop through the\n",
"data during training."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# script = \"\"\"\n",
"# # Trigger conversions and caching\n",
"# # Note: This may take a while, but will enable faster iteration later\n",
"# print(sum(X))\n",
"# print(sum(Y))\n",
"# print(sum(X_val))\n",
"# print(sum(Y_val))\n",
"# \"\"\"\n",
"# script = dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val)\n",
"# ml.execute(script)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Save Matrices (Optional)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# script = \"\"\"\n",
"# write(X, \"data/X_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# write(Y, \"data/Y_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# write(X_val, \"data/X_val_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# write(Y_val, \"data/Y_val_\"+p+\"_sample_binary\", format=\"binary\")\n",
"# \"\"\"\n",
"# script = dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val, p=p)\n",
"# ml.execute(script)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Softmax Classifier"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sanity Check: Overfit Small Portion"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/softmax_clf.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 1e-2 # learning rate\n",
"mu = 0.9 # momentum\n",
"decay = 0.999 # learning rate decay constant\n",
"batch_size = 32\n",
"epochs = 500\n",
"log_interval = 1\n",
"n = 200 # sample size for overfitting sanity check\n",
"\n",
"# Train\n",
"[W, b] = clf::train(X[1:n,], Y[1:n,], X[1:n,], Y[1:n,], lr, mu, decay, batch_size, epochs, log_interval)\n",
"\"\"\"\n",
"outputs = (\"W\", \"b\")\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val).output(*outputs)\n",
"W, b = ml.execute(script).get(*outputs)\n",
"W, b"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/softmax_clf.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 5e-7 # learning rate\n",
"mu = 0.5 # momentum\n",
"decay = 0.999 # learning rate decay constant\n",
"batch_size = 32\n",
"epochs = 1\n",
"log_interval = 10\n",
"\n",
"# Train\n",
"[W, b] = clf::train(X, Y, X_val, Y_val, lr, mu, decay, batch_size, epochs, log_interval)\n",
"\"\"\"\n",
"outputs = (\"W\", \"b\")\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val).output(*outputs)\n",
"W, b = ml.execute(script).get(*outputs)\n",
"W, b"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Eval"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/softmax_clf.dml\") as clf\n",
"\n",
"# Eval\n",
"probs = clf::predict(X, W, b)\n",
"[loss, accuracy] = clf::eval(probs, Y)\n",
"probs_val = clf::predict(X_val, W, b)\n",
"[loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\"\"\"\n",
"outputs = (\"loss\", \"accuracy\", \"loss_val\", \"accuracy_val\")\n",
"script = dml(script).input(X=X, Y=Y, X_val=X_val, Y_val=Y_val, W=W, b=b).output(*outputs)\n",
"loss, acc, loss_val, acc_val = ml.execute(script).get(*outputs)\n",
"loss, acc, loss_val, acc_val"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# LeNet-like ConvNet"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sanity Check: Overfit Small Portion"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/convnet.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 1e-2 # learning rate\n",
"mu = 0.9 # momentum\n",
"decay = 0.999 # learning rate decay constant\n",
"lambda = 0 #5e-04\n",
"batch_size = 32\n",
"epochs = 300\n",
"log_interval = 1\n",
"dir = \"models/lenet-cnn/sanity/\"\n",
"n = 200 # sample size for overfitting sanity check\n",
"\n",
"# Train\n",
"[Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] = clf::train(X[1:n,], Y[1:n,], X[1:n,], Y[1:n,], C, Hin, Win, lr, mu, decay, lambda, batch_size, epochs, log_interval, dir)\n",
"\"\"\"\n",
"outputs = (\"Wc1\", \"bc1\", \"Wc2\", \"bc2\", \"Wc3\", \"bc3\", \"Wa1\", \"ba1\", \"Wa2\", \"ba2\")\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val,\n",
" C=c, Hin=size, Win=size)\n",
" .output(*outputs))\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2 = ml.execute(script).get(*outputs)\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Hyperparameter Search"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/convnet.dml\") as clf\n",
"\n",
"dir = \"models/lenet-cnn/hyperparam-search/\"\n",
"\n",
"# TODO: Fix `parfor` so that it can be efficiently used for hyperparameter tuning\n",
"j = 1\n",
"while(j < 2) {\n",
"#parfor(j in 1:10000, par=6) {\n",
" # Hyperparameter Sampling & Settings\n",
" lr = 10 ^ as.scalar(rand(rows=1, cols=1, min=-7, max=-1)) # learning rate\n",
" mu = as.scalar(rand(rows=1, cols=1, min=0.5, max=0.9)) # momentum\n",
" decay = as.scalar(rand(rows=1, cols=1, min=0.9, max=1)) # learning rate decay constant\n",
" lambda = 10 ^ as.scalar(rand(rows=1, cols=1, min=-7, max=-1)) # regularization constant\n",
" batch_size = 32\n",
" epochs = 1\n",
" log_interval = 10\n",
" trial_dir = dir + \"j/\"\n",
"\n",
" # Train\n",
" [Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] = clf::train(X, Y, X_val, Y_val, C, Hin, Win, lr, mu, decay, lambda, batch_size, epochs, log_interval, trial_dir)\n",
"\n",
" # Eval\n",
" #probs = clf::predict(X, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
" #[loss, accuracy] = clf::eval(probs, Y)\n",
" probs_val = clf::predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
" [loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\n",
" # Save hyperparams\n",
" str = \"lr: \" + lr + \", mu: \" + mu + \", decay: \" + decay + \", lambda: \" + lambda + \", batch_size: \" + batch_size\n",
" name = dir + accuracy_val + \",\" + j #+\",\"+accuracy+\",\"+j\n",
" write(str, name)\n",
" j = j + 1\n",
"}\n",
"\"\"\"\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val, C=c, Hin=size, Win=size))\n",
"ml.execute(script)"
]
},
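{
"cell_type": "markdown",
"metadata": {},
"source": [
"The DML above samples the learning rate and regularization constant log-uniformly (10^u with u drawn uniformly from [-7, -1]) and the momentum uniformly from [0.5, 0.9]. Below is a minimal NumPy sketch of the same sampling scheme, just for intuition; the printed values are illustrative draws, not tuned hyperparameters."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Log-uniform sampling, mirroring the DML `10 ^ rand(min=-7, max=-1)` above.\n",
"rng = np.random.RandomState(42)  # fixed seed, only for a reproducible illustration\n",
"lrs = 10 ** rng.uniform(-7, -1, size=5)      # learning rates\n",
"lambdas = 10 ** rng.uniform(-7, -1, size=5)  # regularization constants\n",
"mus = rng.uniform(0.5, 0.9, size=5)          # momentum values\n",
"list(zip(lrs, mus, lambdas))"
]
},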
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ml.setStatistics(True)\n",
"ml.setExplain(True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# sc.setLogLevel(\"OFF\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true,
"scrolled": false
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/convnet_distrib_sgd.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 0.00205 # learning rate\n",
"mu = 0.632 # momentum\n",
"decay = 0.99 # learning rate decay constant\n",
"lambda = 0.00385\n",
"batch_size = 1\n",
"parallel_batches = 19\n",
"epochs = 1\n",
"log_interval = 1\n",
"dir = \"models/lenet-cnn/train/\"\n",
"n = 50 #1216 # limit on number of samples (for debugging)\n",
"X = X[1:n,]\n",
"Y = Y[1:n,]\n",
"X_val = X_val[1:n,]\n",
"Y_val = Y_val[1:n,]\n",
"\n",
"# Train\n",
"[Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] =\n",
" clf::train(X, Y, X_val, Y_val, C, Hin, Win, lr, mu, decay,\n",
" lambda, batch_size, parallel_batches, epochs,\n",
" log_interval, dir)\n",
"\"\"\"\n",
"outputs = (\"Wc1\", \"bc1\", \"Wc2\", \"bc2\", \"Wc3\", \"bc3\",\n",
" \"Wa1\", \"ba1\", \"Wa2\", \"ba2\")\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val,\n",
" C=c, Hin=size, Win=size)\n",
" .output(*outputs))\n",
"outs = ml.execute(script).get(*outputs)\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2 = outs\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/convnet_distrib_sgd.dml\") as clf\n",
"\n",
"# Hyperparameters & Settings\n",
"lr = 0.00205 # learning rate\n",
"mu = 0.632 # momentum\n",
"decay = 0.99 # learning rate decay constant\n",
"lambda = 0.00385\n",
"batch_size = 1\n",
"parallel_batches = 19\n",
"epochs = 1\n",
"log_interval = 1\n",
"dir = \"models/lenet-cnn/train/\"\n",
"\n",
"# Dummy data\n",
"[X, Y, C, Hin, Win] = clf::generate_dummy_data(50) #1216)\n",
"[X_val, Y_val, C, Hin, Win] = clf::generate_dummy_data(100)\n",
"\n",
"# Train\n",
"[Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2] =\n",
" clf::train(X, Y, X_val, Y_val, C, Hin, Win, lr, mu, decay,\n",
" lambda, batch_size, parallel_batches, epochs,\n",
" log_interval, dir)\n",
"\"\"\"\n",
"outputs = (\"Wc1\", \"bc1\", \"Wc2\", \"bc2\", \"Wc3\", \"bc3\",\n",
" \"Wa1\", \"ba1\", \"Wa2\", \"ba2\")\n",
"script = dml(script).output(*outputs)\n",
"outs = ml.execute(script).get(*outputs)\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2 = outs\n",
"Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Eval"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/convnet_distrib_sgd.dml\") as clf\n",
"\n",
"# Eval\n",
"probs = clf::predict(X, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
"[loss, accuracy] = clf::eval(probs, Y)\n",
"probs_val = clf::predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
"[loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\"\"\"\n",
"outputs = (\"loss\", \"accuracy\", \"loss_val\", \"accuracy_val\")\n",
"script = (dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val,\n",
" C=c, Hin=size, Win=size,\n",
" Wc1=Wc1, bc1=bc1,\n",
" Wc2=Wc2, bc2=bc2,\n",
" Wc3=Wc3, bc3=bc3,\n",
" Wa1=Wa1, ba1=ba1,\n",
" Wa2=Wa2, ba2=ba2)\n",
" .output(*outputs))\n",
"loss, acc, loss_val, acc_val = ml.execute(script).get(*outputs)\n",
"loss, acc, loss_val, acc_val"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"script = \"\"\"\n",
"source(\"breastcancer/convnet_distrib_sgd.dml\") as clf\n",
"\n",
"# Dummy data\n",
"[X, Y, C, Hin, Win] = clf::generate_dummy_data(1216)\n",
"[X_val, Y_val, C, Hin, Win] = clf::generate_dummy_data(100)\n",
"\n",
"# Eval\n",
"probs = clf::predict(X, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
"[loss, accuracy] = clf::eval(probs, Y)\n",
"probs_val = clf::predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)\n",
"[loss_val, accuracy_val] = clf::eval(probs_val, Y_val)\n",
"\"\"\"\n",
"outputs = (\"loss\", \"accuracy\", \"loss_val\", \"accuracy_val\")\n",
"script = (dml(script).input(Wc1=Wc1, bc1=bc1,\n",
" Wc2=Wc2, bc2=bc2,\n",
" Wc3=Wc3, bc3=bc3,\n",
" Wa1=Wa1, ba1=ba1,\n",
" Wa2=Wa2, ba2=ba2)\n",
" .output(*outputs))\n",
"loss, acc, loss_val, acc_val = ml.execute(script).get(*outputs)\n",
"loss, acc, loss_val, acc_val"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"---"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# script = \"\"\"\n",
"# N = 102400 # num examples\n",
"# C = 3 # num input channels\n",
"# Hin = 256 # input height\n",
"# Win = 256 # input width\n",
"# X = rand(rows=N, cols=C*Hin*Win, pdf=\"normal\")\n",
"# \"\"\"\n",
"# outputs = \"X\"\n",
"# script = dml(script).output(*outputs)\n",
"# thisX = ml.execute(script).get(*outputs)\n",
"# thisX"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# script = \"\"\"\n",
"# f = function(matrix[double] X) return(matrix[double] Y) {\n",
"# while(FALSE){}\n",
"# a = as.scalar(rand(rows=1, cols=1))\n",
"# Y = X * a\n",
"# }\n",
"# Y = f(X)\n",
"# \"\"\"\n",
"# outputs = \"Y\"\n",
"# script = dml(script).input(X=thisX).output(*outputs)\n",
"# thisY = ml.execute(script).get(*outputs)\n",
"# thisY"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3 + Spark 2.x + SystemML",
"language": "python",
"name": "pyspark3_2.x"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.1"
}
},
"nbformat": 4,
"nbformat_minor": 1
}