{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML\n",
"## Preprocessing\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"%matplotlib inline\n",
"\n",
"import math\n",
"\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"import openslide\n",
"from openslide.deepzoom import DeepZoomGenerator\n",
"import pandas as pd\n",
"from pyspark.mllib.linalg import Vectors\n",
"from scipy.ndimage.morphology import binary_fill_holes\n",
"from skimage.color import rgb2gray\n",
"from skimage.feature import canny\n",
"from skimage.morphology import binary_closing, binary_dilation, disk\n",
"\n",
"plt.rcParams['figure.figsize'] = (10, 6)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Open Whole-Slide Image"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def open_slide(slide_num, folder, training):\n",
" \"\"\"\n",
" Open a whole-slide image, given an image number.\n",
" \n",
" Args:\n",
" slide_num: Slide image number as an integer.\n",
" folder: Directory in which the slides folder is stored, as a string.\n",
" This should contain either a `training_image_data` folder with\n",
" images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
" folder with images in the format `TUPAC-TE-###.svs`.\n",
" training: Boolean for training or testing datasets.\n",
" \n",
" Returns:\n",
" An OpenSlide object representing a whole-slide image.\n",
" \"\"\"\n",
" if training:\n",
" filename = os.path.join(folder, \"training_image_data\", \"TUPAC-TR-{}.svs\".format(str(slide_num).zfill(3)))\n",
" else:\n",
" # Testing images\n",
" filename = os.path.join(folder, \"testing_image_data\", \"TUPAC-TE-{}.svs\".format(str(slide_num).zfill(3)))\n",
" slide = openslide.open_slide(filename)\n",
" return slide"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create Tile Generator"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def create_tile_generator(slide, tile_size, overlap):\n",
" \"\"\"\n",
" Create a tile generator for the given slide.\n",
" \n",
" This generator is able to extract tiles from the overall\n",
" whole-slide image.\n",
" \n",
" Args:\n",
" slide: An OpenSlide object representing a whole-slide image.\n",
" tile_size: The width and height of a square tile to be generated.\n",
" overlap: Number of pixels by which to overlap the tiles.\n",
" \n",
" Returns:\n",
" A DeepZoomGenerator object representing the tile generator. Each\n",
" extracted tile is an Image with shape (tile_size, tile_size, channels).\n",
" Note: This generator is not a true \"Python generator function\", but\n",
" rather is an object that is capable of extracting individual tiles.\n",
" \"\"\"\n",
" generator = DeepZoomGenerator(slide, tile_size=tile_size, overlap=overlap, limit_bounds=True)\n",
" return generator"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Determine 20x Magnification Zoom Level"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def get_20x_zoom_level(slide, generator):\n",
" \"\"\"\n",
" Return the zoom level that corresponds to a 20x magnification.\n",
" \n",
" The generator can extract tiles from multiple zoom levels, downsampling\n",
" by a factor of 2 per level from highest to lowest resolution.\n",
" \n",
" Args:\n",
" slide: An OpenSlide object representing a whole-slide image.\n",
" generator: A DeepZoomGenerator object representing a tile generator.\n",
" Note: This generator is not a true \"Python generator function\", but\n",
" rather is an object that is capable of extracting individual tiles.\n",
" \n",
" Returns:\n",
" Zoom level corresponding to a 20x magnification, or as close as possible.\n",
" \"\"\"\n",
" highest_zoom_level = generator.level_count - 1 # 0-based indexing\n",
" try:\n",
" mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])\n",
" # `mag / 20` gives the downsampling factor between the slide's\n",
" # magnification and the desired 20x magnification.\n",
" # `(mag / 20) / 2` gives the zoom level offset from the highest\n",
" # resolution level, based on a 2x downsampling factor in the\n",
" # generator.\n",
" offset = math.floor((mag / 20) / 2)\n",
" level = highest_zoom_level - offset\n",
" except ValueError:\n",
" # In case the slide magnification level is unknown, just\n",
" # use the highest resolution.\n",
" level = highest_zoom_level\n",
" return level"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Generate Tile Indices For Whole-Slide Image."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def process_slide(slide_num, folder, training, tile_size, overlap):\n",
" \"\"\"\n",
" Generate all possible tile indices for a whole-slide image.\n",
" \n",
" Given a slide number, tile size, and overlap, generate\n",
" all possible (slide_num, tile_size, overlap, zoom_level, col, row)\n",
" indices.\n",
" \n",
" Args:\n",
" slide_num: Slide image number as an integer.\n",
" folder: Directory in which the slides folder is stored, as a string.\n",
" This should contain either a `training_image_data` folder with\n",
" images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
" folder with images in the format `TUPAC-TE-###.svs`.\n",
" training: Boolean for training or testing datasets.\n",
" tile_size: The width and height of a square tile to be generated.\n",
" overlap: Number of pixels by which to overlap the tiles.\n",
" \n",
" Returns:\n",
" A list of (slide_num, tile_size, overlap, zoom_level, col, row)\n",
" integer index tuples representing possible tiles to extract.\n",
" \"\"\"\n",
" # Open slide.\n",
" slide = open_slide(slide_num, folder, training)\n",
" # Create tile generator.\n",
" generator = create_tile_generator(slide, tile_size, overlap)\n",
" # Get 20x zoom level.\n",
" zoom_level = get_20x_zoom_level(slide, generator)\n",
" # Generate all possible (zoom_level, col, row) tile index tuples.\n",
" cols, rows = generator.level_tiles[zoom_level]\n",
" tile_indices = [(slide_num, tile_size, overlap, zoom_level, col, row)\n",
" for col in range(cols) for row in range(rows)]\n",
" return tile_indices"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Generate Tile From Tile Index"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def process_tile_index(tile_index, folder, training):\n",
" \"\"\"\n",
" Generate a tile from a tile index.\n",
" \n",
" Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile\n",
" index, generate a (slide_num, tile) tuple.\n",
" \n",
" Args:\n",
" tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)\n",
" integer index tuple representing a tile to extract.\n",
" folder: Directory in which the slides folder is stored, as a string.\n",
" This should contain either a `training_image_data` folder with\n",
" images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
" folder with images in the format `TUPAC-TE-###.svs`.\n",
" training: Boolean for training or testing datasets.\n",
" \n",
" Returns:\n",
" A (slide_num, tile) tuple, where slide_num is an integer, and tile\n",
" is a 3D NumPy array of shape (tile_size, tile_size, channels) in\n",
" RGB format.\n",
" \"\"\"\n",
" slide_num, tile_size, overlap, zoom_level, col, row = tile_index\n",
" # Open slide.\n",
" slide = open_slide(slide_num, folder, training)\n",
" # Create tile generator.\n",
" generator = create_tile_generator(slide, tile_size, overlap)\n",
" # Generate tile\n",
" tile = np.array(generator.get_tile(zoom_level, (col, row)))\n",
" return (slide_num, tile)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Filter Tile For Dimensions & Tissue Threshold"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def keep_tile(tile_tuple, tile_size, tissue_threshold):\n",
" \"\"\"\n",
" Determine if a tile should be kept.\n",
" \n",
" This filters out tiles based on size and a tissue percentage\n",
" threshold, using a custom algorithm. If a tile has height &\n",
" width equal to (tile_size, tile_size), and contains greater\n",
" than or equal to the given percentage, then it will be kept;\n",
" otherwise it will be filtered out.\n",
" \n",
" Args:\n",
" tile_tuple: A (slide_num, tile) tuple, where slide_num is an\n",
" integer, and tile is a 3D NumPy array of shape \n",
" (tile_size, tile_size, channels) in RGB format.\n",
" tile_size: The width and height of a square tile to be generated.\n",
" tissue_threshold: Tissue percentage threshold.\n",
" \n",
" Returns:\n",
" A Boolean indicating whether or not a tile should be kept\n",
" for future usage.\n",
" \"\"\"\n",
" slide_num, tile = tile_tuple\n",
" if tile.shape[0:2] == (tile_size, tile_size):\n",
" # Convert 3D RGB image to 2D grayscale image, from\n",
" # 0 (dense tissue) to 1 (plain background).\n",
" tile = rgb2gray(tile)\n",
" # 8-bit depth complement, from 1 (dense tissue)\n",
" # to 0 (plain background).\n",
" tile = 1 - tile\n",
" # Canny edge detection with hysteresis thresholding.\n",
" # This returns a binary map of edges, with 1 equal to\n",
" # an edge. The idea is that tissue would be full of\n",
" # edges, while background would not.\n",
" tile = canny(tile)\n",
" # Binary closing, which is a dilation followed by\n",
" # an erosion. This removes small dark spots, which\n",
" # helps remove noise in the background.\n",
" tile = binary_closing(tile, disk(10))\n",
" # Binary dilation, which enlarges bright areas,\n",
" # and shrinks dark areas. This helps fill in holes\n",
" # within regions of tissue.\n",
" tile = binary_dilation(tile, disk(10))\n",
" # Fill remaining holes within regions of tissue.\n",
" tile = binary_fill_holes(tile)\n",
" # Calculate percentage of tissue coverage.\n",
" percentage = tile.mean()\n",
" return percentage >= tissue_threshold\n",
" else:\n",
" return False"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Generate Flattened Samples From Tile"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def process_tile(tile_tuple, sample_size, grayscale):\n",
" \"\"\"\n",
" Process a tile into a group of smaller samples.\n",
" \n",
" Cut up a tile into smaller blocks of sample_size x sample_size pixels,\n",
" change the shape of each sample from (H, W, channels) to \n",
" (channels, H, W), then flatten each into a vector of length\n",
" channels*H*W.\n",
" \n",
" Args:\n",
" tile_tuple: A (slide_num, tile) tuple, where slide_num is an\n",
" integer, and tile is a 3D NumPy array of shape \n",
" (tile_size, tile_size, channels).\n",
" sample_size: The new width and height of the square samples to be\n",
" generated.\n",
" grayscale: Whether or not to generate grayscale samples, rather\n",
" than RGB.\n",
" \n",
" Returns:\n",
" A list of (slide_num, sample) tuples representing cut up tiles,\n",
" where each sample has been transposed from\n",
" (sample_size_x, sample_size_y, channels) to (channels, sample_size_x, sample_size_y),\n",
" and flattened to a vector of length (channels*sample_size_x*sample_size_y).\n",
" \"\"\"\n",
" slide_num, tile = tile_tuple\n",
" if grayscale:\n",
" tile = rgb2gray(tile)[:, :, np.newaxis] # Grayscale\n",
" # Save disk space and future IO time by converting from [0,1] to [0,255],\n",
" # at the expense of some minor loss of information.\n",
" tile = np.round(tile * 255).astype(\"uint8\")\n",
" x, y, ch = tile.shape\n",
" # 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where\n",
" # num_x and num_y are the number of chopped tiles on the x and y axes, respectively.\n",
" # 2. Swap sample_size_x and num_y axes to create (num_x, num_y, sample_size_x, sample_size_y, ch).\n",
" # 3. Combine num_x and num_y into single axis, returning\n",
" # (num_samples, sample_size_x, sample_size_y, ch).\n",
" # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to\n",
" # (num_samples, ch, sample_size_x, sample_size_y).\n",
" # 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).\n",
" samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))\n",
" .swapaxes(1,2)\n",
" .reshape((-1, sample_size, sample_size, ch))\n",
" .transpose(0,3,1,2))\n",
" samples = samples.reshape(samples.shape[0], -1)\n",
" samples = [(slide_num, sample) for sample in list(samples)]\n",
" return samples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Visualize Tile"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def visualize_tile(tile):\n",
" \"\"\"\n",
" Plot a tissue tile.\n",
" \n",
" Args:\n",
" tile: A 3D NumPy array of shape (tile_size, tile_size, channels).\n",
" \n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" plt.imshow(tile)\n",
" plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Visualize Sample"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def visualize_sample(sample, size=256):\n",
" \"\"\"\n",
" Plot a tissue sample.\n",
" \n",
" Args:\n",
" sample: A square sample flattened to a vector of size\n",
" (channels*size_x*size_y).\n",
" size: The width and height of the square samples.\n",
" \n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" # Change type, reshape, transpose to (size_x, size_y, channels).\n",
" length = sample.shape[0]\n",
" channels = int(length / (size * size))\n",
" if channels > 1:\n",
" sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)\n",
" plt.imshow(sample)\n",
" else:\n",
" vmax = 255 if sample.max() > 1 else 1\n",
" sample = sample.reshape((size, size))\n",
" plt.imshow(sample, cmap=\"gray\", vmin=0, vmax=vmax)\n",
" plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Get Ground Truth Labels"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def create_ground_truth_maps(folder):\n",
" \"\"\"\n",
" Create lookup maps for ground truth labels.\n",
" \n",
" Args:\n",
" folder: Directory in which the slides folder is stored, as a string.\n",
" This should contain a `training_ground_truth.csv` file.\n",
" \"\"\"\n",
" filename = os.path.join(folder, \"training_ground_truth.csv\")\n",
" labels = pd.read_csv(filename, names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
" labels[\"slide_num\"] = range(1, 501)\n",
"\n",
" # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries\n",
" tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n",
" molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}\n",
" return tumor_score_dict, molecular_score_dict"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Process All Slides Into A Saved Spark DataFrame"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def preprocess(slide_nums, folder=\"data\", training=True, tile_size=1024, overlap=0,\n",
" tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):\n",
" \"\"\"\n",
" Preprocess a set of whole-slide images.\n",
" \n",
" Preprocess a set of whole-slide images as follows:\n",
" 1. Tile the slides into tiles of size (tile_size, tile_size, 3).\n",
" 2. Filter the tiles to remove unnecessary tissue.\n",
" 3. Cut the remaining tiles into samples of size (sample_size, sample_size, ch),\n",
" where `ch` is 1 if `grayscale` is true, or 3 otherwise.\n",
" \n",
" Args:\n",
" slide_nums: List of whole-slide numbers to process.\n",
" folder: Directory in which the slides folder is stored, as a string.\n",
" This should contain either a `training_image_data` folder with\n",
" images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
" folder with images in the format `TUPAC-TE-###.svs`.\n",
" training: Boolean for training or testing datasets.\n",
" tile_size: The width and height of a square tile to be generated.\n",
" overlap: Number of pixels by which to overlap the tiles.\n",
" tissue_threshold: Tissue percentage threshold for filtering.\n",
" sample_size: The new width and height of the square samples to be\n",
" generated.\n",
" grayscale: Whether or not to generate grayscale samples, rather\n",
" than RGB.\n",
" num_partitions: Number of partitions to use during processing.\n",
" \n",
" Returns:\n",
" A DataFrame in which each row contains the slide number, tumor score,\n",
" molecular score, and the sample stretched out into a Vector.\n",
" \"\"\"\n",
" slides = sc.parallelize(slide_nums)\n",
" # Force even partitioning by collecting and parallelizing -- for memory issues\n",
" ## HACK Note: This was a PySpark bug with a fix in the master branch now.\n",
" tile_indices = slides.flatMap(lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()\n",
" tile_indices = sc.parallelize(tile_indices, num_partitions)\n",
" ## END HACK -- update later\n",
" tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))\n",
" filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n",
" samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n",
" if training:\n",
" tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)\n",
" samples_with_labels = (samples.map(lambda tup: \n",
" (tup[0], tumor_score_dict[tup[0]], molecular_score_dict[tup[0]],\n",
" Vectors.dense(tup[1]))))\n",
" df = samples_with_labels.toDF([\"slide_num\", \"tumor_score\", \"molecular_score\", \"sample\"])\n",
" df = df.select(df.slide_num.astype(\"int\"), df.tumor_score.astype(\"int\"), df.molecular_score, df[\"sample\"])\n",
" else: # testing data -- no labels\n",
" df = samples.toDF([\"slide_num\", \"sample\"])\n",
" df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
" df = df.repartition(num_partitions) # Even out the partitions\n",
" return df\n",
"\n",
"def save(df, training=True, sample_size=256, grayscale=False, mode=\"error\"):\n",
" \"\"\"\n",
" Save a preprocessed DataFrame of samples in Parquet format.\n",
" \n",
" The filename will be formatted as follows:\n",
" `samples_{labels|testing}_SAMPLE-SIZE[_grayscale].parquet`\n",
" \n",
" Args:\n",
" df: A DataFrame in which each row contains the slide number, tumor score,\n",
" molecular score, and the sample stretched out into a Vector.\n",
" training: Boolean for training or testing datasets.\n",
" sample_size: The width and height of the square samples.\n",
" grayscale: Whether or not to the samples are in grayscale format, rather\n",
" than RGB.\n",
" mode: Specifies the behavior of `df.write.mode` when the data already exists.\n",
" Options include:\n",
" * `append`: Append contents of this :class:`DataFrame` to existing data.\n",
" * `overwrite`: Overwrite existing data.\n",
" * `error`: Throw an exception if data already exists.\n",
" * `ignore`: Silently ignore this operation if data already exists.\n",
" \"\"\"\n",
" filename = \"samples_{}_{}{}.parquet\".format(\"labels\" if training else \"testing\",\n",
" sample_size,\n",
" \"_grayscale\" if grayscale else \"\")\n",
" filepath = os.path.join(\"data\", filename)\n",
" df.write.mode(mode).save(filepath, format=\"parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Execute Preprocessing & Save"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# TODO: Filtering tiles and then cutting into samples could result\n",
"# in samples with less tissue than desired, despite that being the\n",
"# procedure of the paper. Look into simply selecting tiles of the\n",
"# desired size to begin with."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Get list of image numbers, minus the broken ones\n",
"broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}\n",
"slide_nums = sorted(set(range(1,501)) - broken)\n",
"\n",
"# Settings\n",
"training = True\n",
"tile_size = 1024\n",
"sample_size = 256\n",
"grayscale = False\n",
"num_partitions = 20000\n",
"folder = \"/home/MDM/breast_cancer/data\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Process all slides\n",
"df = preprocess(slide_nums, tile_size=tile_size, sample_size=sample_size, grayscale=grayscale,\n",
" training=training, num_partitions=num_partitions, folder=folder)\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Save DataFrame of samples\n",
"save(df, sample_size=sample_size, grayscale=grayscale, training=training)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Split Into Separate Train & Validation DataFrames Based On Slide Number"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### TODO: Wrap this in a function with appropriate default arguments"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"filename = \"samples_{}_{}{}.parquet\".format(\"labels\" if training else \"testing\",\n",
" sample_size,\n",
" \"_grayscale\" if grayscale else \"\")\n",
"filepath = os.path.join(\"data\", filename)\n",
"df = sqlContext.read.load(filepath)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"labels = pd.read_csv(\"data/training_ground_truth.csv\", names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
"labels[\"slide_num\"] = range(1, 501)\n",
"\n",
"# Create slide_num -> tumor_score and slide_num -> molecular_score dictionaries\n",
"tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n",
"molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print(labels[\"tumor_score\"].value_counts() / labels.tumor_score.count())\n",
"print(labels[labels.slide_num > 400][\"tumor_score\"].value_counts() / labels[labels.slide_num > 400].tumor_score.count())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"train = (df.where(df.slide_num <= 400)\n",
" .rdd\n",
" .zipWithIndex()\n",
" .map(lambda r: (r[1] + 1, *r[0]))\n",
" .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
"train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), train.tumor_score.astype(\"int\"),\n",
" train.molecular_score, train[\"sample\"])\n",
"\n",
"val = (df.where(df.slide_num > 400)\n",
" .rdd\n",
" .zipWithIndex()\n",
" .map(lambda r: (r[1] + 1, *r[0]))\n",
" .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
"val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"), val.tumor_score.astype(\"int\"),\n",
" val.molecular_score, val[\"sample\"])\n",
"\n",
"train, val"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Write\n",
"# TODO: Wrap this in a function with appropriate default arguments\n",
"mode = \"error\"\n",
"tr_filename = os.path.join(\"data\", \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"val_filename = os.path.join(\"data\", \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"train.write.mode(mode).save(tr_filename, format=\"parquet\")\n",
"val.write.mode(mode).save(val_filename, format=\"parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sample Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### TODO: Wrap this in a function with appropriate default arguments"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tr_filename = os.path.join(\"data\", \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"val_filename = os.path.join(\"data\", \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"train = sqlContext.read.load(tr_filename)\n",
"val = sqlContext.read.load(val_filename)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Take a stratified sample\n",
"p=0.01\n",
"train_sample = train.drop(\"__INDEX\").sampleBy(\"tumor_score\", fractions={1: p, 2: p, 3: p}, seed=42)\n",
"val_sample = val.drop(\"__INDEX\").sampleBy(\"tumor_score\", fractions={1: p, 2: p, 3: p}, seed=42)\n",
"train_sample, val_sample"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# TODO: turn this into a function\n",
"# Repartition to get ~128MB partitions\n",
"\n",
"# TODO: Update logic to use the following to automatically\n",
"# select the number of partitions:\n",
"# ex_mb = SIZE*SIZE*CHANNELS * 8 / 1024 / 1024 # size of one example in MB\n",
"# ideal_part_size_mb = 128 # 128 MB partitions sizes are empirically ideal\n",
"# ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)\n",
"# tr_parts = round(tc / ideal_exs_per_part)\n",
"# val_parts = round(vc / ideal_exs_per_part)\n",
"\n",
"if grayscale:\n",
" train_sample = train_sample.repartition(150) #300) #3000)\n",
" val_sample = val_sample.repartition(40) #80) #800)\n",
"else: # 3x\n",
" train_sample = train_sample.repartition(450) #900) #9000)\n",
" val_sample = val_sample.repartition(120) #240) #2400)\n",
"\n",
"# Reassign row indices\n",
"train_sample = (\n",
" train_sample.rdd\n",
" .zipWithIndex()\n",
" .map(lambda r: (r[1] + 1, *r[0]))\n",
" .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
"train_sample = train_sample.select(train_sample[\"__INDEX\"].astype(\"int\"), train_sample.slide_num.astype(\"int\"), \n",
" train_sample.tumor_score.astype(\"int\"), train_sample.molecular_score, train_sample[\"sample\"])\n",
"\n",
"val_sample = (\n",
" val_sample.rdd\n",
" .zipWithIndex()\n",
" .map(lambda r: (r[1] + 1, *r[0]))\n",
" .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
"val_sample = val_sample.select(val_sample[\"__INDEX\"].astype(\"int\"), val_sample.slide_num.astype(\"int\"), \n",
" val_sample.tumor_score.astype(\"int\"), val_sample.molecular_score, val_sample[\"sample\"])\n",
"\n",
"train_sample, val_sample\n",
"\n",
"# Write\n",
"# TODO: Wrap this in a function with appropriate default arguments\n",
"mode = \"error\"\n",
"tr_sample_filename = os.path.join(\"data\", \"train_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"val_sample_filename = os.path.join(\"data\", \"val_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\"))\n",
"train_sample.write.mode(mode).save(tr_sample_filename, format=\"parquet\")\n",
"val_sample.write.mode(mode).save(val_sample_filename, format=\"parquet\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 1
}