{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML\n",
"## Preprocessing\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"%matplotlib inline\n",
"\n",
"import os\n",
"import shutil\n",
"\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"\n",
"from breastcancer.preprocessing import preprocess, save, train_val_split\n",
"\n",
"# Ship a fresh copy of the `breastcancer` package to the Spark workers.\n",
"# Note: The zip must include the `breastcancer` directory itself,\n",
"# as well as all files within it for `addPyFile` to work correctly.\n",
"# This is equivalent to `zip -r breastcancer.zip breastcancer`.\n",
"dirname = \"breastcancer\"\n",
"zipname = dirname + \".zip\"\n",
"shutil.make_archive(dirname, 'zip', dirname + \"/..\", dirname)\n",
"spark.sparkContext.addPyFile(zipname)\n",
"\n",
"plt.rcParams['figure.figsize'] = (10, 6)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Execute Preprocessing & Save"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# TODO: Filtering tiles and then cutting into samples could result\n",
"# in samples with less tissue than desired, despite that being the\n",
"# procedure of the paper. Look into simply selecting tiles of the\n",
"# desired size to begin with."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Get list of image numbers, minus the broken ones.\n",
"broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}\n",
"slide_nums = sorted(set(range(1,501)) - broken)\n",
"\n",
"# Settings\n",
"training = True\n",
"tile_size = 256\n",
"sample_size = 256\n",
"grayscale = False\n",
"num_partitions = 20000\n",
"add_row_indices = True\n",
"train_frac = 0.8\n",
"split_seed = 24\n",
"folder = \"data\" # Linux-filesystem directory to read raw data\n",
"save_folder = \"data\" # Hadoop-supported directory in which to save DataFrames\n",
"df_path = os.path.join(save_folder, \"samples_{}_{}{}.parquet\".format(\n",
" \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"train_df_path = os.path.join(save_folder, \"train_{}{}.parquet\".format(sample_size,\n",
" \"_grayscale\" if grayscale else \"\"))\n",
"val_df_path = os.path.join(save_folder, \"val_{}{}.parquet\".format(sample_size,\n",
" \"_grayscale\" if grayscale else \"\"))\n",
"\n",
"df_path, train_df_path, val_df_path"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Process all slides.\n",
"df = preprocess(spark, slide_nums, tile_size=tile_size, sample_size=sample_size,\n",
" grayscale=grayscale, training=training, num_partitions=num_partitions,\n",
" folder=folder)"
]
},
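{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Optional sanity check -- a minimal sketch, assuming `preprocess`\n",
"# returns a Spark DataFrame of samples as used below. Inspect the\n",
"# schema before committing to the expensive save step.\n",
"df.printSchema()"
]
},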
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Save DataFrame of samples.\n",
"save(df, df_path, sample_size, grayscale)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Load full DataFrame from disk.\n",
"df = spark.read.load(df_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Split into train and validation DataFrames based On slide number\n",
"train, val = train_val_split(spark, df, slide_nums, folder, train_frac, add_row_indices,\n",
" seed=split_seed)"
]
},
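{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Optional sanity check -- a minimal sketch: confirm that both splits\n",
"# contain all three tumor score classes. Note that these aggregations\n",
"# trigger full scans, so they may be slow on the complete dataset.\n",
"train.groupBy(\"tumor_score\").count().show()\n",
"val.groupBy(\"tumor_score\").count().show()"
]
},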
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Save train and validation DataFrames.\n",
"save(train, train_df_path, sample_size, grayscale)\n",
"save(val, val_df_path, sample_size, grayscale)"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sample Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### TODO: Wrap this in a function with appropriate default arguments"
]
},
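{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below is a minimal sketch of the wrapper this TODO suggests, assuming the\n",
"column layout used throughout this notebook (`__INDEX`, `slide_num`,\n",
"`tumor_score`, `molecular_score`, `sample`). It is not part of the\n",
"`breastcancer` package, and the cells that follow still perform the\n",
"sampling inline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def sample_stratified(df, frac=0.01, seed=42):\n",
"    \"\"\"Stratified sample by tumor score, with row indices reassigned.\"\"\"\n",
"    cols = ['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']\n",
"    # Drop the old indices and sample each tumor score class at rate `frac`.\n",
"    sample = df.drop(\"__INDEX\").sampleBy(\"tumor_score\",\n",
"                                         fractions={1: frac, 2: frac, 3: frac},\n",
"                                         seed=seed)\n",
"    # Assign fresh, contiguous 1-based row indices.\n",
"    sample = (sample.rdd\n",
"              .zipWithIndex()\n",
"              .map(lambda r: (r[1] + 1, *r[0]))\n",
"              .toDF(cols))\n",
"    return sample.select(sample[\"__INDEX\"].astype(\"int\"),\n",
"                         sample.slide_num.astype(\"int\"),\n",
"                         sample.tumor_score.astype(\"int\"),\n",
"                         sample.molecular_score,\n",
"                         sample[\"sample\"])"
]
},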
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Load train and validation DataFrames from disk.\n",
"train = spark.read.load(train_df_path)\n",
"val = spark.read.load(val_df_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Take a stratified sample.\n",
"p=0.01\n",
"train_sample = train.drop(\"__INDEX\").sampleBy(\"tumor_score\", fractions={1: p, 2: p, 3: p}, seed=42)\n",
"val_sample = val.drop(\"__INDEX\").sampleBy(\"tumor_score\", fractions={1: p, 2: p, 3: p}, seed=42)\n",
"\n",
"train_sample, val_sample"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Reassign row indices.\n",
"# TODO: Wrap this in a function with appropriate default arguments.\n",
"train_sample = (\n",
" train_sample.rdd\n",
" .zipWithIndex()\n",
" .map(lambda r: (r[1] + 1, *r[0]))\n",
" .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
"train_sample = train_sample.select(train_sample[\"__INDEX\"].astype(\"int\"),\n",
" train_sample.slide_num.astype(\"int\"),\n",
" train_sample.tumor_score.astype(\"int\"),\n",
" train_sample.molecular_score,\n",
" train_sample[\"sample\"])\n",
"\n",
"val_sample = (\n",
" val_sample.rdd\n",
" .zipWithIndex()\n",
" .map(lambda r: (r[1] + 1, *r[0]))\n",
" .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
"val_sample = val_sample.select(val_sample[\"__INDEX\"].astype(\"int\"),\n",
" val_sample.slide_num.astype(\"int\"),\n",
" val_sample.tumor_score.astype(\"int\"),\n",
" val_sample.molecular_score,\n",
" val_sample[\"sample\"])\n",
"\n",
"train_sample, val_sample"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Save train and validation DataFrames.\n",
"tr_sample_filename = \"train_{}_sample_{}{}.parquet\".format(p, sample_size,\n",
" \"_grayscale\" if grayscale else \"\")\n",
"val_sample_filename = \"val_{}_sample_{}{}.parquet\".format(p, sample_size,\n",
" \"_grayscale\" if grayscale else \"\")\n",
"train_sample_path = os.path.join(save_folder, tr_sample_filename)\n",
"val_sample_path = os.path.join(save_folder, val_sample_filename)\n",
"save(train_sample, train_sample_path, sample_size, grayscale)\n",
"save(val_sample, val_sample_path, sample_size, grayscale)"
]
}
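,
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Optional read-back check -- a minimal sketch: reload the saved sample\n",
"# DataFrames to verify that the Parquet files written above are readable.\n",
"train_sample = spark.read.load(train_sample_path)\n",
"val_sample = spark.read.load(val_sample_path)\n",
"train_sample, val_sample"
]
}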
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3 + Spark 2.x + SystemML",
"language": "python",
"name": "pyspark3_2.x"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.1"
}
},
"nbformat": 4,
"nbformat_minor": 1
}