blob: d60e622714f45299c550c6f3d9b552a9d48b5a6b [file] [log] [blame]
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"# Imports"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"%matplotlib inline\n",
"import math\n",
"import os\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"from PIL import Image\n",
"import tensorflow as tf\n",
"import pyspark.sql.functions as F\n",
"from breastcancer import input_data\n",
"plt.rcParams['figure.figsize'] = (10, 6)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# from pyspark.sql import SparkSession\n",
"# spark = (SparkSession.builder.appName(\"KerasResNet50\").getOrCreate())"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Settings"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"size = 256\n",
"channels = 3\n",
"features = size * size * channels\n",
"classes = 3\n",
"p = 1\n",
"val_p = 1\n",
"use_caching = False\n",
"normalize_class_distribution = False\n",
"seed = 123"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Read in train & val data"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Read and sample from full DataFrames\n",
"# TODO: Pull filenames out and simply pass them in as arguments.\n",
"# NOTE: ***Currently hacked read_* with updated data filenames.***\n",
"train_df = input_data.read_train_data(spark, size, channels, p, normalize_class_distribution, seed)\n",
"val_df = input_data.read_val_data(spark, size, channels, val_p, normalize_class_distribution, seed)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# # Save DataFrames (Optional)\n",
"# mode = \"error\"\n",
"# tr_sample_filename = os.path.join(\"data\", \"train_{}_sample_{}.parquet\".format(p, size))\n",
"# val_sample_filename = os.path.join(\"data\", \"val_{}_sample_{}.parquet\".format(val_p, size))\n",
"# train_df.write.mode(mode).save(tr_sample_filename, format=\"parquet\")\n",
"# val_df.write.mode(mode).save(val_sample_filename, format=\"parquet\")"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"if use_caching:\n",
" train_df.cache()\n",
" val_df.cache()"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Explore class distributions.\n",
"for df in [train_df, val_df]:\n",
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"tc = train_df.count()\n",
"vc = val_df.count()\n",
"print(tc, vc) # updated norm vs: 1801835 498183; original: 3560187 910918"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Sanity check that there are no duplicates.\n",
"if p < 1:\n",
" assert train_df.dropDuplicates().count() == tc\n",
"if val_p < 1:\n",
" assert val_df.dropDuplicates().count() == vc"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Normalize Staining"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"def normalize_staining(x, beta=0.15, alpha=1, light_intensity=240):\n",
" \"\"\"\n",
" Normalize the staining of H&E histology slides.\n",
" \n",
" This function normalizes the staining of H&E histoloy slides.\n",
" \n",
" References:\n",
" - Macenko, Marc, et al. \"A method for normalizing histology slides for\n",
" quantitative analysis.\" Biomedical Imaging: From Nano to Macro, 2009.\n",
" ISBI'09. IEEE International Symposium on. IEEE, 2009.\n",
" -\n",
" -\n",
" \"\"\"\n",
" # Setup.\n",
" x = np.asarray(x)\n",
" h, w, c = x.shape\n",
" x = x.reshape(-1, c).astype(np.float64) # shape (H*W, C)\n",
" \n",
" # Reference stain vectors and stain saturations. We will normalize all slides\n",
" # to these references. To create these, grab the stain vectors and stain\n",
" # saturations from a desirable slide.\n",
" ## Values in reference implementation for use with eigendecomposition approach.\n",
" stain_ref = np.array([0.5626, 0.2159, 0.7201, 0.8012, 0.4062, 0.5581]).reshape(3,2)\n",
" max_sat_ref = np.array([1.9705, 1.0308]).reshape(2,1)\n",
" ## Values for use with SVD approach. These were computed by (1) running the\n",
" ## the eigendecomposition approach to normalize an image, (2) running the\n",
" ## SVD approach on the normalized image, and (3) recording the stain vectors\n",
" ## and max saturations for this (ideal) normalized image.\n",
"# stain_ref = np.array([0.20730702, 0.56170196, 0.80308092, 0.72012455, 0.55864554, 0.4073224]).reshape(3,2)\n",
"# max_sat_ref = np.array([0.99818645, 1.96029115]).reshape(2,1)\n",
" \n",
" # Convert RGB to OD.\n",
" OD = -np.log((x+1)/light_intensity) # shape (H*W, C)\n",
"# OD = -np.log(x/255 + 1e-8)\n",
" \n",
" # Remove data with OD intensity less than beta.\n",
" # I.e. remove transparent pixels.\n",
" # Note: This needs to be checked per channel, rather than\n",
" # taking an average over all channels for a given pixel.\n",
" #OD_thresh = OD[np.logical_not(np.any(OD < beta, 1)), :]\n",
" OD_thresh = OD[np.all(OD >= beta, 1), :] # shape (K, C)\n",
" \n",
" # Calculate eigenvectors.\n",
" eigvals, eigvecs = np.linalg.eig(np.cov(OD_thresh.T)) # np.cov results in inf/nans\n",
"# U, s, V = np.linalg.svd(OD_thresh, full_matrices=False)\n",
" \n",
" # Extract two largest eigenvectors.\n",
" # Note: We swap the sign of the eigvecs here to be consistent\n",
" # with other implementations. Both +/- eigvecs are valid, with\n",
" # the same eigenvalue, so this is okay.\n",
" top_eigvecs = eigvecs[:, np.argsort(eigvals)[-2:]] * -1\n",
"# top_eigvecs = V[0:2, :].T * -1 # shape (C, 2)\n",
" \n",
" # Project thresholded optical density values onto plane spanned by\n",
" # 2 largest eigenvectors.\n",
" proj =, top_eigvecs) # shape (K, 2)\n",
" \n",
" # Calculate angle of each point wrt the first plane direction.\n",
" # Note: the parameters are `np.arctan2(y, x)`\n",
" angles = np.arctan2(proj[:, 1], proj[:, 0]) # shape (K,)\n",
" \n",
" # Find robust extremes (a and 100-a percentiles) of the angle.\n",
" min_angle = np.percentile(angles, alpha)\n",
" max_angle = np.percentile(angles, 100-alpha)\n",
" \n",
" # Convert min/max vectors (extremes) back to OD space.\n",
"# extreme_angles = np.array(\n",
"# [np.cos(min_angle), np.cos(max_angle), np.sin(min_angle), np.sin(max_angle)]\n",
"# ).reshape(2,2)\n",
"# stains =, extreme_angles) # shape (C, 2)\n",
" min_vec =, np.array([np.cos(min_angle), np.sin(min_angle)]).reshape(2,1))\n",
" max_vec =, np.array([np.cos(max_angle), np.sin(max_angle)]).reshape(2,1))\n",
" \n",
" # Merge vectors with hematoxylin first, and eosin second, as a heuristic.\n",
" if min_vec[0] > max_vec[0]:\n",
" stains = np.hstack((min_vec, max_vec))\n",
" else:\n",
" stains = np.hstack((max_vec, min_vec))\n",
" # Calculate saturations of each stain.\n",
" # Note: Here, we solve\n",
" # OD = VS\n",
" # S = V^{-1}OD\n",
" # where `OD` is the matrix of optical density values of our image,\n",
" # `V` is the matrix of stain vectors, and `S` is the matrix of stain\n",
" # saturations. Since this is an overdetermined system, we use the\n",
" # least squares solver, rather than a direct solve.\n",
" sats, _, _, _ = np.linalg.lstsq(stains, OD.T)\n",
" \n",
" # Normalize stain saturations.\n",
" max_sat = np.percentile(sats, 99, axis=1, keepdims=True)\n",
" sats = sats / max_sat * max_sat_ref\n",
" \n",
" # Recreate image.\n",
" # Note: If the image is immediately converted to uint8 with `.astype(np.uint8)`, it will\n",
" # not return the correct values due to the initital values being outside of [0,255].\n",
" # To fix this, we round to the nearest integer, and then clip to [0,255], which is the\n",
" # same behavior as Matlab.\n",
" x_norm = np.exp(, sats)) * light_intensity #- 1\n",
"# x_norm = np.exp(, sats)) * 255 - 1e-8\n",
" x_norm = np.clip(np.round(x_norm), 0, 255).astype(np.uint8)\n",
" x_norm = x_norm.T.reshape(h,w,c)\n",
" \n",
" # Debug.\n",
"# print(\"OD shape: \", OD.shape)\n",
"# print(\"OD_thresh shape: \", OD_thresh.shape)\n",
"# print(\"eigvals: \", eigvals)\n",
"# print(\"sorted eigvals: \", np.argsort(eigvals))\n",
"# print(\"top_eigvecs shape: \", top_eigvecs.shape)\n",
"# print(\"top_eigvecs: \", top_eigvecs)\n",
"# print(\"top 2 eigval indices: \", np.argsort(eigvals)[-2:])\n",
"# print(\"proj shape: \", proj.shape)\n",
"# print(\"proj mean: \", np.mean(proj, axis=0))\n",
"# print(\"angles shape: \", angles.shape)\n",
"# print(\"angles mean: \", np.mean(angles))\n",
"# print(\"min/max angles: \", min_angle, max_angle)\n",
"# print(\"min_vec shape: \", min_vec.shape)\n",
"# print(\"min_vec mean: \", np.mean(min_vec))\n",
"# print(\"max_vec mean: \", np.mean(max_vec))\n",
"# print(\"stains shape: \", stains.shape)\n",
"# print(\"stains: \", stains)\n",
"# print(\"sats shape: \", sats.shape)\n",
"# print(\"sats mean: \", np.mean(sats, axis=1))\n",
"# print(\"max_sat shape: \", max_sat.shape)\n",
"# print(\"max_sat: \", max_sat)\n",
"# print(\"x_norm shape: \", x_norm.shape)\n",
"# print(\"x_norm mean: \", np.mean(x_norm, axis=(0,1)))\n",
"# print(\"x_norm min: \", np.min(x_norm, axis=(0,1)))\n",
"# print(\"x_norm max: \", np.max(x_norm, axis=(0,1)))\n",
"# print(x_norm.dtype)\n",
"# print()\n",
"# # x = x.reshape(h,w,c).astype(np.uint8)\n",
" \n",
" return x_norm"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Compute image channel means"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# tr_means = input_data.compute_channel_means(train_df.rdd, channels, size)\n",
"# val_means = input_data.compute_channel_means(val_df.rdd, channels, size)\n",
"# print(tr_means.shape)\n",
"# print(tr_means, val_means)\n",
"# # Train: [ 194.27633667 145.3067627 181.27861023]\n",
"# # Val: [ 192.92971802 142.83534241 180.18870544]"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"def array_to_img(x, channels, size):\n",
" x = x.reshape((channels,size,size)).transpose((1,2,0)) # shape (H,W,C)\n",
" img = Image.fromarray(x.astype(np.uint8), 'RGB')\n",
" return img\n",
"def img_to_array(img):\n",
" x = np.asarray(img).astype(np.float64) # shape (H,W,C)\n",
" x = x.transpose(2,0,1).ravel() # shape (C*H*W)\n",
" return x"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"def filter_empty(row, beta=0.15, light_intensity=240):\n",
" x = row.sample.values\n",
"# x = array_to_img(x, channels, size)\n",
" x = x.reshape((channels,size,size)).transpose((1,2,0)) # shape (H,W,C)\n",
" h, w, c = x.shape\n",
" x = x.reshape(-1, c) # shape (H*W, C)\n",
" OD = -np.log((x+1)/light_intensity) # shape (H*W, C)\n",
" # Remove data with OD intensity less than beta.\n",
" # I.e. remove transparent pixels.\n",
" OD_thresh = OD[np.all(OD >= beta, 1), :]\n",
" return OD_thresh.size > 2*c"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Filter ~empty samples.\n",
"train_rdd = train_df.rdd.filter(filter_empty)\n",
"val_rdd = val_df.rdd.filter(filter_empty)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Sanity checks\n",
"# first = train_df.first()\n",
"# s = first.sample.values\n",
"# i = array_to_img(s, channels, size)\n",
"# s2 = img_to_array(i)\n",
"# assert np.allclose(s, s2)\n",
"# def assert_finite(row):\n",
"# x = row.sample.values\n",
"# x = x.reshape((channels,size,size)).transpose((1,2,0)) \n",
"# h, w, c = x.shape\n",
"# x = x.reshape(-1, c).astype(np.float64)\n",
"# OD = -np.log((x+1)/240)\n",
"# OD_thresh = OD[np.all(OD >= 0.15, 1), :]\n",
"# assert np.all(np.isfinite(OD_thresh.T))\n",
"# train_df.rdd.foreach(assert_finite)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"def compute_channel_means(rdd, channels, size):\n",
" \"\"\"Compute the means of each color channel across the dataset.\"\"\"\n",
" def helper(x):\n",
" x = x.sample.values\n",
"# x = array_to_img(x, channels, size)\n",
" x = x.reshape((channels,size,size)).transpose((1,2,0)) # shape (H,W,C)\n",
" x = normalize_staining(x)\n",
" x = np.asarray(x).astype(np.float64) # shape (H,W,C)\n",
" mu = np.mean(x, axis=(0,1))\n",
" return mu\n",
" means =\n",
" means = np.array(means)\n",
" means = np.mean(means, axis=0)\n",
" return means"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true,
"scrolled": false
"outputs": [],
"source": [
"tr_means = compute_channel_means(train_rdd, channels, size)\n",
"val_means = compute_channel_means(val_rdd, channels, size)\n",
"print(tr_means, val_means)\n",
"# Means: [194.27633667 145.3067627 181.27861023]\n",
"# Means with norm: train [189.54944625 152.73427159 176.89543273] val [187.45282379 150.25695602 175.23754894]\n",
"# Means with norm on updated data:\n",
"# [ 177.27269518 136.06809866 165.07305029] [ 176.21991047 134.39199187 163.81433421]\n",
"# Means with norm on updated data v3:\n",
"# [ 183.36777842 138.81743141 166.07406199] [ 182.41870536 137.15523608 164.81227273]"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Save every image as a JPEG"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"def helper(row, channels, size, save_dir):\n",
" tumor_score = row.tumor_score\n",
" sample = row.sample.values\n",
"# img = array_to_img(sample, channels, size)\n",
" x = sample.reshape((channels,size,size)).transpose((1,2,0)) # shape (H,W,C)\n",
" x = normalize_staining(x)\n",
" img = Image.fromarray(x.astype(np.uint8), 'RGB')\n",
" filename = '{index}_{slide_num}_{hash}.jpeg'.format(\n",
" index=row[\"__INDEX\"], slide_num=row.slide_num, hash=np.random.randint(1e4))\n",
" class_dir = os.path.join(save_dir, str(tumor_score))\n",
" path = os.path.join(class_dir, filename)\n",
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"tr_save_dir = \"images/{stage}/{p}\".format(stage=\"train_updated_norm_v3\", p=p)\n",
"val_save_dir = \"images/{stage}/{p}\".format(stage=\"val_updated_norm_v3\", p=val_p)\n",
"print(tr_save_dir, val_save_dir)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"%%bash -s \"$tr_save_dir\" \"$val_save_dir\"\n",
"for i in 1 2 3\n",
" sudo mkdir -p $1/$i\n",
" sudo mkdir -p $2/$i\n",
"sudo chmod 777 -R $1\n",
"sudo chmod 777 -R $2"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Note: Use this if the DataFrame doesn't have an __INDEX column yet.\n",
"# train_df = train_df.withColumn(\"__INDEX\", F.monotonically_increasing_id())\n",
"# val_df = val_df.withColumn(\"__INDEX\", F.monotonically_increasing_id())"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"train_df.rdd.filter(filter_empty).foreach(lambda row: helper(row, channels, size, tr_save_dir))\n",
"val_df.rdd.filter(filter_empty).foreach(lambda row: helper(row, channels, size, val_save_dir))"
"cell_type": "markdown",
"metadata": {},
"source": [
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"def show_random_image(save_dir):\n",
" c = np.random.randint(1, 4)\n",
" class_dir = os.path.join(save_dir, str(c))\n",
" files = os.listdir(class_dir)\n",
" i = np.random.randint(0, len(files))\n",
" fname = os.path.join(class_dir, files[i])\n",
" print(fname)\n",
" img =\n",
" plt.imshow(img)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"metadata": {
"kernelspec": {
"display_name": "Python 3 + Spark 2.x + SystemML",
"language": "python",
"name": "pyspark3_2.x"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
"nbformat": 4,
"nbformat_minor": 1