blob: 0e4e91f0560d93f5737ac1a8d9f8f3e1f7b8baba [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
"""
Preprocessing -- Predicting Breast Cancer Proliferation Scores with
Apache SystemML
This module contains functions for the preprocessing phase of the
breast cancer project.
"""
import math
import os
import numpy as np
import openslide
from openslide import OpenSlideError
from openslide.deepzoom import DeepZoomGenerator
import pandas as pd
from pyspark.ml.linalg import Vectors
import pyspark.sql.functions as F
from scipy.ndimage.morphology import binary_fill_holes
from skimage.color import rgb2gray
from skimage.feature import canny
from skimage.morphology import binary_closing, binary_dilation, disk
# Open Whole-Slide Image
def open_slide(slide_num, folder, training):
"""
Open a whole-slide image, given an image number.
Args:
slide_num: Slide image number as an integer.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
Returns:
An OpenSlide object representing a whole-slide image.
"""
if training:
filename = os.path.join(folder, "training_image_data",
"TUPAC-TR-{}.svs".format(str(slide_num).zfill(3)))
else:
# Testing images
filename = os.path.join(folder, "testing_image_data",
"TUPAC-TE-{}.svs".format(str(slide_num).zfill(3)))
try:
slide = openslide.open_slide(filename)
except OpenSlideError:
slide = None
except FileNotFoundError:
slide = None
return slide
# Create Tile Generator
def create_tile_generator(slide, tile_size, overlap):
"""
Create a tile generator for the given slide.
This generator is able to extract tiles from the overall
whole-slide image.
Args:
slide: An OpenSlide object representing a whole-slide image.
tile_size: The width and height of a square tile to be generated.
overlap: Number of pixels by which to overlap the tiles.
Returns:
A DeepZoomGenerator object representing the tile generator. Each
extracted tile is a PIL Image with shape
(tile_size, tile_size, channels).
Note: This generator is not a true "Python generator function", but
rather is an object that is capable of extracting individual tiles.
"""
generator = DeepZoomGenerator(slide, tile_size=tile_size, overlap=overlap, limit_bounds=True)
return generator
# Determine 20x Magnification Zoom Level
def get_20x_zoom_level(slide, generator):
"""
Return the zoom level that corresponds to a 20x magnification.
The generator can extract tiles from multiple zoom levels,
downsampling by a factor of 2 per level from highest to lowest
resolution.
Args:
slide: An OpenSlide object representing a whole-slide image.
generator: A DeepZoomGenerator object representing a tile generator.
Note: This generator is not a true "Python generator function",
but rather is an object that is capable of extracting individual
tiles.
Returns:
Zoom level corresponding to a 20x magnification, or as close as
possible.
"""
highest_zoom_level = generator.level_count - 1 # 0-based indexing
try:
mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])
# `mag / 20` gives the downsampling factor between the slide's
# magnification and the desired 20x magnification.
# `(mag / 20) / 2` gives the zoom level offset from the highest
# resolution level, based on a 2x downsampling factor in the
# generator.
offset = math.floor((mag / 20) / 2)
level = highest_zoom_level - offset
except ValueError:
# In case the slide magnification level is unknown, just
# use the highest resolution.
level = highest_zoom_level
return level
# Generate Tile Indices For Whole-Slide Image.
def process_slide(slide_num, folder, training, tile_size, overlap):
"""
Generate all possible tile indices for a whole-slide image.
Given a slide number, tile size, and overlap, generate
all possible (slide_num, tile_size, overlap, zoom_level, col, row)
indices.
Args:
slide_num: Slide image number as an integer.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
tile_size: The width and height of a square tile to be generated.
overlap: Number of pixels by which to overlap the tiles.
Returns:
A list of (slide_num, tile_size, overlap, zoom_level, col, row)
integer index tuples representing possible tiles to extract.
"""
# Open slide.
slide = open_slide(slide_num, folder, training)
# Create tile generator.
generator = create_tile_generator(slide, tile_size, overlap)
# Get 20x zoom level.
zoom_level = get_20x_zoom_level(slide, generator)
# Generate all possible (zoom_level, col, row) tile index tuples.
cols, rows = generator.level_tiles[zoom_level]
tile_indices = [(slide_num, tile_size, overlap, zoom_level, col, row)
for col in range(cols) for row in range(rows)]
return tile_indices
# Generate Tile From Tile Index
def process_tile_index(tile_index, folder, training):
"""
Generate a tile from a tile index.
Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile
index, generate a (slide_num, tile) tuple.
Args:
tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)
integer index tuple representing a tile to extract.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
Returns:
A (slide_num, tile) tuple, where slide_num is an integer, and tile
is a 3D NumPy array of shape (tile_size, tile_size, channels) in
RGB format.
"""
slide_num, tile_size, overlap, zoom_level, col, row = tile_index
# Open slide.
slide = open_slide(slide_num, folder, training)
# Create tile generator.
generator = create_tile_generator(slide, tile_size, overlap)
# Generate tile.
tile = np.asarray(generator.get_tile(zoom_level, (col, row)))
return (slide_num, tile)
# Filter Tile For Dimensions & Tissue Threshold
def optical_density(tile):
"""
Convert a tile to optical density values.
Args:
tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
Returns:
A 3D NumPy array of shape (tile_size, tile_size, channels)
representing optical density values.
"""
tile = tile.astype(np.float64)
#od = -np.log10(tile/255 + 1e-8)
od = -np.log((tile+1)/240)
return od
def keep_tile(tile_tuple, tile_size, tissue_threshold):
"""
Determine if a tile should be kept.
This filters out tiles based on size and a tissue percentage
threshold, using a custom algorithm. If a tile has height &
width equal to (tile_size, tile_size), and contains greater
than or equal to the given percentage, then it will be kept;
otherwise it will be filtered out.
Args:
tile_tuple: A (slide_num, tile) tuple, where slide_num is an
integer, and tile is a 3D NumPy array of shape
(tile_size, tile_size, channels).
tile_size: The width and height of a square tile to be generated.
tissue_threshold: Tissue percentage threshold.
Returns:
A Boolean indicating whether or not a tile should be kept for
future usage.
"""
slide_num, tile = tile_tuple
if tile.shape[0:2] == (tile_size, tile_size):
tile_orig = tile
# Check 1
# Convert 3D RGB image to 2D grayscale image, from
# 0 (dense tissue) to 1 (plain background).
tile = rgb2gray(tile)
# 8-bit depth complement, from 1 (dense tissue)
# to 0 (plain background).
tile = 1 - tile
# Canny edge detection with hysteresis thresholding.
# This returns a binary map of edges, with 1 equal to
# an edge. The idea is that tissue would be full of
# edges, while background would not.
tile = canny(tile)
# Binary closing, which is a dilation followed by
# an erosion. This removes small dark spots, which
# helps remove noise in the background.
tile = binary_closing(tile, disk(10))
# Binary dilation, which enlarges bright areas,
# and shrinks dark areas. This helps fill in holes
# within regions of tissue.
tile = binary_dilation(tile, disk(10))
# Fill remaining holes within regions of tissue.
tile = binary_fill_holes(tile)
# Calculate percentage of tissue coverage.
percentage = tile.mean()
check1 = percentage >= tissue_threshold
# Check 2
# Convert to optical density values
tile = optical_density(tile_orig)
# Threshold at beta
beta = 0.15
tile = np.min(tile, axis=2) >= beta
# Apply morphology for same reasons as above.
tile = binary_closing(tile, disk(2))
tile = binary_dilation(tile, disk(2))
tile = binary_fill_holes(tile)
percentage = tile.mean()
check2 = percentage >= tissue_threshold
return check1 and check2
else:
return False
# Generate Samples From Tile
def process_tile(tile_tuple, sample_size, grayscale):
"""
Process a tile into a group of smaller samples.
Cut up a tile into smaller blocks of sample_size x sample_size pixels,
change the shape of each sample from (H, W, channels) to
(channels, H, W), then flatten each into a vector of length
channels*H*W.
Args:
tile_tuple: A (slide_num, tile) tuple, where slide_num is an
integer, and tile is a 3D NumPy array of shape
(tile_size, tile_size, channels).
sample_size: The new width and height of the square samples to be
generated.
grayscale: Whether or not to generate grayscale samples, rather
than RGB.
Returns:
A list of (slide_num, sample) tuples representing cut up tiles,
where each sample is a 3D NumPy array of shape
(sample_size_x, sample_size_y, channels).
"""
slide_num, tile = tile_tuple
if grayscale:
tile = rgb2gray(tile)[:, :, np.newaxis] # Grayscale
# Save disk space and future IO time by converting from [0,1] to [0,255],
# at the expense of some minor loss of information.
tile = np.round(tile * 255).astype("uint8")
x, y, ch = tile.shape
# 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where
# num_x and num_y are the number of chopped tiles on the x and y axes, respectively.
# 2. Swap sample_size_x and num_y axes to create
# (num_x, num_y, sample_size_x, sample_size_y, ch).
# 3. Combine num_x and num_y into single axis, returning
# (num_samples, sample_size_x, sample_size_y, ch).
samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))
.swapaxes(1,2)
.reshape((-1, sample_size, sample_size, ch)))
samples = [(slide_num, sample) for sample in list(samples)]
return samples
# Normalize staining
def normalize_staining(sample_tuple, beta=0.15, alpha=1, light_intensity=255):
"""
Normalize the staining of H&E histology slides.
This function normalizes the staining of H&E histology slides.
References:
- Macenko, Marc, et al. "A method for normalizing histology slides
for quantitative analysis." Biomedical Imaging: From Nano to Macro,
2009. ISBI'09. IEEE International Symposium on. IEEE, 2009.
- http://wwwx.cs.unc.edu/~mn/sites/default/files/macenko2009.pdf
- https://github.com/mitkovetta/staining-normalization
Args:
sample_tuple: A (slide_num, sample) tuple, where slide_num is an
integer, and sample is a 3D NumPy array of shape (H,W,C).
Returns:
A (slide_num, sample) tuple, where the sample is a 3D NumPy array
of shape (H,W,C) that has been stain normalized.
"""
# Setup.
slide_num, sample = sample_tuple
x = np.asarray(sample)
h, w, c = x.shape
x = x.reshape(-1, c).astype(np.float64) # shape (H*W, C)
# Reference stain vectors and stain saturations. We will normalize all slides
# to these references. To create these, grab the stain vectors and stain
# saturations from a desirable slide.
# Values in reference implementation for use with eigendecomposition approach, natural log,
# and `light_intensity=240`.
#stain_ref = np.array([0.5626, 0.2159, 0.7201, 0.8012, 0.4062, 0.5581]).reshape(3,2)
#max_sat_ref = np.array([1.9705, 1.0308]).reshape(2,1)
# SVD w/ log10, and `light_intensity=255`.
stain_ref = (np.array([0.54598845, 0.322116, 0.72385198, 0.76419107, 0.42182333, 0.55879629])
.reshape(3,2))
max_sat_ref = np.array([0.82791151, 0.61137274]).reshape(2,1)
# Convert RGB to OD.
# Note: The original paper used log10, and the reference implementation used the natural log.
#OD = -np.log((x+1)/light_intensity) # shape (H*W, C)
OD = -np.log10(x/light_intensity + 1e-8)
# Remove data with OD intensity less than beta.
# I.e. remove transparent pixels.
# Note: This needs to be checked per channel, rather than
# taking an average over all channels for a given pixel.
OD_thresh = OD[np.all(OD >= beta, 1), :] # shape (K, C)
# Calculate eigenvectors.
# Note: We can either use eigenvector decomposition, or SVD.
#eigvals, eigvecs = np.linalg.eig(np.cov(OD_thresh.T)) # np.cov results in inf/nans
U, s, V = np.linalg.svd(OD_thresh, full_matrices=False)
# Extract two largest eigenvectors.
# Note: We swap the sign of the eigvecs here to be consistent
# with other implementations. Both +/- eigvecs are valid, with
# the same eigenvalue, so this is okay.
#top_eigvecs = eigvecs[:, np.argsort(eigvals)[-2:]] * -1
top_eigvecs = V[0:2, :].T * -1 # shape (C, 2)
# Project thresholded optical density values onto plane spanned by
# 2 largest eigenvectors.
proj = np.dot(OD_thresh, top_eigvecs) # shape (K, 2)
# Calculate angle of each point wrt the first plane direction.
# Note: the parameters are `np.arctan2(y, x)`
angles = np.arctan2(proj[:, 1], proj[:, 0]) # shape (K,)
# Find robust extremes (a and 100-a percentiles) of the angle.
min_angle = np.percentile(angles, alpha)
max_angle = np.percentile(angles, 100-alpha)
# Convert min/max vectors (extremes) back to optimal stains in OD space.
# This computes a set of axes for each angle onto which we can project
# the top eigenvectors. This assumes that the projected values have
# been normalized to unit length.
extreme_angles = np.array(
[[np.cos(min_angle), np.cos(max_angle)],
[np.sin(min_angle), np.sin(max_angle)]]
) # shape (2,2)
stains = np.dot(top_eigvecs, extreme_angles) # shape (C, 2)
# Merge vectors with hematoxylin first, and eosin second, as a heuristic.
if stains[0, 0] < stains[0, 1]:
stains[:, [0, 1]] = stains[:, [1, 0]] # swap columns
# Calculate saturations of each stain.
# Note: Here, we solve
# OD = VS
# S = V^{-1}OD
# where `OD` is the matrix of optical density values of our image,
# `V` is the matrix of stain vectors, and `S` is the matrix of stain
# saturations. Since this is an overdetermined system, we use the
# least squares solver, rather than a direct solve.
sats, _, _, _ = np.linalg.lstsq(stains, OD.T)
# Normalize stain saturations to have same pseudo-maximum based on
# a reference max saturation.
max_sat = np.percentile(sats, 99, axis=1, keepdims=True)
sats = sats / max_sat * max_sat_ref
# Compute optimal OD values.
OD_norm = np.dot(stain_ref, sats)
# Recreate image.
# Note: If the image is immediately converted to uint8 with `.astype(np.uint8)`, it will
# not return the correct values due to the initital values being outside of [0,255].
# To fix this, we round to the nearest integer, and then clip to [0,255], which is the
# same behavior as Matlab.
#x_norm = np.exp(OD_norm) * light_intensity # natural log approach
x_norm = 10**(-OD_norm) * light_intensity - 1e-8 # log10 approach
x_norm = np.clip(np.round(x_norm), 0, 255).astype(np.uint8)
x_norm = x_norm.astype(np.uint8)
x_norm = x_norm.T.reshape(h,w,c)
return (slide_num, x_norm)
def flatten_sample(sample_tuple):
"""
Flatten a (H,W,C) sample into a (C*H*W) row vector.
Transpose each sample from (H, W, channels) to (channels, H, W), then
flatten each into a vector of length channels*H*W.
Args:
sample_tuple: A (slide_num, sample) tuple, where slide_num is an
integer, and sample is a 3D NumPy array of shape (H,W,C).
Returns:
A (slide_num, sample) tuple, where the sample has been transposed
from (H,W,C) to (C,H,W), and flattened to a vector of length
(C*H*W).
"""
slide_num, sample = sample_tuple
# 1. Swap axes from (sample_size_x, sample_size_y, ch) to
# (ch, sample_size_x, sample_size_y).
# 2. Flatten sample into (ch*sample_size_x*sample_size_y).
flattened_sample = sample.transpose(2,0,1).reshape(-1)
return (slide_num, flattened_sample)
# Get Ground Truth Labels
def get_labels_df(folder, filename="training_ground_truth.csv"):
"""
Create a DataFrame with the ground truth labels for each slide.
Args:
folder: Directory containing a `training_ground_truth.csv` file
containing the ground truth "tumor_score" and "molecular_score"
labels for each slide.
Returns:
A Pandas DataFrame containing the ground truth labels for each
slide.
"""
filepath = os.path.join(folder, filename)
labels_df = pd.read_csv(filepath, names=["tumor_score", "molecular_score"], header=None)
labels_df["slide_num"] = labels_df.index + 1 # slide numbering starts at 1
labels_df.set_index("slide_num", drop=False, inplace=True) # use the slide num as index
return labels_df
# Process All Slides Into A Spark DataFrame
def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024, overlap=0,
tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
num_partitions=20000):
"""
Preprocess a set of whole-slide images.
Preprocess a set of whole-slide images as follows:
1. Tile the slides into tiles of size (tile_size, tile_size, 3).
2. Filter the tiles to remove unnecessary tissue.
3. Cut the remaining tiles into samples of size
(sample_size, sample_size, ch), where `ch` is 1 if `grayscale`
is true, or 3 otherwise.
Args:
spark: SparkSession.
slide_nums: List of whole-slide numbers to process.
folder: Local directory in which the slides folder and ground truth
file is stored, as a string. This should contain a
`training_image_data` folder with images in the format
`TUPAC-TR-###.svs`, as well as a `training_ground_truth.csv` file
containing the ground truth "tumor_score" and "molecular_score"
labels for each slide. Alternatively, the folder should contain a
`testing_image_data` folder with images in the format
`TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
tile_size: The width and height of a square tile to be generated.
overlap: Number of pixels by which to overlap the tiles.
tissue_threshold: Tissue percentage threshold for filtering.
sample_size: The new width and height of the square samples to be
generated.
grayscale: Whether or not to generate grayscale samples, rather
than RGB.
normalize_stains: Whether or not to apply stain normalization.
num_partitions: Number of partitions to use during processing.
Returns:
A Spark DataFrame in which each row contains the slide number, tumor
score, molecular score, and the sample stretched out into a Vector.
"""
# Filter out broken slides
# Note: "Broken" here is due to a "version of OpenJPEG with broken support for chroma-subsampled
# images".
slides = (spark.sparkContext
.parallelize(slide_nums)
.filter(lambda slide: open_slide(slide, folder, training) is not None))
# Create DataFrame of all tile locations and increase number of partitions
# to avoid OOM during subsequent processing.
tile_indices = (slides.flatMap(
lambda slide: process_slide(slide, folder, training, tile_size, overlap)))
# TODO: Explore computing the ideal paritition sizes based on projected number
# of tiles after filtering. I.e. something like the following:
#rows = tile_indices.count()
#part_size = 128
#channels = 1 if grayscale else 3
#row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024 # size of one row in MB
#rows_per_part = round(part_size / row_mb)
#num_parts = rows / rows_per_part
tile_indices = tile_indices.repartition(num_partitions)
tile_indices.cache()
# Extract all tiles into an RDD, filter, cut into smaller samples, apply stain
# normalization, and flatten.
tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
if normalize_stains:
samples = samples.map(lambda sample: normalize_staining(sample))
samples = samples.map(lambda sample: flatten_sample(sample))
# Convert to a DataFrame
if training:
# Append labels
labels_df = get_labels_df(folder)
samples_with_labels = (samples.map(
lambda tup: (int(tup[0]), int(labels_df.at[tup[0],"tumor_score"]),
float(labels_df.at[tup[0],"molecular_score"]), Vectors.dense(tup[1]))))
df = samples_with_labels.toDF(["slide_num", "tumor_score", "molecular_score", "sample"])
df = df.select(df.slide_num.astype("int"), df.tumor_score.astype("int"),
df.molecular_score, df["sample"])
else: # testing data -- no labels
df = samples.toDF(["slide_num", "sample"])
df = df.select(df.slide_num.astype("int"), df["sample"])
return df
# Save DataFrame
def save(df, filepath, sample_size, grayscale, mode="error", format="parquet", file_size=128):
"""
Save a preprocessed DataFrame with a constraint on the file sizes.
Args:
df: A Spark DataFrame.
filepath: Hadoop-supported path at which to save `df`.
sample_size: The width and height of the square samples.
grayscale: Whether or not to the samples are in grayscale format,
rather than RGB.
mode: Specifies the behavior of `df.write.mode` when the data
already exists. Options include:
* `append`: Append contents of this DataFrame to
existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already
exists.
format: The format in which to save the DataFrame.
file_size: Size in MB of each saved file. 128 MB is an
empirically ideal size.
"""
channels = 1 if grayscale else 3
row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024 # size of one row in MB
rows_per_file = round(file_size / row_mb)
df.write.option("maxRecordsPerFile", rows_per_file).mode(mode).save(filepath, format=format)
# Utilities
def add_row_indices(df, training=True):
"""
Add a row index column for faster data ingestion times with SystemML.
Args:
df: A Spark DataFrame in which each row contains the slide number,
tumor score, molecular score, and the sample stretched out into a
Vector.
training: Boolean for training or testing datasets.
Returns:
The Spark DataFrame with a row index column called "__INDEX".
"""
rdd = (df.rdd
.zipWithIndex()
.map(lambda r: (r[1] + 1, *r[0]))) # flatten & convert index to 1-based indexing
if training:
df = rdd.toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample'])
df = df.select(df["__INDEX"].astype("int"), df.slide_num.astype("int"),
df.tumor_score.astype("int"), df.molecular_score, df["sample"])
else: # testing data -- no labels
df = rdd.toDF(["__INDEX", "slide_num", "sample"])
df = df.select(df["__INDEX"].astype("int"), df.slide_num.astype("int"), df["sample"])
return df
def sample(df, frac, training=True, seed=None):
"""
Sample the DataFrame, stratified on the class.
Args:
df: A Spark DataFrame in which each row contains the slide number,
tumor score, molecular score, and the sample stretched out into a
Vector.
frac: Fraction of rows to keep.
training: Boolean for training or testing datasets.
seed: Random seed used for the sampling.
Returns:
A stratified sample of the original Spark DataFrame.
"""
df_sample = df.sampleBy("tumor_score", fractions={1: frac, 2: frac, 3: frac}, seed=seed)
return df_sample