#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
"""
Preprocess -- Predicting Breast Cancer Proliferation Scores with
Apache SystemML
This script runs the preprocessing phase of the breast cancer project.
"""
import os
import shutil
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from pyspark.sql import SparkSession
from breastcancer.preprocessing import add_row_indices, get_labels_df, preprocess, save, sample
# Create a new SparkSession.
spark = (SparkSession.builder
         .appName("Breast Cancer -- Preprocessing")
         .getOrCreate())
# Ship a fresh copy of the `breastcancer` package to the Spark workers.
# Note: The zip must include the `breastcancer` directory itself,
# as well as all files within it for `addPyFile` to work correctly.
# This is equivalent to `zip -r breastcancer.zip breastcancer`.
dirname = "breastcancer"
zipname = dirname + ".zip"
shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
spark.sparkContext.addPyFile(zipname)
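
# Optional sanity check, a sketch: confirm that the shipped package is
# importable on the executors. `_check_import` is a hypothetical helper added
# here for illustration; `breastcancer.preprocessing` is the module used below.
def _check_import(_):
  import breastcancer.preprocessing  # noqa: F401 -- import check only
  return True

assert spark.sparkContext.parallelize(range(1), 1).map(_check_import).collect() == [True]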
# Execute Preprocessing & Save
# TODO: Filtering tiles and then cutting them into samples can yield samples
# with less tissue than desired, even though this follows the procedure used
# in the paper. Look into simply selecting tiles of the desired size to begin
# with.
# Settings
# TODO: Convert this to a set of parsed command line arguments (see the sketch below).
tile_size = 256
sample_size = 256
grayscale = False
num_partitions = 20000
training = True
row_indices = False
train_frac = 0.8
sample_frac = 0.01
seed = 42
folder = "data" # Linux-filesystem directory to read raw WSI data
save_folder = "data" # Hadoop-supported directory in which to save DataFrames
suffix = "_grayscale" if grayscale else ""
train_df_path = os.path.join(save_folder, "train_{}{}.parquet".format(sample_size, suffix))
val_df_path = os.path.join(save_folder, "val_{}{}.parquet".format(sample_size, suffix))
train_sample_path = os.path.join(save_folder,
                                 "train_{}_sample_{}{}.parquet".format(sample_frac, sample_size, suffix))
val_sample_path = os.path.join(save_folder,
                               "val_{}_sample_{}{}.parquet".format(sample_frac, sample_size, suffix))
# Get labels
labels_df = get_labels_df(folder)
# Split into train and validation sets based on slide number, stratified by class
train, val = train_test_split(labels_df, train_size=train_frac, stratify=labels_df['tumor_score'],
                              random_state=seed)
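
# Optional sanity check, a sketch: the stratified split should yield nearly
# identical class proportions in the train and val label DataFrames.
print(train['tumor_score'].value_counts(normalize=True).sort_index())
print(val['tumor_score'].value_counts(normalize=True).sort_index())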
# Process train & val slides
train_df = preprocess(spark, train.index, tile_size=tile_size, sample_size=sample_size,
                      grayscale=grayscale, num_partitions=num_partitions, folder=folder)
val_df = preprocess(spark, val.index, tile_size=tile_size, sample_size=sample_size,
                    grayscale=grayscale, num_partitions=num_partitions, folder=folder)
if row_indices:
  # Add row indices
  train_df = add_row_indices(train_df)
  val_df = add_row_indices(val_df)
# Save train & val DataFrames
save(train_df, train_df_path, sample_size, grayscale)
save(val_df, val_df_path, sample_size, grayscale)
if sample_frac > 0:
  # Reload the saved DataFrames and draw a smaller sample from each.
  train_df = spark.read.load(train_df_path)
  val_df = spark.read.load(val_df_path)
  train_sample = sample(train_df, sample_frac, seed)
  val_sample = sample(val_df, sample_frac, seed)
  # Save the sampled DataFrames.
  save(train_sample, train_sample_path, sample_size, grayscale)
  save(val_sample, val_sample_path, sample_size, grayscale)
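
# Optional sanity check, a sketch: reload the saved Parquet files and report
# row counts; the sampled sets should hold roughly `sample_frac` of the rows
# of the corresponding full sets.
check_paths = [train_df_path, val_df_path]
if sample_frac > 0:
  check_paths += [train_sample_path, val_sample_path]
for path in check_paths:
  print(path, spark.read.load(path).count())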